You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2020/10/21 16:52:12 UTC

[incubator-pinot] branch master updated: RealtimeToOfflineSegments task generator (#6124)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bf5d02  RealtimeToOfflineSegments task generator (#6124)
1bf5d02 is described below

commit 1bf5d021db25fe09d00e31871b5572c149ea29e6
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Oct 21 09:51:57 2020 -0700

    RealtimeToOfflineSegments task generator (#6124)
    
    Here's the final piece of the feature for Pinot managed offline flows.
    This is the TaskGenerator which will create tasks of realtimeToOfflineSegmentsTask type.
    Typical use case:
    You have setup a realtime table. You want to make it a hybrid table. Using this feature, you don't have to write your offline flows. Just set realtimeToOfflineSegmentsTask in your table config. Bring up minions. The minion tasks will push data to the offline table, 1 day at a time.
    The window size, buffer, segment processing configs are configurable via table task config.
---
 .../pinot/common/metadata/ZKMetadataProvider.java  |   5 +
 .../common/minion/MinionTaskMetadataUtils.java     |  81 ++++
 .../RealtimeToOfflineSegmentsTaskMetadata.java     |  88 ++++
 .../RealtimeToOfflineSegmentsTaskMetadataTest.java |  46 +++
 ...rInfoProvider.java => ClusterInfoAccessor.java} |  40 +-
 .../helix/core/minion/PinotTaskManager.java        |  10 +-
 .../generator/ConvertToRawIndexTaskGenerator.java  |  14 +-
 .../RealtimeToOfflineSegmentsTaskGenerator.java    | 312 ++++++++++++++
 .../minion/generator/TaskGeneratorRegistry.java    |   7 +-
 .../core/minion/generator/TaskGeneratorUtils.java  |  52 ++-
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 452 +++++++++++++++++++++
 .../apache/pinot/core/common/MinionConstants.java  |  26 +-
 .../processing/framework/SegmentMapper.java        |  12 +-
 .../processing/framework/SegmentMapperTest.java    |   2 +-
 ...fflineSegmentsMinionClusterIntegrationTest.java | 216 ++++++++++
 .../tests/SimpleMinionClusterIntegrationTest.java  |  12 +-
 .../org/apache/pinot/minion/MinionStarter.java     |   4 +-
 .../BaseMultipleSegmentsConversionExecutor.java    |  15 +
 .../pinot/minion/executor/BaseTaskExecutor.java    |   3 +-
 .../executor/MinionTaskZkMetadataManager.java      |  57 +++
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |  88 +++-
 ...altimeToOfflineSegmentsTaskExecutorFactory.java |  12 +-
 .../executor/TaskExecutorFactoryRegistry.java      |   4 +-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java |  95 +++--
 24 files changed, 1548 insertions(+), 105 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index af5a5d6..9fa56fa 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -59,6 +59,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE";
   private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
   private static final String PROPERTYSTORE_SEGMENT_LINEAGE = "/SEGMENT_LINEAGE";
+  private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = "/MINION_TASK_METADATA";
 
   public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
       ZNRecord znRecord) {
@@ -116,6 +117,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_SEGMENT_LINEAGE, tableNameWithType);
   }
 
+  public static String constructPropertyStorePathForMinionTaskMetadata(String taskType, String tableNameWithType) {
+    return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, taskType, tableNameWithType);
+  }
+
   public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
       String segmentName) {
     return propertyStore
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
new file mode 100644
index 0000000..43ac82c
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Helper methods to fetch/persist the ZNRecord for minion task metadata.
+ */
+public final class MinionTaskMetadataUtils {
+
+  private MinionTaskMetadataUtils() {
+
+  }
+
+  /**
+   * Fetches the ZNRecord for the given minion task type and table, from MINION_TASK_METADATA/taskType/tableNameWithType
+   */
+  @Nullable
+  public static ZNRecord fetchMinionTaskMetadataZNRecord(HelixPropertyStore<ZNRecord> propertyStore, String taskType,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType, tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (znRecord != null) {
+      znRecord.setVersion(stat.getVersion()); // carry the ZK version so callers can do conditional (CAS) writes
+    }
+    return znRecord;
+  }
+
+  /**
+   * Fetches the ZNRecord for RealtimeToOfflineSegmentsTask for given tableNameWithType from MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+   * and converts it to a {@link RealtimeToOfflineSegmentsTaskMetadata} object
+   */
+  @Nullable
+  public static RealtimeToOfflineSegmentsTaskMetadata getRealtimeToOfflineSegmentsTaskMetadata(
+      HelixPropertyStore<ZNRecord> propertyStore, String taskType, String tableNameWithType) {
+    ZNRecord znRecord = fetchMinionTaskMetadataZNRecord(propertyStore, taskType, tableNameWithType);
+    return znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
+  }
+
+  /**
+   * Persists the provided {@link RealtimeToOfflineSegmentsTaskMetadata} to MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType.
+   * Will fail if expectedVersion does not match.
+   * Set expectedVersion to -1 to bypass the version check.
+   */
+  public static void persistRealtimeToOfflineSegmentsTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore,
+      String taskType, RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata,
+      int expectedVersion) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
+        realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType());
+    if (!propertyStore
+        .set(path, realtimeToOfflineSegmentsTaskMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT)) {
+      throw new ZkException(
+          "Failed to persist minion RealtimeToOfflineSegmentsTask metadata: " + realtimeToOfflineSegmentsTaskMetadata);
+    }
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
new file mode 100644
index 0000000..2bd9c4c
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type <code>RealtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMs</code> denotes the time (exclusive) up to which tasks have been executed.
+ *
+ * This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMs</code> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMs, watermarkMs + bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+public class RealtimeToOfflineSegmentsTaskMetadata {
+
+  private static final String WATERMARK_KEY = "watermarkMs";
+
+  private final String _tableNameWithType;
+  private final long _watermarkMs;
+
+  public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs) {
+    _tableNameWithType = tableNameWithType;
+    _watermarkMs = watermarkMs;
+  }
+
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  /**
+   * Get the watermark in millis
+   */
+  public long getWatermarkMs() {
+    return _watermarkMs;
+  }
+
+  public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znRecord) {
+    long watermark = znRecord.getLongField(WATERMARK_KEY, 0); // watermark defaults to 0 when the field is absent
+    return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark);
+  }
+
+  public ZNRecord toZNRecord() {
+    ZNRecord znRecord = new ZNRecord(_tableNameWithType); // table name doubles as the ZNRecord id
+    znRecord.setLongField(WATERMARK_KEY, _watermarkMs);
+    return znRecord;
+  }
+
+  public String toJsonString() {
+    try {
+      return JsonUtils.objectToString(this);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toJsonString();
+  }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
new file mode 100644
index 0000000..e5a4db2
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metadata;
+
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests for conversion between ZNRecord and {@link RealtimeToOfflineSegmentsTaskMetadata}
+ */
+public class RealtimeToOfflineSegmentsTaskMetadataTest {
+
+  @Test
+  public void testToFromZNRecord() {
+    RealtimeToOfflineSegmentsTaskMetadata metadata =
+        new RealtimeToOfflineSegmentsTaskMetadata("testTable_REALTIME", 1000);
+    ZNRecord znRecord = metadata.toZNRecord();
+    assertEquals(znRecord.getId(), "testTable_REALTIME");
+    assertEquals(znRecord.getSimpleField("watermarkMs"), "1000"); // long fields are stored as simple string fields
+
+    RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+        RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord);
+    assertEquals(realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType(), "testTable_REALTIME");
+    assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), 1000);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
similarity index 70%
rename from pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
rename to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 678d10d..8d3db71 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -23,11 +23,15 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -37,12 +41,12 @@ import org.apache.pinot.spi.data.Schema;
  * The class <code>ClusterInfoProvider</code> is an abstraction on top of {@link PinotHelixResourceManager} and
  * {@link PinotHelixTaskResourceManager} which provides cluster information for {@link PinotTaskGenerator}.
  */
-public class ClusterInfoProvider {
+public class ClusterInfoAccessor {
   private final PinotHelixResourceManager _pinotHelixResourceManager;
   private final PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
   private final ControllerConf _controllerConf;
 
-  public ClusterInfoProvider(PinotHelixResourceManager pinotHelixResourceManager,
+  public ClusterInfoAccessor(PinotHelixResourceManager pinotHelixResourceManager,
       PinotHelixTaskResourceManager pinotHelixTaskResourceManager, ControllerConf controllerConf) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _pinotHelixTaskResourceManager = pinotHelixTaskResourceManager;
@@ -94,6 +98,38 @@ public class ClusterInfoProvider {
   }
 
   /**
+   * Get all segment metadata for the given low-level REALTIME table name.
+   *
+   * @param tableName Table name with or without REALTIME type suffix
+   * @return List of segment metadata
+   */
+  public List<LLCRealtimeSegmentZKMetadata> getLLCRealtimeSegmentsMetadata(String tableName) {
+    return ZKMetadataProvider
+        .getLLCRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName);
+  }
+
+  /**
+   * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from MINION_TASK_METADATA for given realtime table; null if no metadata node exists
+   * @param tableNameWithType realtime table name
+   */
+  public RealtimeToOfflineSegmentsTaskMetadata getMinionRealtimeToOfflineSegmentsTaskMetadata(
+      String tableNameWithType) {
+    return MinionTaskMetadataUtils
+        .getRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+            MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, tableNameWithType);
+  }
+
+  /**
+   * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into MINION_TASK_METADATA
+   * This call will override any previous metadata node (expectedVersion -1 bypasses the version check)
+   */
+  public void setRealtimeToOfflineSegmentsTaskMetadata(
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata) {
+    MinionTaskMetadataUtils.persistRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+        MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeToOfflineSegmentsTaskMetadata, -1);
+  }
+
+  /**
    * Get all tasks' state for the given task type.
    *
    * @param taskType Task type
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 93422f7..7a17718 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -48,7 +48,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
 
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
-  private final ClusterInfoProvider _clusterInfoProvider;
+  private final ClusterInfoAccessor _clusterInfoAccessor;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
 
   public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
@@ -58,8 +58,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
         controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
         controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
-    _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
-    _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
+    _clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf);
+    _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
   }
 
   /**
@@ -69,8 +69,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
    *
    * @return Cluster info provider
    */
-  public ClusterInfoProvider getClusterInfoProvider() {
-    return _clusterInfoProvider;
+  public ClusterInfoAccessor getClusterInfoAccessor() {
+    return _clusterInfoAccessor;
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
index dc4acf7..437ac93 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.data.Segment;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -39,10 +39,10 @@ import org.slf4j.LoggerFactory;
 public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);
 
-  private final ClusterInfoProvider _clusterInfoProvider;
+  private final ClusterInfoAccessor _clusterInfoAccessor;
 
-  public ConvertToRawIndexTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
-    _clusterInfoProvider = clusterInfoProvider;
+  public ConvertToRawIndexTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+    _clusterInfoAccessor = clusterInfoAccessor;
   }
 
   @Override
@@ -56,7 +56,7 @@ public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
 
     // Get the segments that are being converted so that we don't submit them again
     Set<Segment> runningSegments =
-        TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, _clusterInfoProvider);
+        TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, _clusterInfoAccessor);
 
     for (TableConfig tableConfig : tableConfigs) {
       // Only generate tasks for OFFLINE tables
@@ -90,7 +90,7 @@ public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
 
       // Generate tasks
       int tableNumTasks = 0;
-      for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : _clusterInfoProvider
+      for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : _clusterInfoAccessor
           .getOfflineSegmentsMetadata(offlineTableName)) {
         // Generate up to tableMaxNumTasks tasks each time for each table
         if (tableNumTasks == tableMaxNumTasks) {
@@ -111,7 +111,7 @@ public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
           configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
           configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
           configs.put(MinionConstants.DOWNLOAD_URL_KEY, offlineSegmentZKMetadata.getDownloadUrl());
-          configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoProvider.getVipUrl() + "/segments");
+          configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
           configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(offlineSegmentZKMetadata.getCrc()));
           if (columnsToConvertConfig != null) {
             configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, columnsToConvertConfig);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
new file mode 100644
index 0000000..b505d28
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMs is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode
+ *  found at MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ *  In case of cold-start, no ZNode will exist.
+ *  A new ZNode will be created, with watermarkMs as the smallest time found in the COMPLETED segments
+ *
+ *  - The execution window for the task is calculated as,
+ *  windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs,
+ *  where bucketTime can be provided in the taskConfigs (default 1d)
+ *
+ *  - If the execution window is not older than bufferTimeMs, no task will be generated,
+ *  where bufferTime can be provided in the taskConfigs (default 2d)
+ *
+ *  - Segment metadata is scanned for all COMPLETED segments,
+ *  to pick those containing data in window [windowStartMs, windowEndMs)
+ *
+ *  - There are some special considerations for using last completed segment of a partition.
+ *  Such segments will be checked for segment endTime, to ensure there's no overflow into CONSUMING segments
+ *
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoAccessor _clusterInfoAccessor;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+    _clusterInfoAccessor = clusterInfoAccessor;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String realtimeTableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, realtimeTableName);
+        continue;
+      }
+      if (new StreamConfig(realtimeTableName, tableConfig.getIndexingConfig().getStreamConfigs())
+          .hasHighLevelConsumerType()) {
+        LOGGER.warn("Skip generating task: {} for HLC REALTIME table: {}", taskType, realtimeTableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", realtimeTableName, taskType);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> incompleteTasks =
+          TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, _clusterInfoAccessor);
+      if (!incompleteTasks.isEmpty()) {
+        LOGGER
+            .warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.", incompleteTasks.keySet(),
+                realtimeTableName);
+        continue;
+      }
+
+      // Get all segment metadata for completed segments (DONE status).
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();
+      Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>();
+      Set<Integer> allPartitions = new HashSet<>();
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName,
+          allPartitions);
+      if (completedSegmentsMetadata.isEmpty()) {
+        LOGGER
+            .info("No realtime-completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      if (!allPartitions.isEmpty()) {
+        LOGGER
+            .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
+                allPartitions, realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: {}", realtimeTableName);
+
+      // Get the bucket size and buffer
+      String bucketTimePeriod =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
+      String bufferTimePeriod =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+      long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
+      long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = windowStart + bucket.
+      long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsMetadata, bucketMs);
+      long windowEndMs = windowStartMs + bucketMs;
+
+      // Check that execution window is older than bufferTime
+      if (windowEndMs > System.currentTimeMillis() - bufferMs) {
+        LOGGER.info(
+            "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task generation: {}",
+            windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
+        continue;
+      }
+
+      // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd (exclusive)
+      List<String> segmentNames = new ArrayList<>();
+      List<String> downloadURLs = new ArrayList<>();
+      Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values());
+      boolean skipGenerate = false;
+      for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
+        String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+        TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit();
+        long segmentStartTimeMs = timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime());
+        long segmentEndTimeMs = timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime());
+
+        // Check overlap with window
+        if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
+          // If last completed segment is being used, make sure that segment crosses over end of window.
+          // In the absence of this check, CONSUMING segments could contain some portion of the window. That data would be skipped forever.
+          if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
+            LOGGER.info(
+                "Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task generation: {}",
+                segmentName, taskType);
+            skipGenerate = true;
+            break;
+          }
+          segmentNames.add(segmentName);
+          downloadURLs.add(realtimeSegmentZKMetadata.getDownloadUrl());
+        }
+      }
+
+      if (segmentNames.isEmpty() || skipGenerate) {
+        LOGGER.info("Found no eligible segments for task: {} with window [{} - {}). Skipping task generation", taskType,
+            windowStartMs, windowEndMs);
+        continue;
+      }
+
+      Map<String, String> configs = new HashMap<>();
+      configs.put(MinionConstants.TABLE_NAME_KEY, realtimeTableName);
+      configs.put(MinionConstants.SEGMENT_NAME_KEY, StringUtils.join(segmentNames, ","));
+      configs.put(MinionConstants.DOWNLOAD_URL_KEY, StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR));
+      configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
+
+      // Execution window
+      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, String.valueOf(windowStartMs));
+      configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, String.valueOf(windowEndMs));
+
+      // Segment processor configs
+      String timeColumnTransformationConfig =
+          taskConfigs.get(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
+      if (timeColumnTransformationConfig != null) {
+        configs.put(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY, timeColumnTransformationConfig);
+      }
+      String collectorTypeConfig = taskConfigs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
+      if (collectorTypeConfig != null) {
+        configs.put(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, collectorTypeConfig);
+      }
+      for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+        if (entry.getKey().endsWith(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
+          configs.put(entry.getKey(), entry.getValue());
+        }
+      }
+      String maxNumRecordsPerSegmentConfig =
+          taskConfigs.get(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
+      if (maxNumRecordsPerSegmentConfig != null) {
+        configs.put(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, maxNumRecordsPerSegmentConfig);
+      }
+
+      pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+      LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, taskType);
+    }
+    return pinotTaskConfigs;
+  }
+
+  /**
+   * Fetches completed (non-consuming) segment and partition information for the given realtime table.
+   * Results are returned through the three collector arguments, which the caller passes in empty.
+   *
+   * @param realtimeTableName the realtime table name
+   * @param completedSegmentsMetadataList list for collecting the completed segments metadata
+   * @param partitionToLatestCompletedSegmentName map for collecting the partitionId to the latest completed segment name
+   * @param allPartitions set for collecting all partition ids (consuming and completed alike)
+   */
+  private void getCompletedSegmentsInfo(String realtimeTableName,
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadataList,
+      Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer> allPartitions) {
+    List<LLCRealtimeSegmentZKMetadata> realtimeSegmentsMetadataList =
+        _clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(realtimeTableName);
+
+    // Track, per partition, the completed segment with the highest sequence number seen so far
+    Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
+    for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(metadata.getSegmentName());
+      // Record every partition id regardless of segment status, so the caller can detect
+      // partitions that have no completed segment yet
+      allPartitions.add(llcSegmentName.getPartitionId());
+
+      if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+        completedSegmentsMetadataList.add(metadata);
+        // Replace the current latest only when this segment's sequence number is strictly greater
+        latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), (partitionId, latestLLCSegmentName) -> {
+          if (latestLLCSegmentName == null) {
+            return llcSegmentName;
+          } else {
+            if (llcSegmentName.getSequenceNumber() > latestLLCSegmentName.getSequenceNumber()) {
+              return llcSegmentName;
+            } else {
+              return latestLLCSegmentName;
+            }
+          }
+        });
+      }
+    }
+
+    // Project the per-partition latest LLCSegmentName objects down to plain segment names for the caller
+    for (Map.Entry<Integer, LLCSegmentName> entry : latestLLCSegmentNameMap.entrySet()) {
+      partitionToLatestCompletedSegmentName.put(entry.getKey(), entry.getValue().getSegmentName());
+    }
+  }
+
+  /**
+   * Gets the watermark (start of the next window to process, in millis) from the
+   * RealtimeToOfflineSegmentsTaskMetadata ZNode.
+   * If the ZNode does not exist (cold-start), computes the watermark from the smallest segment start time,
+   * rounded down to a bucket boundary, and persists it to a new ZNode as a side effect.
+   */
+  private long getWatermarkMs(String realtimeTableName, List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata,
+      long bucketMs) {
+    RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+        _clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(realtimeTableName);
+
+    if (realtimeToOfflineSegmentsTaskMetadata == null) {
+      // No ZNode exists. Cold-start.
+      long watermarkMs;
+
+      // Find the smallest time from all segments
+      RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+      for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
+        // NOTE(review): raw getStartTime() values are compared without normalizing to a common unit;
+        // this assumes all segments share the same TimeUnit — confirm, or compare via getTimeUnit().toMillis()
+        if (minSegmentZkMetadata == null || realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata
+            .getStartTime()) {
+          minSegmentZkMetadata = realtimeSegmentZKMetadata;
+        }
+      }
+      // Callers only invoke this with a non-empty completed-segments list, so a minimum must exist
+      Preconditions.checkState(minSegmentZkMetadata != null);
+
+      // Convert the segment minTime to millis
+      long minSegmentStartTimeMs = minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+
+      // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries
+      // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window [20200813, 20200814)
+      watermarkMs = (minSegmentStartTimeMs / bucketMs) * bucketMs;
+
+      // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above
+      realtimeToOfflineSegmentsTaskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
+      _clusterInfoAccessor.setRealtimeToOfflineSegmentsTaskMetadata(realtimeToOfflineSegmentsTaskMetadata);
+    }
+    return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index ff8d37e..f112d8b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 
 
@@ -33,8 +33,9 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 public class TaskGeneratorRegistry {
   private final Map<String, PinotTaskGenerator> _taskGeneratorRegistry = new HashMap<>();
 
-  public TaskGeneratorRegistry(@Nonnull ClusterInfoProvider clusterInfoProvider) {
-    registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoProvider));
+  public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor clusterInfoAccessor) {
+    registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoAccessor));
+    registerTaskGenerator(new RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor));
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
index 31f0c70..e4878a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.controller.helix.core.minion.generator;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.data.Segment;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -39,13 +40,13 @@ public class TaskGeneratorUtils {
    * NOTE: we consider tasks not finished in one day as stuck and don't count the segments in them
    *
    * @param taskType Task type
-   * @param clusterInfoProvider Cluster info provider
+   * @param clusterInfoAccessor Cluster info accessor
    * @return Set of running segments
    */
   public static Set<Segment> getRunningSegments(@Nonnull String taskType,
-      @Nonnull ClusterInfoProvider clusterInfoProvider) {
+      @Nonnull ClusterInfoAccessor clusterInfoAccessor) {
     Set<Segment> runningSegments = new HashSet<>();
-    Map<String, TaskState> taskStates = clusterInfoProvider.getTaskStates(taskType);
+    Map<String, TaskState> taskStates = clusterInfoAccessor.getTaskStates(taskType);
     for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
       // Skip COMPLETED tasks
       if (entry.getValue() == TaskState.COMPLETED) {
@@ -54,13 +55,11 @@ public class TaskGeneratorUtils {
 
       // Skip tasks scheduled for more than one day
       String taskName = entry.getKey();
-      long scheduleTimeMs = Long.parseLong(
-          taskName.substring(taskName.lastIndexOf(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR) + 1));
-      if (System.currentTimeMillis() - scheduleTimeMs > ONE_DAY_IN_MILLIS) {
+      if (isTaskOlderThanOneDay(taskName)) {
         continue;
       }
 
-      for (PinotTaskConfig pinotTaskConfig : clusterInfoProvider.getTaskConfigs(entry.getKey())) {
+      for (PinotTaskConfig pinotTaskConfig : clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
         Map<String, String> configs = pinotTaskConfig.getConfigs();
         runningSegments.add(
             new Segment(configs.get(MinionConstants.TABLE_NAME_KEY), configs.get(MinionConstants.SEGMENT_NAME_KEY)));
@@ -68,4 +67,41 @@ public class TaskGeneratorUtils {
     }
     return runningSegments;
   }
+
+  /**
+   * Gets all the tasks for the provided task type and tableName, which do not have TaskState COMPLETED.
+   *
+   * NOTE: we consider tasks not finished in one day as stuck and don't count them
+   *
+   * @param taskType the minion task type to look up
+   * @param tableNameWithType table name with type suffix, matched against each task's TABLE_NAME_KEY config
+   * @param clusterInfoAccessor cluster info accessor used to fetch task states and configs
+   * @return map containing task name to task state for non-completed tasks
+   */
+  public static Map<String, TaskState> getIncompleteTasks(String taskType, String tableNameWithType,
+      ClusterInfoAccessor clusterInfoAccessor) {
+
+    Map<String, TaskState> nonCompletedTasks = new HashMap<>();
+    Map<String, TaskState> taskStates = clusterInfoAccessor.getTaskStates(taskType);
+    for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
+      // COMPLETED tasks never block new task generation
+      if (entry.getValue() == TaskState.COMPLETED) {
+        continue;
+      }
+      String taskName = entry.getKey();
+      // Tasks scheduled more than one day ago are treated as stuck and ignored
+      if (isTaskOlderThanOneDay(taskName)) {
+        continue;
+      }
+      // A Helix task may carry multiple sub-task configs; include the task if any config targets this table
+      for (PinotTaskConfig pinotTaskConfig : clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
+        if (tableNameWithType.equals(pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY))) {
+          nonCompletedTasks.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+    return nonCompletedTasks;
+  }
+
+  /**
+   * Returns true if the task's schedule time is older than 1d.
+   * The schedule time (epoch millis) is parsed from the suffix of the task name,
+   * i.e. everything after the last TASK_NAME_SEPARATOR.
+   */
+  private static boolean isTaskOlderThanOneDay(String taskName) {
+    long scheduleTimeMs =
+        Long.parseLong(taskName.substring(taskName.lastIndexOf(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR) + 1));
+    return System.currentTimeMillis() - scheduleTimeMs > ONE_DAY_IN_MILLIS;
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
new file mode 100644
index 0000000..5aa3377
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link RealtimeToOfflineSegmentsTaskGenerator}
+ */
+public class RealtimeToOfflineSegmentsTaskGeneratorTest {
+
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+  private final Map<String, String> streamConfigs = new HashMap<>();
+
+  @BeforeClass
+  public void setup() {
+    // Minimal low-level (LLC) kafka stream configs required to build a valid realtime table config in tests
+    streamConfigs.put(StreamConfigProperties.STREAM_TYPE, "kafka");
+    streamConfigs
+        .put(StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_CONSUMER_TYPES),
+            StreamConfig.ConsumerType.LOWLEVEL.toString());
+    streamConfigs.put(StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_TOPIC_NAME),
+        "myTopic");
+    streamConfigs
+        .put(StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_DECODER_CLASS),
+            "org.foo.Decoder");
+  }
+
+  /** Builds a realtime LLC table config for the test table with the given per-task-type task configs. */
+  private TableConfig getRealtimeTableConfig(Map<String, Map<String, String>> taskConfigsMap) {
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+        .setStreamConfigs(streamConfigs).setTaskConfig(new TableTaskConfig(taskConfigsMap)).build();
+  }
+
+  /**
+   * Tests for some config checks: task generation is skipped for OFFLINE tables, and fails fast
+   * (IllegalStateException) when the table task config or the task-type config is missing.
+   */
+  @Test
+  public void testGenerateTasksCheckConfigs() {
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+
+    when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    LLCRealtimeSegmentZKMetadata metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 5000, 50_000, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+    // Skip task generation, if offline table
+    TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // No tableTaskConfig, error
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(new HashMap<>());
+    realtimeTableConfig.setTaskConfig(null);
+    try {
+      generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+      Assert.fail("Should have failed for null tableTaskConfig");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    // No taskConfig for task, error
+    realtimeTableConfig = getRealtimeTableConfig(new HashMap<>());
+    try {
+      generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+      Assert.fail("Should have failed for null taskConfig");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Tests for some constraints on simultaneous tasks scheduled: an incomplete (non-COMPLETED) task for the
+   * same table blocks new task generation, unless the incomplete task is older than one day (considered stuck).
+   */
+  @Test
+  public void testGenerateTasksSimultaneousConstraints() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+    Map<String, TaskState> taskStatesMap = new HashMap<>();
+    // Task name suffix is the schedule time; "now" makes the task recent (not stuck)
+    String taskName = "Task_RealtimeToOfflineSegmentsTask_" + System.currentTimeMillis();
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(MinionConstants.TABLE_NAME_KEY, REALTIME_TABLE_NAME);
+    when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(taskStatesMap);
+    when(mockClusterInfoProvide.getTaskConfigs(taskName))
+        .thenReturn(Lists.newArrayList(new PinotTaskConfig(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs)));
+    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L));
+    LLCRealtimeSegmentZKMetadata metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 80_000_000, 90_000_000,
+            TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+    // if same task and table, IN_PROGRESS, then don't generate again
+    taskStatesMap.put(taskName, TaskState.IN_PROGRESS);
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // if same task and table, but COMPLETED, generate
+    taskStatesMap.put(taskName, TaskState.COMPLETED);
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+
+    // if same task and table, IN_PROGRESS, but older than 1 day, generate
+    String oldTaskName =
+        "Task_RealtimeToOfflineSegmentsTask_" + (System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3));
+    taskStatesMap.remove(taskName);
+    taskStatesMap.put(oldTaskName, TaskState.IN_PROGRESS);
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+  }
+
+  /**
+   * Tests for realtime table with no segments: no task is generated when the table has no segments,
+   * no COMPLETED (DONE) segments, or at least one partition with no completed segment yet.
+   */
+  @Test
+  public void testGenerateTasksNoSegments() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    // No segments in table
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+    when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList());
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // No COMPLETED segments in table
+    LLCRealtimeSegmentZKMetadata seg1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1));
+
+    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // 2 partitions. No COMPLETED segments for partition 0
+    LLCRealtimeSegmentZKMetadata seg2 =
+        getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE, 5000, 10000, TimeUnit.MILLISECONDS, null);
+    LLCRealtimeSegmentZKMetadata seg3 =
+        getRealtimeSegmentZKMetadata("testTable__1__1__13456", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1, seg2, seg3));
+
+    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+  }
+
+  /**
+   * Test cold start. No minion metadata (watermark ZNode) exists. The watermark is derived from the
+   * smallest segment start time, rounded down to the (default 1d) bucket boundary, for both
+   * millis-based and hours-based segment time metadata.
+   */
+  @Test
+  public void testGenerateTasksNoMinionMetadata() {
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+    when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    // null metadata simulates cold start (no watermark ZNode yet)
+    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(null);
+    LLCRealtimeSegmentZKMetadata seg1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 1590048000000L, 1590134400000L,
+            TimeUnit.MILLISECONDS, "download1"); // 21 May 2020 8am to 22 May 2020 8am UTC
+    LLCRealtimeSegmentZKMetadata seg2 =
+        getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE, 1590048000000L, 1590134400000L,
+            TimeUnit.MILLISECONDS, "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1, seg2));
+
+    // StartTime calculated using segment metadata
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), "testTable__0__0__12345,testTable__1__0__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), "download1,download2");
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1590019200000"); // 21 May 2020 UTC
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590105600000"); // 22 May 2020 UTC
+
+    // Segment metadata in hoursSinceEpoch
+    seg1 = getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 441680L, 441703L, TimeUnit.HOURS,
+        "download1"); // 21 May 2020 8am to 22 May 2020 8am UTC
+    seg2 = getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE, 441680L, 441703L, TimeUnit.HOURS,
+        "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1, seg2));
+    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), "testTable__0__0__12345,testTable__1__0__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), "download1,download2");
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1590019200000"); // 21 May 2020 UTC
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590105600000");  // 22 May 2020 UTC
+  }
+
+  /**
+   * Tests for subsequent runs after cold start
+   */
+  @Test
+  public void testGenerateTasksWithMinionMetadata() {
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+    when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L)); // 21 May 2020 UTC
+    LLCRealtimeSegmentZKMetadata seg1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 1589972400000L, 1590048000000L,
+            TimeUnit.MILLISECONDS, "download1"); // 05-20-2020T11:00:00 to 05-21-2020T08:00:00 UTC
+    LLCRealtimeSegmentZKMetadata seg2 =
+        getRealtimeSegmentZKMetadata("testTable__0__1__12345", Status.DONE, 1590048000000L, 1590134400000L,
+            TimeUnit.MILLISECONDS, "download2"); // 05-21-2020T08:00:00 UTC to 05-22-2020T08:00:00 UTC
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(seg1, seg2));
+
+    // Default configs
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), "testTable__0__0__12345,testTable__0__1__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), "download1,download2");
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1590019200000"); // 5-21-2020
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590105600000"); // 5-22-2020
+
+    // No segments match
+    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590490800000L)); // 26 May 2020 UTC
+    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 0);
+
+    // Some segments match
+    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L)); // 21 May 2020 UTC
+    taskConfigsMap = new HashMap<>();
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, "2h");
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+    realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), "testTable__0__0__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), "download1");
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
+        "1590019200000"); // 05-21-2020T00:00:00
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590026400000"); // 05-21-2020T02:00:00
+
+    // Segment Processor configs
+    taskConfigsMap = new HashMap<>();
+    taskConfigs = new HashMap<>();
+    taskConfigs.put(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY, "foo");
+    taskConfigs.put(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
+    taskConfigs.put("m1" + RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "MAX");
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+    realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+    generator = new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    assertEquals(pinotTaskConfigs.get(0).getTaskType(), RealtimeToOfflineSegmentsTask.TASK_TYPE);
+    configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY), REALTIME_TABLE_NAME);
+    assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY), "testTable__0__0__12345,testTable__0__1__12345");
+    assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), "download1,download2");
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
+        "1590019200000"); // 05-21-2020T00:00:00
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590105600000"); // 05-22-2020T00:00:00
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY), "foo");
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY), "rollup");
+    assertEquals(configs.get("m1" + RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX), "MAX");
+  }
+
+  /**
+   * Tests that task generation is skipped when CONSUMING segments overlap with the time window
+   */
+  @Test
+  public void testOverflowIntoConsuming() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+    when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+
+    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L));
+    LLCRealtimeSegmentZKMetadata metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 50_000, 150_000, TimeUnit.MILLISECONDS,
+            null);
+    LLCRealtimeSegmentZKMetadata metadata2 =
+        getRealtimeSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+    // last COMPLETED segment's endTime is less than windowEnd time, so the CONSUMING segment overlaps the window. Skip task generation
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 100_000, 200_000, TimeUnit.MILLISECONDS,
+            null);
+    metadata2 =
+        getRealtimeSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // last completed segment endtime ends at window end, allow
+    metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, 200_000, 86_500_000, TimeUnit.MILLISECONDS,
+            null);
+    metadata2 =
+        getRealtimeSegmentZKMetadata("testTable__0__1__12345", Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+  }
+
+  @Test
+  public void testBuffer() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    // default buffer - 2d
+    long now = System.currentTimeMillis();
+    long watermarkMs = now - TimeUnit.DAYS.toMillis(1);
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+    when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs));
+    LLCRealtimeSegmentZKMetadata metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, watermarkMs - 100, watermarkMs + 100,
+            TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator =
+        new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+
+    // custom buffer
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, "15d");
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+    realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    watermarkMs = now - TimeUnit.DAYS.toMillis(10);
+    when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs));
+    metadata1 =
+        getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE, watermarkMs - 100, watermarkMs + 100,
+            TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1));
+
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertTrue(pinotTaskConfigs.isEmpty());
+  }
+
+  private LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String segmentName, Status status, long startTime,
+      long endTime, TimeUnit timeUnit, String downloadURL) {
+    LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata = new LLCRealtimeSegmentZKMetadata();
+    realtimeSegmentZKMetadata.setSegmentName(segmentName);
+    realtimeSegmentZKMetadata.setStatus(status);
+    realtimeSegmentZKMetadata.setStartTime(startTime);
+    realtimeSegmentZKMetadata.setEndTime(endTime);
+    realtimeSegmentZKMetadata.setTimeUnit(timeUnit);
+    realtimeSegmentZKMetadata.setDownloadUrl(downloadURL);
+    return realtimeSegmentZKMetadata;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index f2049ea..cd98833 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -62,16 +62,30 @@ public class MinionConstants {
     public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
   }
 
+  /**
+   * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
+   */
   public static class RealtimeToOfflineSegmentsTask {
-    public static final String TASK_TYPE = "realtimeToOfflineSegmentsTask";
-    // window
-    public static final String WINDOW_START_MILLIS_KEY = "windowStartMillis";
-    public static final String WINDOW_END_MILLIS_KEY = "windowEndMillis";
-    // segment processing
+    public static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+
+    /**
+     * The time window size for the task.
+     * e.g. if set to "1d", then task is scheduled to run for a 1 day window
+     */
+    public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+    /**
+     * The time period to wait before picking segments for this task
+     * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
+     */
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+
+    // Window start and window end set by task generator
+    public static final String WINDOW_START_MS_KEY = "windowStartMs";
+    public static final String WINDOW_END_MS_KEY = "windowEndMs";
+    // Segment processing related configs
     public static final String TIME_COLUMN_TRANSFORM_FUNCTION_KEY = "timeColumnTransformFunction";
     public static final String COLLECTOR_TYPE_KEY = "collectorType";
     public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
-
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
index a947d29..a09f3b5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
@@ -50,9 +50,9 @@ import org.slf4j.LoggerFactory;
  * Mapper phase of the SegmentProcessorFramework.
  * Reads the input segment and creates partitioned avro data files
  * Performs:
- * - record transformations
+ * - record filtering
+ * - column transformations
  * - partitioning
- * - partition filtering
  */
 public class SegmentMapper {
 
@@ -74,8 +74,8 @@ public class SegmentMapper {
 
     _mapperId = mapperId;
     _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
-    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
     _recordFilter = RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
+    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
     for (PartitionerConfig partitionerConfig : mapperConfig.getPartitionerConfigs()) {
       _partitioners.add(PartitionerFactory.getPartitioner(partitionerConfig));
     }
@@ -101,14 +101,14 @@ public class SegmentMapper {
     while (segmentRecordReader.hasNext()) {
       reusableRow = segmentRecordReader.next(reusableRow);
 
-      // Record transformation
-      reusableRow = _recordTransformer.transformRecord(reusableRow);
-
       // Record filtering
       if (_recordFilter.filter(reusableRow)) {
         continue;
       }
 
+      // Record transformation
+      reusableRow = _recordTransformer.transformRecord(reusableRow);
+
       // Partitioning
       int p = 0;
       for (Partitioner partitioner : _partitioners) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 1856b9d..88857e8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -275,7 +275,7 @@ public class SegmentMapperTest {
     SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema,
         new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
         new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({timeValue != 1597795200000}, timeValue)").build(), Lists.newArrayList(
+            .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue >= 1597881600000}, timeValue)").build(), Lists.newArrayList(
         new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
             .setColumnName("timeValue").build()));
     Map<String, List<Object[]>> expectedRecords11 =
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
new file mode 100644
index 0000000..3f80e95
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for minion task of type "RealtimeToOfflineSegmentsTask"
+ * With every task run, a new segment is created in the offline table for 1 day. Watermark also keeps progressing accordingly.
+ */
+public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+
+  private PinotHelixTaskResourceManager _helixTaskResourceManager;
+  private PinotTaskManager _taskManager;
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+
+  private long _dataSmallestTimeMillis;
+  private long _dateSmallestDays;
+  private String _realtimeTableName;
+  private String _offlineTableName;
+
+  @Override
+  protected TableTaskConfig getTaskConfig() {
+    return new TableTaskConfig(
+        Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>()));
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    // Setup realtime table, and blank offline table
+    super.setUp();
+    addTableConfig(createOfflineTableConfig());
+    startMinion(null, null);
+
+    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+    _taskManager = _controllerStarter.getTaskManager();
+    _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+
+    _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    _offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+    List<RealtimeSegmentZKMetadata> realtimeSegmentMetadata =
+        _pinotHelixResourceManager.getRealtimeSegmentMetadata(_realtimeTableName);
+    long minSegmentTime = Long.MAX_VALUE;
+    for (RealtimeSegmentZKMetadata metadata : realtimeSegmentMetadata) {
+      if (metadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
+        if (metadata.getStartTime() < minSegmentTime) {
+          minSegmentTime = metadata.getStartTime();
+        }
+      }
+    }
+    _dataSmallestTimeMillis = minSegmentTime;
+    _dateSmallestDays = minSegmentTime / 86400000;
+  }
+
+  @Test
+  public void testRealtimeToOfflineSegmentsTask() {
+
+    List<OfflineSegmentZKMetadata> offlineSegmentMetadata =
+        _pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+    Assert.assertTrue(offlineSegmentMetadata.isEmpty());
+
+    long expectedWatermark = _dataSmallestTimeMillis;
+    int numOfflineSegments = 0;
+    long offlineSegmentTime = _dateSmallestDays;
+    for (int i = 0; i < 3; i++) {
+      // Schedule task
+      Assert.assertTrue(
+          _taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
+          PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
+      // Should not generate more tasks
+      Assert.assertFalse(
+          _taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+
+      expectedWatermark = expectedWatermark + 86400000;
+      // Wait at most 600 seconds for all tasks to reach COMPLETED state
+      waitForTaskToComplete(expectedWatermark);
+      // check segment is in offline
+      offlineSegmentMetadata = _pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+      Assert.assertEquals(offlineSegmentMetadata.size(), ++numOfflineSegments);
+      Assert.assertEquals(offlineSegmentMetadata.get(i).getStartTime(), offlineSegmentTime);
+      Assert.assertEquals(offlineSegmentMetadata.get(i).getEndTime(), offlineSegmentTime);
+      offlineSegmentTime++;
+    }
+    testHardcodedSqlQueries();
+  }
+
+  private void waitForTaskToComplete(long expectedWatermark) {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : _helixTaskResourceManager
+          .getTaskStates(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE).values()) {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+
+    // Check the watermark recorded in the minion task metadata ZNode
+    RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
+        _taskManager.getClusterInfoAccessor().getMinionRealtimeToOfflineSegmentsTaskMetadata(_realtimeTableName);
+    Assert.assertNotNull(minionTaskMetadata);
+    Assert.assertEquals(minionTaskMetadata.getWatermarkMs(), expectedWatermark);
+  }
+
+  @Test(enabled = false)
+  public void testSegmentListApi() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerDebugOutput() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerDebugRoutingTableSQL() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerResponseMetadata() {
+  }
+
+  @Test(enabled = false)
+  public void testDictionaryBasedQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueriesWithMultiValues() {
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueriesWithoutMultiValues() {
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedSqlQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testInstanceShutdown() {
+  }
+
+  @Test(enabled = false)
+  public void testQueriesFromQueryFile() {
+  }
+
+  @Test(enabled = false)
+  public void testQueryExceptions() {
+  }
+
+  @Test(enabled = false)
+  public void testReload(boolean includeOfflineTable) {
+  }
+
+  @Test(enabled = false)
+  public void testSqlQueriesFromQueryFile() {
+  }
+
+  @Test(enabled = false)
+  public void testVirtualColumnQueries() {
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    stopMinion();
+
+    super.tearDown();
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index a0be8ed..5232b7a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.helix.task.TaskState;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
@@ -94,7 +94,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     _taskManager = _controllerStarter.getTaskManager();
 
     // Register the test task generator into task manager
-    _taskManager.registerTaskGenerator(new TestTaskGenerator(_taskManager.getClusterInfoProvider()));
+    _taskManager.registerTaskGenerator(new TestTaskGenerator(_taskManager.getClusterInfoAccessor()));
 
     Map<String, PinotTaskExecutorFactory> taskExecutorFactoryRegistry =
         Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new TestTaskExecutorFactory());
@@ -199,10 +199,10 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
   private static class TestTaskGenerator implements PinotTaskGenerator {
     public static final String TASK_TYPE = "TestTask";
 
-    private final ClusterInfoProvider _clusterInfoProvider;
+    private final ClusterInfoAccessor _clusterInfoAccessor;
 
-    public TestTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
-      _clusterInfoProvider = clusterInfoProvider;
+    public TestTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+      _clusterInfoAccessor = clusterInfoAccessor;
     }
 
     @Override
@@ -215,7 +215,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
       assertEquals(tableConfigs.size(), 2);
 
       // Generate at most 2 tasks
-      if (_clusterInfoProvider.getTaskStates(TASK_TYPE).size() >= 2) {
+      if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= 2) {
         return Collections.emptyList();
       }
 
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index fd82dda..abb0788 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.minion.events.EventObserverFactoryRegistry;
 import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
 import org.apache.pinot.minion.metrics.MinionMeter;
@@ -81,7 +82,8 @@ public class MinionStarter implements ServiceStartable {
             + CommonConstants.Minion.DEFAULT_HELIX_PORT);
     setupHelixSystemProperties();
     _helixManager = new ZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, zkAddress);
-    _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry();
+    MinionTaskZkMetadataManager minionTaskZkMetadataManager = new MinionTaskZkMetadataManager(_helixManager);
+    _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager);
     _eventObserverFactoryRegistry = new EventObserverFactoryRegistry();
   }
 
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index a17b10e..ee7be9e 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -64,9 +64,23 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       File workingDir)
       throws Exception;
 
+  /**
+   * Pre processing operations to be done at the beginning of task execution
+   */
+  protected void preProcess(PinotTaskConfig pinotTaskConfig) {
+  }
+
+  /**
+   * Post processing operations to be done before exiting a successful task execution
+   */
+  protected void postProcess(PinotTaskConfig pinotTaskConfig) {
+  }
+
   @Override
   public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
       throws Exception {
+    preProcess(pinotTaskConfig);
+
     String taskType = pinotTaskConfig.getTaskType();
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
@@ -141,6 +155,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
 
       String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
           .collect(Collectors.joining(","));
+      postProcess(pinotTaskConfig);
       LOGGER
           .info("Done executing {} on table: {}, input segments: {}, output segments: {}", taskType, tableNameWithType,
               inputSegmentNames, outputSegmentNames);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
index 619acc7..6875032 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
@@ -43,8 +43,7 @@ public abstract class BaseTaskExecutor implements PinotTaskExecutor {
   }
 
   protected Schema getSchema(String tableName) {
-    Schema schema =
-        ZKMetadataProvider.getTableSchema(MINION_CONTEXT.getHelixPropertyStore(), tableName);
+    Schema schema = ZKMetadataProvider.getTableSchema(MINION_CONTEXT.getHelixPropertyStore(), tableName);
     Preconditions.checkState(schema != null, "Failed to find schema for table: %s", tableName);
     return schema;
   }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
new file mode 100644
index 0000000..29354b3
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+
+
+/**
+ * An abstraction on top of {@link HelixManager}, created for the {@link PinotTaskExecutor}, restricted to only get/update minion task metadata
+ */
+public class MinionTaskZkMetadataManager {
+  private final HelixManager _helixManager;
+
+  public MinionTaskZkMetadataManager(HelixManager helixManager) {
+    _helixManager = helixManager;
+  }
+
+  /**
+   * Fetch the ZNRecord under MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask for the given tableNameWithType
+   */
+  public ZNRecord getRealtimeToOfflineSegmentsTaskZNRecord(String tableNameWithType) {
+    return MinionTaskMetadataUtils
+        .fetchMinionTaskMetadataZNRecord(_helixManager.getHelixPropertyStore(), RealtimeToOfflineSegmentsTask.TASK_TYPE,
+            tableNameWithType);
+  }
+
+  /**
+   * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into the ZNode at MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask
+   * for the corresponding tableNameWithType
+   * @param expectedVersion the expected znode version; the update fails if the actual version does not match
+   */
+  public void setRealtimeToOfflineSegmentsTaskMetadata(
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata, int expectedVersion) {
+    MinionTaskMetadataUtils.persistRealtimeToOfflineSegmentsTaskMetadata(_helixManager.getHelixPropertyStore(),
+        RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeToOfflineSegmentsTaskMetadata, expectedVersion);
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
index 936f027..82cb2fe 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
@@ -46,25 +48,71 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
  * A task to convert segments from a REALTIME table to segments for its corresponding OFFLINE table.
- * The realtime segments could span across multiple time windows. This task extracts data and creates segments for a configured time range.
+ * The realtime segments could span across multiple time windows.
+ * This task extracts data and creates segments for a configured time range.
  * The {@link SegmentProcessorFramework} is used for the segment conversion, which also does
- * 1. time column rollup
- * 2. time window extraction using filter function
+ * 1. time window extraction using filter function
+ * 2. time column rollup
  * 3. partitioning using table config's segmentPartitioningConfig
  * 4. aggregations and rollup
  * 5. data sorting
+ *
+ * Before beginning the task, the <code>watermarkMs</code> is checked in the minion task metadata ZNode,
+ * located at MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/{@code tableNameWithType}.
+ * It should match the <code>windowStartMs</code>.
+ * The version of the ZNode is cached.
+ *
+ * After the segments are uploaded, this task updates the <code>watermarkMs</code> in the minion task metadata ZNode.
+ * The ZNode version is checked during the update,
+ * and the update only succeeds if the version matches the previously cached version.
  */
 public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
   private static final String INPUT_SEGMENTS_DIR = "input_segments";
   private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
+  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+  private int _expectedVersion = Integer.MIN_VALUE;
+  private long _nextWatermark;
+
+  public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
+    _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
+  }
+
+  /**
+   * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
+   * Checks that the <code>watermarkMs</code> from the ZNode matches the <code>windowStartMs</code> in the
+   * task configs; if it does, caches the ZNode version to check against during the update in postProcess.
+   */
+  @Override
+  public void preProcess(PinotTaskConfig pinotTaskConfig) {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+    ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+        _minionTaskZkMetadataManager.getRealtimeToOfflineSegmentsTaskZNRecord(realtimeTableName);
+    Preconditions.checkState(realtimeToOfflineSegmentsTaskZNRecord != null,
+        "RealtimeToOfflineSegmentsTaskMetadata ZNRecord for table: %s should not be null. Exiting task.",
+        realtimeTableName);
+
+    RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+        RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
+    long windowStartMs = Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
+    // NOTE: Guava Preconditions message templates substitute only %s placeholders, so %s (not %d) must
+    // be used for the long value, otherwise windowStartMs would never appear in the error message
+    Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs() == windowStartMs,
+        "watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s does not match windowStartMs: %s in task configs for table: %s. "
+            + "ZNode may have been modified by another task", realtimeToOfflineSegmentsTaskMetadata, windowStartMs,
+        realtimeTableName);
+
+    // Cache the version so postProcess can detect concurrent modifications of the ZNode
+    _expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+  }
+
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
       File workingDir)
@@ -74,19 +122,22 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
     LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
     long startMillis = System.currentTimeMillis();
 
-    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); // rawTableName_OFFLINE expected here
-    TableConfig tableConfig = getTableConfig(tableNameWithType);
-    Schema schema = getSchema(tableNameWithType);
+    String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    TableConfig tableConfig = getTableConfig(offlineTableName);
+    Schema schema = getSchema(offlineTableName);
     Set<String> schemaColumns = schema.getPhysicalColumnNames();
     String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
     DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
     Preconditions
         .checkState(dateTimeFieldSpec != null, "No valid spec found for time column: %s in schema for table: %s",
-            timeColumn, tableNameWithType);
+            timeColumn, offlineTableName);
+
+    long windowStartMs = Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
+    long windowEndMs = Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
+    _nextWatermark = windowEndMs;
 
-    long windowStartMs =
-        Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY));
-    long windowEndMs = Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY));
     String timeColumnTransformFunction =
         configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
     String collectorTypeStr = configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
@@ -120,7 +171,7 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
     if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
       Map<String, ColumnPartitionConfig> columnPartitionMap =
           tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
-      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+      PartitionerConfig partitionerConfig = getPartitionerConfig(columnPartitionMap, offlineTableName, schemaColumns);
       segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
     }
 
@@ -162,12 +213,25 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
     for (File file : outputSegmentsDir.listFiles()) {
       String outputSegmentName = file.getName();
       results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
-          .setTableNameWithType(tableNameWithType).build());
+          .setTableNameWithType(offlineTableName).build());
     }
     return results;
   }
 
   /**
+   * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
+   * Checks that the version of the ZNode matches the version cached earlier; if it does, updates the watermark in the ZNode.
+   * TODO: Making the minion task update the ZK metadata is an anti-pattern; however, no better alternative is apparent yet.
+   */
+  @Override
+  public void postProcess(PinotTaskConfig pinotTaskConfig) {
+    String realtimeTableName = pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY);
+    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
+        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, _nextWatermark);
+    _minionTaskZkMetadataManager.setRealtimeToOfflineSegmentsTaskMetadata(newMinionMetadata, _expectedVersion);
+  }
+
+  /**
    * Construct a {@link RecordTransformerConfig} for time column transformation
    */
   private RecordTransformerConfig getRecordTransformerConfigForTime(String timeColumnTransformFunction,
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
index b2db61f..7eabbc4 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
@@ -18,9 +18,19 @@
  */
 package org.apache.pinot.minion.executor;
 
+/**
+ * Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} instances.
+ */
 public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
+
+  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+
+  public RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
+    _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
+  }
+
   @Override
   public PinotTaskExecutor create() {
-    return new RealtimeToOfflineSegmentsTaskExecutor();
+    return new RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
   }
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index bd28f79..1b783dc 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -31,13 +31,13 @@ import org.apache.pinot.core.common.MinionConstants;
 public class TaskExecutorFactoryRegistry {
   private final Map<String, PinotTaskExecutorFactory> _taskExecutorFactoryRegistry = new HashMap<>();
 
-  public TaskExecutorFactoryRegistry() {
+  public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
     registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
         new ConvertToRawIndexTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        new RealtimeToOfflineSegmentsTaskExecutorFactory());
+        new RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager));
   }
 
   /**
diff --git a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
index 601c5e4..341f543 100644
--- a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -96,15 +96,16 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_PARTITIONING).setTimeColumnName(T)
             .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap)).build();
     TableConfig tableConfigWithSortedCol =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_SORTED_COL).setTimeColumnName(T).setSortedColumn(D1)
-            .build();
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_SORTED_COL).setTimeColumnName(T)
+            .setSortedColumn(D1).build();
     TableConfig tableConfigEpochHours =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX).setSortedColumn(D1)
-            .setIngestionConfig(new IngestionConfig(null, Lists.newArrayList(new TransformConfig(T_TRX, "toEpochHours(t)"))))
-            .build();
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX)
+            .setSortedColumn(D1).setIngestionConfig(
+            new IngestionConfig(null, Lists.newArrayList(new TransformConfig(T_TRX, "toEpochHours(t)")))).build();
     TableConfig tableConfigSDF =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX).setSortedColumn(D1)
-            .setIngestionConfig(new IngestionConfig(null, Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')"))))
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX)
+            .setSortedColumn(D1).setIngestionConfig(
+            new IngestionConfig(null, Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')"))))
             .build();
     Schema schema =
         new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING)
@@ -112,12 +113,12 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
             .addDateTime(T, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
     Schema schemaEpochHours =
         new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING)
-            .addMetric(M1, FieldSpec.DataType.INT).addDateTime(T_TRX, FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS")
-            .build();
+            .addMetric(M1, FieldSpec.DataType.INT)
+            .addDateTime(T_TRX, FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS").build();
     Schema schemaSDF =
         new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING)
-            .addMetric(M1, FieldSpec.DataType.INT).addDateTime(T_TRX, FieldSpec.DataType.INT, "1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMddHH", "1:HOURS")
-            .build();
+            .addMetric(M1, FieldSpec.DataType.INT)
+            .addDateTime(T_TRX, FieldSpec.DataType.INT, "1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMddHH", "1:HOURS").build();
 
     List<String> d1 = Lists.newArrayList("foo", "bar", "foo", "foo", "bar");
     List<List<GenericRow>> rows = new ArrayList<>(NUM_SEGMENTS);
@@ -213,12 +214,13 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -239,13 +241,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -266,14 +269,15 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY, "round(t, 86400000)");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -294,15 +298,16 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY, "round(t, 86400000)");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
     configs.put(M1 + MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "max");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -326,12 +331,13 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_PARTITIONING);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600468000000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600617600000");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600468000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600617600000");
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -357,13 +363,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_SORTED_COL);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
@@ -384,13 +391,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_EPOCH_HOURS);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirListEpochHours, WORKING_DIR);
@@ -412,13 +420,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor();
+        new RealtimeToOfflineSegmentsTaskExecutor(null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_SDF);
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY, "1600473600000");
-    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY, "1600560000000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
+    configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY, "1600560000000");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY, "rollup");
-    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
+    PinotTaskConfig pinotTaskConfig =
+        new PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, configs);
 
     List<SegmentConversionResult> conversionResults =
         realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirListSDF, WORKING_DIR);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org