Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/10/08 22:30:52 UTC

[GitHub] [incubator-pinot] npawar opened a new pull request #6124: RealtimeToOfflineSegments task generator

npawar opened a new pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124


   ## Description
   https://github.com/apache/incubator-pinot/issues/5753
   Here's the final piece of the feature for Pinot-managed offline flows.
   This is the TaskGenerator that creates tasks of type `realtimeToOfflineSegmentsTask`.
   Typical use case:
   You have set up a realtime table and want to make it a hybrid table. With this feature, you don't have to write your own offline flows. Just set
   ```
   "tableName": "myTable_REALTIME",
   "task": {
     "taskTypeConfigsMap": {
       "realtimeToOfflineSegmentsTask": {
       }
     }
   }
   ```
   in your table config. The minion tasks will push data to the offline table, 1 day at a time.
   The window size, buffer, and segment processing configs are configurable via the table task config.
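The window size and buffer are plain string periods in the table task config. Below is a minimal sketch of how a generator might resolve them. It assumes the `bucketTimePeriod`/`bufferTimePeriod` keys and the `1d`/`2d` defaults that appear in the generator code later in this thread. `parsePeriodMillis` is a hypothetical, simplified stand-in for Pinot's `TimeUtils.convertPeriodToMillis`: it only understands a single number followed by `d`, `h`, `m` or `s`, not compound periods.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

final class TaskWindowConfig {
  // Keys and defaults as used by RealtimeToOfflineSegmentsTaskGenerator in this PR
  static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
  static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
  static final String DEFAULT_BUCKET_PERIOD = "1d";
  static final String DEFAULT_BUFFER_PERIOD = "2d";

  final long bucketMillis;
  final long bufferMillis;

  TaskWindowConfig(Map<String, String> taskConfigs) {
    // Fall back to the defaults when a key is absent from the task config
    bucketMillis = parsePeriodMillis(taskConfigs.getOrDefault(BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD));
    bufferMillis = parsePeriodMillis(taskConfigs.getOrDefault(BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD));
  }

  // Hypothetical simplified parser: "<number><unit>" with unit one of d, h, m, s
  static long parsePeriodMillis(String period) {
    String trimmed = period.trim().toLowerCase(Locale.ROOT);
    if (trimmed.length() < 2) {
      throw new IllegalArgumentException("Invalid period: " + period);
    }
    long amount = Long.parseLong(trimmed.substring(0, trimmed.length() - 1));
    char unit = trimmed.charAt(trimmed.length() - 1);
    switch (unit) {
      case 'd':
        return TimeUnit.DAYS.toMillis(amount);
      case 'h':
        return TimeUnit.HOURS.toMillis(amount);
      case 'm':
        return TimeUnit.MINUTES.toMillis(amount);
      case 's':
        return TimeUnit.SECONDS.toMillis(amount);
      default:
        throw new IllegalArgumentException("Unsupported unit in period: " + period);
    }
  }
}
```

With an empty task config this resolves to a 1-day bucket and a 2-day buffer; setting only `"bucketTimePeriod": "6h"` would shrink the window while keeping the default buffer.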
   
   
   ## Release Notes
   New feature for Pinot-managed offline flows: a minion task that automatically takes data from the realtime table and pushes it to the offline table.
   
   ## Documentation
   WIP
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r508092308



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Helper methods to fetch/persist ZNRecord for minion task metadata
+ */
+public final class MinionTaskMetadataUtils {
+
+  private MinionTaskMetadataUtils() {
+
+  }
+
+  /**
+   * Fetches the ZNRecord for the given minion task and tableName, from MINION_TASK_METADATA/taskName/tableNameWithType
+   */
+  @Nullable
+  public static ZNRecord fetchMinionTaskMetadataZNRecord(HelixPropertyStore<ZNRecord> propertyStore, String taskType,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType, tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (znRecord != null) {
+      znRecord.setVersion(stat.getVersion());
+    }
+    return znRecord;
+  }
+
+  /**
+   * Fetches the ZNRecord for realtimeToOfflineSegmentsTask for given tableNameWithType from MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+   * and converts it to a {@link RealtimeToOfflineSegmentsTaskMetadata} object
+   */
+  @Nullable
+  public static RealtimeToOfflineSegmentsTaskMetadata getRealtimeToOfflineSegmentsTaskMetadata(
+      HelixPropertyStore<ZNRecord> propertyStore, String taskType, String tableNameWithType) {
+    ZNRecord znRecord = fetchMinionTaskMetadataZNRecord(propertyStore, taskType, tableNameWithType);
+    return znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
+  }
+
+  /**
+   * Persists the provided {@link RealtimeToOfflineSegmentsTaskMetadata} to MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType.
+   * Will fail if expectedVersion does not match.
+   * Set expectedVersion -1 to override version check.
+   */
+  public static void persistRealtimeToOfflineSegmentsTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore,
+      String taskType, RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata,

Review comment:
       Do we need to provide `taskType` here? It is always `realtimeToOfflineSegmentsTask`, right?
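The Javadoc above says the persist call fails when `expectedVersion` does not match and that `-1` overrides the check. That mirrors ZooKeeper's conditional `setData`, where a version of -1 matches any version. Below is a minimal in-memory sketch of those semantics. `VersionedStore` and its methods are hypothetical and are not the Helix `HelixPropertyStore` API, and the path used in the usage example is only illustrative.

```java
import java.util.HashMap;
import java.util.Map;

final class VersionedStore {
  // A value plus the version it was written at, analogous to a ZNode's data and Stat version
  private static final class Entry {
    final String value;
    final int version;

    Entry(String value, int version) {
      this.value = value;
      this.version = version;
    }
  }

  private final Map<String, Entry> entries = new HashMap<>();

  /** Returns the current version of the path, or -1 if it does not exist. */
  synchronized int getVersion(String path) {
    Entry e = entries.get(path);
    return e == null ? -1 : e.version;
  }

  synchronized String get(String path) {
    Entry e = entries.get(path);
    return e == null ? null : e.value;
  }

  /**
   * Writes the value only if the stored version equals expectedVersion; -1 skips the check.
   * Returns false on a version mismatch. Simplification: a missing path is created at version 0,
   * whereas ZooKeeper's setData would fail on a missing node.
   */
  synchronized boolean set(String path, String value, int expectedVersion) {
    Entry current = entries.get(path);
    int currentVersion = current == null ? -1 : current.version;
    if (expectedVersion != -1 && expectedVersion != currentVersion) {
      return false;
    }
    int nextVersion = current == null ? 0 : current.version + 1;
    entries.put(path, new Entry(value, nextVersion));
    return true;
  }
}
```

A writer reads the value together with its version, computes the new value, and writes it back with that version. If another writer got there first, the write is rejected and the caller can re-read and retry; passing -1 deliberately opts out of that protection.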

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
##########
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;

Review comment:
       You can remove all the JSON-related code, because this class does not need to be JSON-compatible. It should always be constructed with `fromZNRecord()`.
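The point here is that the metadata object only round-trips through a ZNRecord, so it needs no Jackson annotations. Below is a minimal sketch of that shape. It uses a plain `Map<String, String>` as a stand-in for `ZNRecord`'s simple fields, treats the record id as the table name, and uses a hypothetical `watermarkMillis` field key; the real class's key names may differ.

```java
import java.util.HashMap;
import java.util.Map;

final class WatermarkMetadata {
  // Hypothetical field key; the real class's key name may differ
  static final String WATERMARK_KEY = "watermarkMillis";

  private final String tableNameWithType;
  private final long watermarkMillis;

  WatermarkMetadata(String tableNameWithType, long watermarkMillis) {
    this.tableNameWithType = tableNameWithType;
    this.watermarkMillis = watermarkMillis;
  }

  String getTableNameWithType() {
    return tableNameWithType;
  }

  long getWatermarkMillis() {
    return watermarkMillis;
  }

  /** Builds the object from a record id and its simple fields (stand-in for fromZNRecord). */
  static WatermarkMetadata fromRecord(String recordId, Map<String, String> simpleFields) {
    String watermark = simpleFields.get(WATERMARK_KEY);
    if (watermark == null) {
      throw new IllegalStateException("Missing " + WATERMARK_KEY + " for " + recordId);
    }
    return new WatermarkMetadata(recordId, Long.parseLong(watermark));
  }

  /** Serializes back to simple fields (stand-in for toZNRecord). */
  Map<String, String> toSimpleFields() {
    Map<String, String> fields = new HashMap<>();
    fields.put(WATERMARK_KEY, Long.toString(watermarkMillis));
    return fields;
  }
}
```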

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterUpdater.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.common.MinionConstants;
+
+
+/**
+ * An abstraction on top of {@link PinotHelixResourceManager}, created for the {@link PinotTaskGenerator},
+ * with scope restricted to cluster updates.
+ * This also helps in separating read and update methods from the {@link ClusterInfoProvider}
+ */
+public class ClusterUpdater {

Review comment:
       After some reconsideration, I feel it might be easier to manage if we merge the provider and updater into a single `ClusterInfoAccessor`, in case we need to update the metadata based on its current value in the future.
   
   Sorry for suggesting that we separate them in the previous review.
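One way to read this suggestion: a single accessor exposes both reads and writes, so an update computed from the current value (for example, advancing the watermark) can live in one place. Below is a minimal sketch. The interface name follows the suggested `ClusterInfoAccessor`, but its methods and the in-memory implementation are hypothetical illustrations, not the PR's actual API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongUnaryOperator;

interface ClusterInfoAccessor {
  /** Returns the watermark for the table, or null if none has been persisted yet. */
  Long getWatermarkMillis(String tableNameWithType);

  void setWatermarkMillis(String tableNameWithType, long watermarkMillis);

  /** Reads the current value (or initialMillis if absent) and writes the updated one in a single call. */
  long updateWatermarkMillis(String tableNameWithType, long initialMillis, LongUnaryOperator update);
}

final class InMemoryClusterInfoAccessor implements ClusterInfoAccessor {
  private final Map<String, Long> watermarks = new ConcurrentHashMap<>();

  @Override
  public Long getWatermarkMillis(String tableNameWithType) {
    return watermarks.get(tableNameWithType);
  }

  @Override
  public void setWatermarkMillis(String tableNameWithType, long watermarkMillis) {
    watermarks.put(tableNameWithType, watermarkMillis);
  }

  @Override
  public long updateWatermarkMillis(String tableNameWithType, long initialMillis, LongUnaryOperator update) {
    // ConcurrentHashMap.compute applies the update atomically for this key
    return watermarks.compute(tableNameWithType,
        (table, current) -> update.applyAsLong(current == null ? initialMillis : current));
  }
}
```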

##########
File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
##########
@@ -43,9 +44,20 @@ protected TableConfig getTableConfig(String tableNameWithType) {
   }
 
   protected Schema getSchema(String tableName) {
-    Schema schema =
-        ZKMetadataProvider.getTableSchema(MINION_CONTEXT.getHelixPropertyStore(), tableName);
+    Schema schema = ZKMetadataProvider.getTableSchema(MINION_CONTEXT.getHelixPropertyStore(), tableName);
     Preconditions.checkState(schema != null, "Failed to find schema for table: %s", tableName);
     return schema;
   }
+
+  /**
+   * Pre-processing operations to be done at the beginning of task execution
+   */
+  protected void preProcess(PinotTaskConfig pinotTaskConfig) {

Review comment:
       Move these 2 methods to `BaseMultipleSegmentsConversionExecutor`, where they are called. Usually we let child classes override methods that are used in the parent class.
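This follows the template-method pattern: the class that calls the hooks also declares them, and subclasses override only the hooks they need. Below is a minimal sketch. Apart from the `preProcess` hook from the diff, everything here is hypothetical: the class names, the `postProcess` name for the second hook, and the use of a `String` in place of `PinotTaskConfig`.

```java
import java.util.ArrayList;
import java.util.List;

abstract class BaseConversionExecutor {
  // Records the order of steps so the flow is observable
  final List<String> events = new ArrayList<>();

  /** Template method: the flow is fixed here; subclasses customize it only through the hooks. */
  final void executeTask(String taskConfig) {
    preProcess(taskConfig);
    events.add("convert:" + taskConfig);
    postProcess(taskConfig);
  }

  /** Hook invoked before conversion; no-op by default. */
  protected void preProcess(String taskConfig) {
  }

  /** Hook invoked after conversion; no-op by default. */
  protected void postProcess(String taskConfig) {
  }
}

final class RealtimeToOfflineExecutorSketch extends BaseConversionExecutor {
  @Override
  protected void preProcess(String taskConfig) {
    events.add("pre:" + taskConfig);
  }

  @Override
  protected void postProcess(String taskConfig) {
    events.add("post:" + taskConfig);
  }
}
```

Declaring the hooks next to `executeTask` makes it obvious when they run. Declaring them higher up in `BaseTaskExecutor` would suggest they apply to every executor, even those that never call them.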






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r506777945



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
##########
@@ -62,16 +62,35 @@ private MinionConstants() {
     public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
   }
 
+  /**
+   * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
+   */
   public static class RealtimeToOfflineSegmentsTask {
     public static final String TASK_TYPE = "realtimeToOfflineSegmentsTask";
-    // window
+
+    /**
+     * The time window size for the task.
+     * e.g. if set to "1d", then task is scheduled to run for a 1 day window
+     */
+    public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+    /**
+     * The time period to wait before picking segments for this task
+     * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
+     */
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+    /**
+     * Config to manually provide start time for the very first task scheduled.
+     * In the absence of this config, the very first window start is calculated as min(start time of all completed segments)
+     */
+    public static final String START_TIME_MILLIS_KEY = "startTimeMillis";

Review comment:
       Probably not needed. Removed it.






[GitHub] [incubator-pinot] codecov-io commented on pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#issuecomment-705909906


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=h1) Report
   > Merging [#6124](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) will **decrease** coverage by `3.33%`.
   > The diff coverage is `38.00%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6124      +/-   ##
   ==========================================
   - Coverage   66.44%   63.11%   -3.34%     
   ==========================================
     Files        1075     1233     +158     
     Lines       54773    60211    +5438     
     Branches     8168     8828     +660     
   ==========================================
   + Hits        36396    38003    +1607     
   - Misses      15700    19393    +3693     
   - Partials     2677     2815     +138     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #unittests | `63.11% <38.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `53.33% <0.00%> (-3.81%)` | :arrow_down: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [.../org/apache/pinot/client/ResultTableResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFRhYmxlUmVzdWx0U2V0LmphdmE=) | `0.00% <0.00%> (-34.29%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `98.27% <ø> (-1.73%)` | :arrow_down: |
   | ... and [1057 more](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=footer). Last update [0e1d458...bcf0fc4](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r506777684



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping scheduling new task.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      List<LLCRealtimeSegmentZKMetadata> realtimeSegmentsMetadataList =
+          _clusterInfoProvider.getLLCRealtimeSegmentsMetadata(realtimeTableName);
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadataList = new ArrayList<>();
+      for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList) {
+        if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+          completedSegmentsMetadataList.add(metadata);
+        }
+      }
+      if (completedSegmentsMetadataList.isEmpty()) {
+        LOGGER
+            .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for Table: %s", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+      long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr);
+      long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr);
+
+      // Fetch RealtimeToOfflineSegmentsTaskMetadata ZNode for reading watermark
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          _clusterInfoProvider.getMinionRealtimeToOfflineSegmentsTaskMetadata(realtimeTableName);
+
+      if (realtimeToOfflineSegmentsTaskMetadata == null) {
+        // No ZNode exists. Cold-start.
+        long watermarkMillis;
+
+        String startTimeStr = taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY);
+        if (startTimeStr != null) {
+          // Use startTime config if provided in taskConfigs
+          watermarkMillis = Long.parseLong(startTimeStr);
+        } else {
+          // Find the smallest time from all segments
+          RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+          for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadataList) {
+            if (minSegmentZkMetadata == null || realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata
+                .getStartTime()) {
+              minSegmentZkMetadata = realtimeSegmentZKMetadata;
+            }
+          }
+          Preconditions.checkState(minSegmentZkMetadata != null);
+
+          // Convert the segment minTime to millis
+          long minSegmentStartTimeMillis =
+              minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+
+          // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries
+          // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window [20200813, 20200814)
+          watermarkMillis = (minSegmentStartTimeMillis / bucketMillis) * bucketMillis;
+        }
+
+        // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above
+        realtimeToOfflineSegmentsTaskMetadata =
+            new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMillis);
+        _clusterInfoProvider.setRealtimeToOfflineSegmentsTaskMetadata(realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      // WindowStart = watermark. WindowEnd = windowStart + bucket.
+      long windowStartMillis = realtimeToOfflineSegmentsTaskMetadata.getWatermarkMillis();
+      long windowEndMillis = windowStartMillis + bucketMillis;
+
+      // Check that execution window is older than bufferTime
+      if (windowEndMillis > System.currentTimeMillis() - bufferMillis) {
+        LOGGER.info(
+            "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping scheduling task: {}",
+            windowStartMillis, windowEndMillis, bufferMillis, bufferTimeStr, taskType);
+        continue;
+      }
+
+      // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd (exclusive)
+      List<String> segmentNames = new ArrayList<>();

Review comment:
       I didn't follow. These lists are used to set the comma-separated segment names and download URLs in the task configs.
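The steps in the generator's Javadoc can be condensed into a small, self-contained sketch:

1. On cold start, take the earliest COMPLETED segment start time and round it down to the bucket.
2. Form the window `[watermark, watermark + bucket)` and skip it while it is not yet older than the buffer.
3. Collect the overlapping segments into the comma-separated strings that go into the task config.

`SegmentInfo`, `planTask` and the config keys (`segmentNames`, `downloadURLs`, `windowStartMillis`, `windowEndMillis`) are hypothetical. The sketch also assumes that segment times are already in milliseconds and that "overlap" means `segmentStart < windowEnd && segmentEnd >= windowStart`. Returning `null` when no segment overlaps is an assumption too, since the diff is cut off before that point.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RealtimeToOfflinePlanner {
  /** Hypothetical minimal view of a COMPLETED segment's ZK metadata, times in millis. */
  static final class SegmentInfo {
    final String name;
    final String downloadUrl;
    final long startMillis;
    final long endMillis;

    SegmentInfo(String name, String downloadUrl, long startMillis, long endMillis) {
      this.name = name;
      this.downloadUrl = downloadUrl;
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  /** Cold start: earliest segment start time, rounded down to a bucket boundary. */
  static long coldStartWatermark(List<SegmentInfo> completed, long bucketMillis) {
    if (completed.isEmpty()) {
      throw new IllegalArgumentException("No completed segments");
    }
    long minStart = Long.MAX_VALUE;
    for (SegmentInfo s : completed) {
      minStart = Math.min(minStart, s.startMillis);
    }
    return (minStart / bucketMillis) * bucketMillis;
  }

  /** Task configs for the window starting at watermarkMillis, or null if it is too recent or empty. */
  static Map<String, String> planTask(List<SegmentInfo> completed, long watermarkMillis, long bucketMillis,
      long bufferMillis, long nowMillis) {
    long windowStart = watermarkMillis;
    long windowEnd = windowStart + bucketMillis;
    if (windowEnd > nowMillis - bufferMillis) {
      return null; // Window is not older than the buffer: skip scheduling
    }
    List<String> names = new ArrayList<>();
    List<String> urls = new ArrayList<>();
    for (SegmentInfo s : completed) {
      // Keep segments with data in [windowStart, windowEnd)
      if (s.startMillis < windowEnd && s.endMillis >= windowStart) {
        names.add(s.name);
        urls.add(s.downloadUrl);
      }
    }
    if (names.isEmpty()) {
      return null;
    }
    Map<String, String> configs = new LinkedHashMap<>();
    configs.put("segmentNames", String.join(",", names));
    configs.put("downloadURLs", String.join(",", urls));
    configs.put("windowStartMillis", Long.toString(windowStart));
    configs.put("windowEndMillis", Long.toString(windowEnd));
    return configs;
  }
}
```

For example, with a 1-day bucket, a segment starting at 2020-08-13T12:34:59Z yields a cold-start watermark of 2020-08-13T00:00:00Z. With a 2-day buffer, that window is only scheduled once the current time reaches 2020-08-16T00:00:00Z.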






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r506778177



##########
File path: pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
##########
@@ -275,7 +275,7 @@ public void segmentMapperTest(String mapperId, SegmentMapperConfig segmentMapper
     SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema,
         new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
         new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({timeValue != 1597795200000}, timeValue)").build(), Lists.newArrayList(
+            .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue >= 1597881600000}, timeValue)").build(), Lists.newArrayList(

Review comment:
       This is due to moving filtering before the transform.
   I moved filtering before the transform because I realized while testing that, if the transform happens first, the user has to be aware of the resulting values and write the filter accordingly.
   I feel filter-then-transform makes more sense.
   If someone wants the filter to apply to a transformed column, they can include the transformation in the filter function.
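The ordering argument can be illustrated with a small sketch. Because the filter sees raw column values, the one-day window from the updated test (`1597795200000` inclusive to `1597881600000` exclusive) can be written directly in milliseconds, even though the transform later converts `timeValue` to days. The sketch assumes, as the Groovy expression in the test implies, that a record is dropped when the filter returns `true`. `MapperSketch` and its `Map<String, Long>` records are hypothetical stand-ins for Pinot's record filter and transformer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

final class MapperSketch {
  /** Applies the filter first (true = drop the record), then transforms only the surviving records. */
  static List<Map<String, Long>> map(List<Map<String, Long>> records,
      Predicate<Map<String, Long>> shouldDrop, UnaryOperator<Map<String, Long>> transform) {
    List<Map<String, Long>> out = new ArrayList<>();
    for (Map<String, Long> record : records) {
      if (shouldDrop.test(record)) {
        continue;
      }
      out.add(transform.apply(record));
    }
    return out;
  }
}
```

If the transform ran first, the same filter would have to be written against day values (`timeValue < 18493 || timeValue >= 18494`). That is the coupling between filter and transform that this change avoids.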






[GitHub] [incubator-pinot] snleee commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r506672510



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for minion task of type "realtimeToOfflineSegmentsTask"
+ * With every task run, a new segment is created in the offline table for 1 day. Watermark also keeps progressing accordingly.
+ */
+public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+
+  private PinotHelixTaskResourceManager _helixTaskResourceManager;
+  private PinotTaskManager _taskManager;
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+
+  private long _dataSmallestTimeMillis;
+  private long _dateSmallestDays;
+  private String _realtimeTableName;
+  private String _offlineTableName;
+
+  @Override
+  protected TableTaskConfig getTaskConfig() {
+    return new TableTaskConfig(
+        Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>()));
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    // Setup realtime table, and blank offline table
+    super.setUp();
+    addTableConfig(createOfflineTableConfig());
+    startMinion(null, null);
+
+    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+    _taskManager = _controllerStarter.getTaskManager();
+    _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+
+    _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    _offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+    List<RealtimeSegmentZKMetadata> realtimeSegmentMetadata =
+        _pinotHelixResourceManager.getRealtimeSegmentMetadata(_realtimeTableName);
+    long minSegmentTime = Long.MAX_VALUE;
+    for (RealtimeSegmentZKMetadata metadata : realtimeSegmentMetadata) {
+      if (metadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
+        if (metadata.getStartTime() < minSegmentTime) {
+          minSegmentTime = metadata.getStartTime();
+        }
+      }
+    }
+    _dataSmallestTimeMillis = minSegmentTime;
+    _dateSmallestDays = minSegmentTime / 86400000;
+  }
+
+  @Test
+  public void testRealtimeToOfflineSegmentsTask() {
+
+    List<OfflineSegmentZKMetadata> offlineSegmentMetadata =
+        _pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+    Assert.assertTrue(offlineSegmentMetadata.isEmpty());
+
+    long expectedWatermark = _dataSmallestTimeMillis;
+    int numOfflineSegments = 0;
+    long offlineSegmentTime = _dateSmallestDays;
+    for (int i = 0; i < 3; i ++) {
+      // Schedule task
+      Assert
+          .assertTrue(_taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
+          PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
+      // Should not generate more tasks
+      Assert
+          .assertFalse(_taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+
+      expectedWatermark = expectedWatermark + 86400000;
+      // Wait at most 600 seconds for all tasks COMPLETED
+      waitForTaskToComplete(expectedWatermark);
+      // check segment is in offline
+      offlineSegmentMetadata = _pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+      Assert.assertEquals(offlineSegmentMetadata.size(), ++numOfflineSegments);
+      Assert.assertEquals(offlineSegmentMetadata.get(i).getStartTime(), offlineSegmentTime);
+      Assert.assertEquals(offlineSegmentMetadata.get(i).getEndTime(), offlineSegmentTime);
+      offlineSegmentTime ++;
+    }
+    testHardcodedSqlQueries();
+  }
+
+  private void waitForTaskToComplete(long expectedWatermark) {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : _helixTaskResourceManager
+          .getTaskStates(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE).values()) {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+
+    // Check segment ZK metadata
+    RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
+        _taskManager.getClusterInfoProvider().getMinionRealtimeToOfflineSegmentsTaskMetadata(_realtimeTableName);
+    Assert.assertNotNull(minionTaskMetadata);
+    Assert.assertEquals(minionTaskMetadata.getWatermarkMillis(), expectedWatermark);
+  }
+
+  @Test(enabled = false)
+  public void testSegmentListApi() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerDebugOutput() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerDebugRoutingTableSQL() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerResponseMetadata() {
+  }
+
+  @Test(enabled = false)
+  public void testDictionaryBasedQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueriesWithMultiValues() {
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueriesWithoutMultiValues() {
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedSqlQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testInstanceShutdown() {
+  }
+
+  @Test(enabled = false)
+  public void testQueriesFromQueryFile() {
+  }
+
+  @Test(enabled = false)
+  public void testQueryExceptions() {
+  }
+
+  @Test(enabled = false)
+  public void testReload(boolean includeOfflineTable) {

Review comment:
       While I was trying to disable tests by following this class, I found that `testReload` still gets executed. You need to do the following:
   ```suggestion
     public void testReload() {
   ```
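The suggested fix relies on Java's override rules: a method overrides an inherited one only when the name and parameter types match. Assuming the parent class declares a no-argument `testReload()` test (as the suggestion implies), this sketch uses hypothetical `Base`/`OverloadingChild`/`OverridingChild` classes and plain reflection, without TestNG, to show which class the no-arg `testReload` resolves to in each case.

```java
import java.lang.reflect.Method;

public class OverrideVsOverload {
  // Hypothetical stand-in for the parent integration test that declares the no-arg test.
  static class Base {
    public void testReload() {
      // The inherited test body would run here.
    }
  }

  // testReload(boolean) is an OVERLOAD: the inherited no-arg testReload() is untouched.
  static class OverloadingChild extends Base {
    public void testReload(boolean includeOfflineTable) {
    }
  }

  // testReload() with the same signature is an OVERRIDE and replaces the inherited method.
  static class OverridingChild extends Base {
    @Override
    public void testReload() {
    }
  }

  // Returns the simple name of the class declaring the no-arg testReload() visible on `type`.
  static String declaringClassOfNoArgTestReload(Class<?> type) {
    try {
      Method method = type.getMethod("testReload");
      return method.getDeclaringClass().getSimpleName();
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(declaringClassOfNoArgTestReload(OverloadingChild.class)); // Base
    System.out.println(declaringClassOfNoArgTestReload(OverridingChild.class));  // OverridingChild
  }
}
```

Since `@Test(enabled = false)` applies to the method it annotates, under the assumption above it only disables the inherited test when it sits on a true override such as `testReload()`.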






[GitHub] [incubator-pinot] snleee commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r505190192



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type <code>realtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMillis</code> denotes the time (exclusive) up to which tasks have been executed.
+ *
+ * This gets serialized and stored in ZooKeeper under the path MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMillis</code> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMillis, watermarkMillis + bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code> to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RealtimeToOfflineSegmentsTaskMetadata {
+
+  private static final String WATERMARK_KEY = "watermarkMillis";

Review comment:
       `watermarkInMillis`? 
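The javadoc in this file describes a small protocol around `watermarkMillis`: the generator derives the window `[watermarkMillis, watermarkMillis + bucketSize)`, and the executor checks that the stored watermark still equals the window start before advancing it to the window end. A minimal sketch of that check-and-advance step, assuming a hypothetical in-memory map in place of the ZNode under MINION_TASK_METADATA and hypothetical `setWatermark`/`getWatermark`/`advance` methods (this is not the `RealtimeToOfflineSegmentsTaskMetadata` API):

```java
import java.util.HashMap;
import java.util.Map;

public class WatermarkProtocol {
  static final String WATERMARK_KEY = "watermarkMillis";

  // Hypothetical in-memory stand-in for the per-table ZNode's simple fields.
  private final Map<String, Map<String, String>> _store = new HashMap<>();

  // Overwrites any previous metadata for the table.
  void setWatermark(String tableNameWithType, long watermarkMillis) {
    Map<String, String> fields = new HashMap<>();
    fields.put(WATERMARK_KEY, Long.toString(watermarkMillis));
    _store.put(tableNameWithType, fields);
  }

  // Returns null when no metadata exists yet (cold start).
  Long getWatermark(String tableNameWithType) {
    Map<String, String> fields = _store.get(tableNameWithType);
    return fields == null ? null : Long.valueOf(fields.get(WATERMARK_KEY));
  }

  // Executor side: the task is the latest one only if its window starts at the stored
  // watermark; after running, the watermark moves to the (exclusive) window end.
  void advance(String tableNameWithType, long windowStartMillis, long windowEndMillis) {
    Long current = getWatermark(tableNameWithType);
    if (current == null || current != windowStartMillis) {
      throw new IllegalStateException(
          "Stale task: watermark is " + current + " but window starts at " + windowStartMillis);
    }
    setWatermark(tableNameWithType, windowEndMillis);
  }

  public static void main(String[] args) {
    long bucketMillis = 86_400_000L; // default bucket period of 1d
    WatermarkProtocol metadata = new WatermarkProtocol();
    metadata.setWatermark("myTable_REALTIME", 1597795200000L);

    // Generator side: window = [watermarkMillis, watermarkMillis + bucketMillis)
    long windowStart = metadata.getWatermark("myTable_REALTIME");
    long windowEnd = windowStart + bucketMillis;
    metadata.advance("myTable_REALTIME", windowStart, windowEnd);
    System.out.println(metadata.getWatermark("myTable_REALTIME")); // 1597881600000

    try {
      // Re-running a task for the already-processed window is rejected.
      metadata.advance("myTable_REALTIME", windowStart, windowEnd);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```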

##########
File path: pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
##########
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metadata;
+
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests for converting to and from ZNRecord to {@link RealtimeToOfflineSegmentsTaskMetadata}
+ */
+public class RealtimeToOfflineSegmentsTaskMetadataTest {
+
+  @Test
+  public void testToFromZNRecord() {
+

Review comment:
       remove line

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      // Get all completed segment metadata.
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();

Review comment:
       In my opinion, `completedSegments` is a little bit confusing because of `nonCompletedTasks`. At first glance, I thought `completedSegments` referred to the list of segments whose realtimeToOffline task is complete. However, it refers to the list of segments in `COMPLETED` state (i.e., committed segments). I don't think we need to change the variable names, but can you add a bit more information on this to the comment? 
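The "Steps" javadoc of the generator can be condensed into a small planning function. This is a minimal sketch, not the generator's code: `SegmentInfo` is a hypothetical simplification of the LLC segment ZK metadata whose `committed` flag makes explicit that "completed" means a committed (DONE) segment, and it assumes segment end times are inclusive. It follows the javadoc: cold start uses the smallest start time of committed segments, the window is `[watermark, watermark + bucket)`, no task is generated while the window end is within the buffer period, and committed segments overlapping the window are selected.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowPlanner {
  // Hypothetical view of segment metadata. `committed` is true for segments in DONE state;
  // it says nothing about whether a realtimeToOfflineSegmentsTask has processed the segment.
  static final class SegmentInfo {
    final String name;
    final boolean committed;
    final long startMillis;
    final long endMillis;

    SegmentInfo(String name, boolean committed, long startMillis, long endMillis) {
      this.name = name;
      this.committed = committed;
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  // Returns committed segments overlapping [windowStart, windowEnd), or an empty list when no
  // task should be generated. A null watermark means cold start (no metadata ZNode yet).
  static List<String> plan(List<SegmentInfo> segments, Long watermarkMillis, long bucketMillis,
      long bufferMillis, long nowMillis) {
    List<SegmentInfo> committedSegments = new ArrayList<>();
    for (SegmentInfo segment : segments) {
      if (segment.committed) {
        committedSegments.add(segment);
      }
    }
    List<String> selected = new ArrayList<>();
    if (committedSegments.isEmpty()) {
      return selected;
    }

    long windowStart;
    if (watermarkMillis != null) {
      windowStart = watermarkMillis;
    } else {
      // Cold start: smallest start time among committed segments.
      windowStart = Long.MAX_VALUE;
      for (SegmentInfo segment : committedSegments) {
        windowStart = Math.min(windowStart, segment.startMillis);
      }
    }
    long windowEnd = windowStart + bucketMillis;

    // Skip while the window is not yet older than the buffer period.
    if (windowEnd > nowMillis - bufferMillis) {
      return selected;
    }

    for (SegmentInfo segment : committedSegments) {
      if (segment.startMillis < windowEnd && segment.endMillis >= windowStart) {
        selected.add(segment.name);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    long hour = 3_600_000L;
    long day = 24 * hour;
    long t0 = 1597795200000L;
    List<SegmentInfo> segments = List.of(
        new SegmentInfo("seg0", true, t0, t0 + 20 * hour),
        new SegmentInfo("seg1", true, t0 + 20 * hour, t0 + day + 5 * hour),
        new SegmentInfo("seg2", false, t0 + day + 5 * hour, t0 + day + 6 * hour));

    System.out.println(plan(segments, null, day, 2 * day, t0 + 5 * day));     // [seg0, seg1]
    System.out.println(plan(segments, t0 + day, day, 2 * day, t0 + 5 * day)); // [seg1]
    System.out.println(plan(segments, null, day, 2 * day, t0 + 2 * day));     // []
  }
}
```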
   
   

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =

Review comment:
       What is the recovery mechanism for tasks that are stuck? If the job is not completed after 1 day and the task is not complete, shouldn't we re-schedule the job?
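The generator skips a table whenever any non-completed task exists, so a single stuck task would block every later window for that table. One way to express the reviewer's suggestion is to flag non-completed tasks older than a timeout (1 day here) as candidates for re-scheduling. This is a hypothetical sketch: the task names, start timestamps, and the local `State` enum are illustrative stand-ins, not the Helix task API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StuckTaskCheck {
  // Hypothetical subset of task states; Helix has its own TaskState enum.
  enum State { IN_PROGRESS, COMPLETED, FAILED }

  // Returns, sorted by name, the tasks that are not COMPLETED and started more than
  // timeoutMillis before nowMillis: candidates for re-scheduling.
  static List<String> findStuckTasks(Map<String, State> states, Map<String, Long> startMillis,
      long nowMillis, long timeoutMillis) {
    List<String> stuck = new ArrayList<>();
    for (Map.Entry<String, State> entry : states.entrySet()) {
      Long start = startMillis.get(entry.getKey());
      if (entry.getValue() != State.COMPLETED && start != null && nowMillis - start > timeoutMillis) {
        stuck.add(entry.getKey());
      }
    }
    Collections.sort(stuck);
    return stuck;
  }

  public static void main(String[] args) {
    long day = 86_400_000L;
    long now = 1597881600000L;
    Map<String, State> states = new HashMap<>();
    Map<String, Long> starts = new HashMap<>();
    states.put("task_a", State.IN_PROGRESS);
    starts.put("task_a", now - 2 * day);
    states.put("task_b", State.IN_PROGRESS);
    starts.put("task_b", now - 3_600_000L);
    states.put("task_c", State.COMPLETED);
    starts.put("task_c", now - 3 * day);
    System.out.println(findStuckTasks(states, starts, now, day)); // [task_a]
  }
}
```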






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r506777846



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
##########
@@ -93,6 +97,38 @@ public Schema getTableSchema(String tableName) {
         .getRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName);
   }
 
+  /**
+   * Get all segment metadata for the given lowlevel REALTIME table name.
+   *
+   * @param tableName Table name with or without REALTIME type suffix
+   * @return List of segment metadata
+   */
+  public List<LLCRealtimeSegmentZKMetadata> getLLCRealtimeSegmentsMetadata(String tableName) {
+    return ZKMetadataProvider
+        .getLLCRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName);
+  }
+
+  /**
+   * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from MINION_TASK_METADATA for given realtime table
+   * @param tableNameWithType realtime table name
+   */
+  public RealtimeToOfflineSegmentsTaskMetadata getMinionRealtimeToOfflineSegmentsTaskMetadata(
+      String tableNameWithType) {
+    return MinionTaskMetadataUtils
+        .getRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+            MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, tableNameWithType);
+  }
+
+  /**
+   * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into MINION_TASK_METADATA
+   * This call will override any previous metadata node
+   */
+  public void setRealtimeToOfflineSegmentsTaskMetadata(

Review comment:
       Created a parallel class, ClusterUpdater, and moved the updater method into that. Lmk if that approach is fine
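A minimal sketch of the read/write split described in the reply, assuming a hypothetical in-memory `MetadataStore` in place of the ZK property store and simplified `InfoProvider`/`Updater` classes (these are not the signatures of Pinot's `ClusterInfoProvider` or `ClusterUpdater`). Reads and writes go through different types, so a component can only modify task metadata if it is explicitly handed an updater.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReaderWriterSplit {
  // Hypothetical stand-in for the ZK property store shared by both classes.
  static final class MetadataStore {
    final Map<String, Long> watermarks = new ConcurrentHashMap<>();
  }

  // Read-only access, in the role of ClusterInfoProvider.
  static final class InfoProvider {
    private final MetadataStore _store;

    InfoProvider(MetadataStore store) {
      _store = store;
    }

    Long getWatermarkMillis(String tableNameWithType) {
      return _store.watermarks.get(tableNameWithType);
    }
  }

  // Write access, in the role of ClusterUpdater; overwrites any previous value.
  static final class Updater {
    private final MetadataStore _store;

    Updater(MetadataStore store) {
      _store = store;
    }

    void setWatermarkMillis(String tableNameWithType, long watermarkMillis) {
      _store.watermarks.put(tableNameWithType, watermarkMillis);
    }
  }

  public static void main(String[] args) {
    MetadataStore store = new MetadataStore();
    InfoProvider reader = new InfoProvider(store);
    Updater writer = new Updater(store);
    System.out.println(reader.getWatermarkMillis("myTable_REALTIME")); // null
    writer.setWatermarkMillis("myTable_REALTIME", 1597795200000L);
    System.out.println(reader.getWatermarkMillis("myTable_REALTIME")); // 1597795200000
  }
}
```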






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r506777780



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      // Get all completed segment metadata.
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();

Review comment:
       Done
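The scheduling rules in the generator Javadoc above can be sketched in plain Java: the window is [watermark, watermark + bucket), a task is generated only once that window is older than the buffer, and segments are selected by overlap. This is a minimal sketch under assumptions, not the PR's implementation. `WindowSketch` and its methods are hypothetical, and bucket, buffer and watermark are passed as milliseconds instead of being parsed from "1d"/"2d" or read from the MINION_TASK_METADATA ZNode.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the scheduling rules described in the generator's Javadoc.
final class WindowSketch {
  static final long DEFAULT_BUCKET_MILLIS = TimeUnit.DAYS.toMillis(1); // "1d"
  static final long DEFAULT_BUFFER_MILLIS = TimeUnit.DAYS.toMillis(2); // "2d"

  private WindowSketch() {
  }

  /** Returns {windowStartMillis, windowEndMillis}: start = watermark, end = start + bucket (exclusive). */
  static long[] window(long watermarkMillis, long bucketMillis) {
    return new long[]{watermarkMillis, watermarkMillis + bucketMillis};
  }

  /** A task is generated only when the whole window is at least bufferMillis old. */
  static boolean isOldEnough(long windowEndMillis, long nowMillis, long bufferMillis) {
    return windowEndMillis <= nowMillis - bufferMillis;
  }

  /** Same overlap test as the generator: segment [start, end] vs window [windowStart, windowEnd). */
  static boolean overlaps(long segmentStartMillis, long segmentEndMillis, long windowStartMillis,
      long windowEndMillis) {
    return windowStartMillis <= segmentEndMillis && segmentStartMillis < windowEndMillis;
  }
}
```

With a watermark of 1597795200000 and the default 1-day bucket, the window ends at 1597881600000; with the default 2-day buffer, this sketch allows a task only once the current time reaches 1598054400000.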





[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r503594418



##########
File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutor.java
##########
@@ -26,6 +26,12 @@
  */
 public interface PinotTaskExecutor {
 
+  /**
+   * Pre processing operations to be done at the beginning of task execution
+   */
+  default void preProcess(PinotTaskConfig pinotTaskConfig) {

Review comment:
       Why not make this a boolean return, so that if the pre-process fails we do not go further in the task execution? I'm not sure what Helix does when we throw exceptions from the task executor; it probably retries the task, but that may not be good for us.

##########
File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
##########
@@ -135,6 +137,8 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig)
           convertedSegmentTarFile);
 
       LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName);
+      postProcess(pinotTaskConfig);

Review comment:
       Please move the log line below `postProcess`, since it says "Done".
   It may also be useful to have a log line after each phase.
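Both suggestions in this message can be sketched together: a `preProcess` that returns `false` to stop the task before the main work, and a log line after each phase, with the final "Done" emitted only after `postProcess`. This is a minimal sketch under assumptions, not Pinot's API. `SketchTaskExecutor` and `SketchTaskRunner` are hypothetical, a `Map<String, String>` stands in for `PinotTaskConfig`, and log lines are collected in a list instead of going to SLF4J.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for PinotTaskExecutor (not Pinot's real interface): preProcess returns a
// boolean, so a failed pre-process stops the task early instead of throwing.
interface SketchTaskExecutor<T> {
  default boolean preProcess(Map<String, String> taskConfig) {
    return true;
  }

  T executeTask(Map<String, String> taskConfig);

  default void postProcess(Map<String, String> taskConfig) {
  }
}

final class SketchTaskRunner {
  // Collected log lines; a real executor would use its SLF4J logger instead.
  final List<String> log = new ArrayList<>();

  <T> T run(SketchTaskExecutor<T> executor, Map<String, String> taskConfig) {
    if (!executor.preProcess(taskConfig)) {
      log.add("Pre-process failed, not executing task");
      return null;
    }
    log.add("Done pre-processing");
    T result = executor.executeTask(taskConfig);
    log.add("Done executing task");
    executor.postProcess(taskConfig);
    // The final "Done" line is emitted only after postProcess has run.
    log.add("Done post-processing");
    return result;
  }
}
```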





[GitHub] [incubator-pinot] npawar merged pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar merged pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124


   



[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r505756502



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type <code>realtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMillis</code> denotes the time (exclusive) up to which tasks have been executed.
+ *
+ * This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMillis</code> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMillis, watermarkMillis + bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)

Review comment:
       This class does not need to be JSON-compatible, as we always read/write it through `ZNRecord`.
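To illustrate why Jackson annotations are unnecessary when the metadata only ever goes through `ZNRecord`, here is a minimal sketch of explicit record conversion. `SimpleRecord` is a hypothetical stand-in that models only an id and the simple-field map of Helix's `ZNRecord`; the class name `WatermarkMetadataSketch` and the field key `"watermarkMillis"` are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.helix.ZNRecord: an id plus a map of simple string fields.
final class SimpleRecord {
  final String id;
  final Map<String, String> simpleFields = new HashMap<>();

  SimpleRecord(String id) {
    this.id = id;
  }
}

// Metadata class with explicit record conversion and no Jackson annotations.
final class WatermarkMetadataSketch {
  private static final String WATERMARK_KEY = "watermarkMillis"; // assumed field name
  final String tableNameWithType;
  final long watermarkMillis;

  WatermarkMetadataSketch(String tableNameWithType, long watermarkMillis) {
    this.tableNameWithType = tableNameWithType;
    this.watermarkMillis = watermarkMillis;
  }

  SimpleRecord toRecord() {
    SimpleRecord record = new SimpleRecord(tableNameWithType);
    record.simpleFields.put(WATERMARK_KEY, Long.toString(watermarkMillis));
    return record;
  }

  static WatermarkMetadataSketch fromRecord(SimpleRecord record) {
    return new WatermarkMetadataSketch(record.id, Long.parseLong(record.simpleFields.get(WATERMARK_KEY)));
  }
}
```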

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
##########
@@ -93,6 +97,38 @@ public Schema getTableSchema(String tableName) {
         .getRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName);
   }
 
+  /**
+   * Get all segment metadata for the given lowlevel REALTIME table name.
+   *
+   * @param tableName Table name with or without REALTIME type suffix
+   * @return List of segment metadata
+   */
+  public List<LLCRealtimeSegmentZKMetadata> getLLCRealtimeSegmentsMetadata(String tableName) {
+    return ZKMetadataProvider
+        .getLLCRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(), tableName);
+  }
+
+  /**
+   * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from MINION_TASK_METADATA for given realtime table
+   * @param tableNameWithType realtime table name
+   */
+  public RealtimeToOfflineSegmentsTaskMetadata getMinionRealtimeToOfflineSegmentsTaskMetadata(
+      String tableNameWithType) {
+    return MinionTaskMetadataUtils
+        .getRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+            MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, tableNameWithType);
+  }
+
+  /**
+   * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into MINION_TASK_METADATA
+   * This call will override any previous metadata node
+   */
+  public void setRealtimeToOfflineSegmentsTaskMetadata(

Review comment:
       Let's not put write methods in this class. It should only contain methods for reading metadata.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }

Review comment:
       Also skip HLC table here?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -101,14 +101,14 @@ public void map()
     while (segmentRecordReader.hasNext()) {
       reusableRow = segmentRecordReader.next(reusableRow);
 
-      // Record transformation
-      reusableRow = _recordTransformer.transformRecord(reusableRow);
-
       // Record filtering
       if (_recordFilter.filter(reusableRow)) {
         continue;
       }
 
+      // Record transformation

Review comment:
       Should we filter before the transformation? Can the filter be applied to the transformed columns?
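The ordering question matters because a filter that runs first cannot see columns produced by the transformation. Below is a minimal sketch with plain maps as rows; the class name `MapperOrderSketch` is hypothetical, and, as in the diff, the filter returns `true` to drop a row.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Sketch of the two possible orders in the mapper loop. Rows are plain maps, and the filter
// returns true to DROP a row, matching `if (_recordFilter.filter(row)) continue;` in the diff.
final class MapperOrderSketch {
  private MapperOrderSketch() {
  }

  /** Order in this PR: the filter only sees the raw columns. */
  static Map<String, Object> filterThenTransform(Map<String, Object> row,
      Predicate<Map<String, Object>> filter, Function<Map<String, Object>, Map<String, Object>> transform) {
    if (filter.test(row)) {
      return null; // dropped; columns derived by the transform were never visible to the filter
    }
    return transform.apply(row);
  }

  /** Previous order: the filter can reference columns produced by the transform. */
  static Map<String, Object> transformThenFilter(Map<String, Object> row,
      Predicate<Map<String, Object>> filter, Function<Map<String, Object>, Map<String, Object>> transform) {
    Map<String, Object> transformed = transform.apply(row);
    return filter.test(transformed) ? null : transformed;
  }
}
```

For example, if a transform derives a hypothetical `timeDay` column from `timeValue` and the filter keeps only `timeDay == 18493`, a row with `timeValue = 1597795200000` is dropped by `filterThenTransform` (the filter sees no `timeDay`) but kept by `transformThenFilter`.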

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
##########
@@ -275,7 +275,7 @@ public void segmentMapperTest(String mapperId, SegmentMapperConfig segmentMapper
     SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema,
         new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
         new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
-            .setFilterFunction("Groovy({timeValue != 1597795200000}, timeValue)").build(), Lists.newArrayList(
+            .setFilterFunction("Groovy({timeValue < 1597795200000L || timeValue >= 1597881600000}, timeValue)").build(), Lists.newArrayList(

Review comment:
       Is this related to this PR?
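For context on the test change: the new Groovy expression returns `true` (drop the record) for any `timeValue` outside [1597795200000, 1597881600000), i.e. it keeps exactly one day (86,400,000 ms) of records. An equivalent Java predicate, as an illustration only (`DayWindowFilterSketch` is a hypothetical name):

```java
// Java equivalent of the test's Groovy filter. Returns true (drop the record) when timeValue is
// outside [START_MILLIS, END_MILLIS), so exactly one day of records is kept.
final class DayWindowFilterSketch {
  static final long START_MILLIS = 1597795200000L;
  static final long END_MILLIS = 1597881600000L; // START_MILLIS + 86,400,000 ms

  private DayWindowFilterSketch() {
  }

  static boolean filter(long timeValue) {
    return timeValue < START_MILLIS || timeValue >= END_MILLIS;
  }
}
```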

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
##########
@@ -62,16 +62,35 @@ private MinionConstants() {
     public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
   }
 
+  /**
+   * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
+   */
   public static class RealtimeToOfflineSegmentsTask {
     public static final String TASK_TYPE = "realtimeToOfflineSegmentsTask";
-    // window
+
+    /**
+     * The time window size for the task.
+     * e.g. if set to "1d", then task is scheduled to run for a 1 day window
+     */
+    public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+    /**
+     * The time period to wait before picking segments for this task
+     * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days
+     */
+    public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+    /**
+     * Config to manually provide start time for the very first task scheduled.
+     * In the absence of this config, the very first window start is calculated as min(start time of all completed segments)
+     */
+    public static final String START_TIME_MILLIS_KEY = "startTimeMillis";

Review comment:
       Do we need this config? It can cause inconsistent query results when the first offline segments are pushed, because older real-time records that were not merged will be skipped.
   Is there any use case where we want to configure this value?
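A minimal sketch of how the first window start could be derived makes the concern concrete: with `startTimeMillis` set, completed-segment data older than that value is never picked up by the task. `ColdStartWatermarkSketch` is hypothetical. The generator passes `bucketMillis` to `getWatermarkMillis`, but that method's body is not shown here, so rounding the start down to a bucket boundary is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical sketch of the cold-start watermark.
final class ColdStartWatermarkSketch {
  private ColdStartWatermarkSketch() {
  }

  /**
   * Uses the configured startTimeMillis if present, otherwise the minimum start time of the
   * COMPLETED segments. Rounding down to a bucket boundary is an assumption of this sketch.
   */
  static long initialWatermark(List<Long> completedSegmentStartMillis, String startTimeMillisConfig,
      long bucketMillis) {
    long startMillis;
    if (startTimeMillisConfig != null) {
      startMillis = Long.parseLong(startTimeMillisConfig);
    } else {
      startMillis = completedSegmentStartMillis.stream().mapToLong(Long::longValue).min()
          .orElseThrow(() -> new IllegalStateException("No completed segments"));
    }
    return (startMillis / bucketMillis) * bucketMillis;
  }
}
```

Under this sketch, with completed segments starting at 1597798800000 and `startTimeMillis` set to "1597881600000", the first window starts a day later than it otherwise would, so that first day of completed data is never moved to the OFFLINE table.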

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      // Get all completed segment metadata.
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();
+      Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>();
+      Set<Integer> allPartitions = new HashSet<>();
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName,
+          allPartitions);
+      if (completedSegmentsMetadata.isEmpty()) {
+        LOGGER
+            .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      if (!allPartitions.isEmpty()) {
+        LOGGER
+            .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
+                allPartitions, realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: %s", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+      String startTimeStr = taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY);
+      long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr);
+      long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr);
+
+      // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode. WindowStart = watermark. WindowEnd = windowStart + bucket.
+      long windowStartMillis =
+          getWatermarkMillis(realtimeTableName, completedSegmentsMetadata, startTimeStr, bucketMillis);
+      long windowEndMillis = windowStartMillis + bucketMillis;
+
+      // Check that execution window is older than bufferTime
+      if (windowEndMillis > System.currentTimeMillis() - bufferMillis) {
+        LOGGER.info(
+            "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task generation: {}",
+            windowStartMillis, windowEndMillis, bufferMillis, bufferTimeStr, taskType);
+        continue;
+      }
+
+      // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd (exclusive)
+      List<String> segmentNames = new ArrayList<>();
+      List<String> downloadURLs = new ArrayList<>();
+      Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values());
+      boolean skipGenerate = false;
+      for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
+        String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+        TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit();
+        long segmentStartTimeMillis = timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime());
+        long segmentEndTimeMillis = timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime());
+
+        // Check overlap with window
+        if (windowStartMillis <= segmentEndTimeMillis && segmentStartTimeMillis < windowEndMillis) {
+          // If last completed segment is being used, make sure that segment crosses over end of window.

Review comment:
       +1
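The check named in the code comment above the "+1" (if a partition's last completed segment is used, it must cross the end of the window) can be read as follows: if that segment ends before `windowEndMillis`, the rest of the window may still be in a consuming segment, so generation is skipped. The loop body after that comment is not shown in this diff, so this is one possible reading, not the PR's code; `SegmentInfo` and `SegmentSelectionSketch` are hypothetical, and returning `null` stands in for setting `skipGenerate`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical summary of a COMPLETED segment; real code reads LLCRealtimeSegmentZKMetadata.
final class SegmentInfo {
  final String name;
  final long startMillis;
  final long endMillis;

  SegmentInfo(String name, long startMillis, long endMillis) {
    this.name = name;
    this.startMillis = startMillis;
    this.endMillis = endMillis;
  }
}

final class SegmentSelectionSketch {
  private SegmentSelectionSketch() {
  }

  /**
   * Returns names of segments overlapping [windowStartMillis, windowEndMillis), or null to mean
   * "skip task generation" when a partition's last completed segment overlaps the window but ends
   * before windowEndMillis (later data for the window may still be in a consuming segment).
   */
  static List<String> select(List<SegmentInfo> completed, Set<String> lastCompletedPerPartition,
      long windowStartMillis, long windowEndMillis) {
    List<String> selected = new ArrayList<>();
    for (SegmentInfo segment : completed) {
      boolean overlaps = windowStartMillis <= segment.endMillis && segment.startMillis < windowEndMillis;
      if (!overlaps) {
        continue;
      }
      if (lastCompletedPerPartition.contains(segment.name) && segment.endMillis < windowEndMillis) {
        return null;
      }
      selected.add(segment.name);
    }
    return selected;
  }
}
```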

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,238 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping scheduling new task.",
+            nonCompletedTasks.keySet(), realtimeTableName);

Review comment:
       IMO it is fine to log one warning per table. It is quite a critical issue if the minion gets stuck on this job (data loss if it doesn't move the segments in time), so we should detect it as soon as possible.
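A minimal sketch of the per-table behavior discussed here: each generation run records at most one warning per table that still has unfinished tasks, and only tables without such tasks get a new one. `SketchTaskState` is a simplified stand-in for Helix's `TaskState`, treating anything other than `COMPLETED` as non-completed is an assumption based on the name `getNonCompletedTasks`, and warnings go to a list rather than SLF4J.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for org.apache.helix.task.TaskState (only a few states modeled).
enum SketchTaskState {
  NOT_STARTED, IN_PROGRESS, COMPLETED, FAILED
}

final class OneTaskPerTableSketch {
  private OneTaskPerTableSketch() {
  }

  /**
   * Returns the tables that may get a new task. Any table with a task not in COMPLETED state is
   * skipped, and exactly one warning is recorded for it in this run.
   */
  static List<String> eligibleTables(Map<String, Map<String, SketchTaskState>> tasksByTable,
      List<String> warnings) {
    List<String> eligible = new ArrayList<>();
    for (Map.Entry<String, Map<String, SketchTaskState>> table : tasksByTable.entrySet()) {
      List<String> nonCompleted = new ArrayList<>();
      for (Map.Entry<String, SketchTaskState> task : table.getValue().entrySet()) {
        if (task.getValue() != SketchTaskState.COMPLETED) {
          nonCompleted.add(task.getKey());
        }
      }
      if (nonCompleted.isEmpty()) {
        eligible.add(table.getKey());
      } else {
        warnings.add("Found non-completed tasks: " + nonCompleted + " for table: " + table.getKey());
      }
    }
    return eligible;
  }
}
```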

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      // Get all completed segment metadata.
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();
+      Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>();
+      Set<Integer> allPartitions = new HashSet<>();
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName,
+          allPartitions);
+      if (completedSegmentsMetadata.isEmpty()) {
+        LOGGER
+            .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      if (!allPartitions.isEmpty()) {
+        LOGGER
+            .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
+                allPartitions, realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: %s", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);

Review comment:
       ```suggestion
         String bucketTimePeriod =
             taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
         String bufferTimePeriod =
             taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
   ```
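A minimal sketch of reading the two periods with their defaults and converting them to milliseconds, following the `...Period` / `...Ms` naming from the review suggestions. The config key names `bucketTimePeriod` and `bufferTimePeriod` and the `TaskPeriodConfig` class are assumptions for illustration. The single-unit parser (`1d`, `6h`, `30m`, `45s`) is a simplified, hypothetical stand-in for Pinot's `TimeUtils.convertPeriodToMillis`, which accepts a richer period syntax.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: key names and the parser are simplified assumptions, not Pinot's API.
final class TaskPeriodConfig {
  static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod"; // assumed key name
  static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod"; // assumed key name
  static final String DEFAULT_BUCKET_PERIOD = "1d";
  static final String DEFAULT_BUFFER_PERIOD = "2d";

  private TaskPeriodConfig() {
  }

  /** Converts a single-unit period such as "1d", "6h", "30m" or "45s" to milliseconds. */
  static long periodToMs(String period) {
    if (period == null || period.length() < 2) {
      throw new IllegalArgumentException("Invalid period: " + period);
    }
    long amount = Long.parseLong(period.substring(0, period.length() - 1));
    switch (Character.toLowerCase(period.charAt(period.length() - 1))) {
      case 'd':
        return TimeUnit.DAYS.toMillis(amount);
      case 'h':
        return TimeUnit.HOURS.toMillis(amount);
      case 'm':
        return TimeUnit.MINUTES.toMillis(amount);
      case 's':
        return TimeUnit.SECONDS.toMillis(amount);
      default:
        throw new IllegalArgumentException("Unsupported unit in period: " + period);
    }
  }

  /** Bucket (window) size in ms, falling back to the 1d default when the key is absent. */
  static long bucketTimeMs(Map<String, String> taskConfigs) {
    String bucketTimePeriod = taskConfigs.getOrDefault(BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
    return periodToMs(bucketTimePeriod);
  }

  /** Buffer size in ms, falling back to the 2d default when the key is absent. */
  static long bufferTimeMs(Map<String, String> taskConfigs) {
    String bufferTimePeriod = taskConfigs.getOrDefault(BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
    return periodToMs(bufferTimePeriod);
  }
}
```

Keeping the raw string (`bucketTimePeriod`) and the converted value (`bucketTimeMs`) under distinct names makes it obvious at each use site which unit is in hand.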

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);

Review comment:
       (nit) No need to extract raw table name
   ```suggestion
         String realtimeTableName = tableConfig.getTableName();
   
         if (tableConfig.getTableType() != TableType.REALTIME) {
           LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, realtimeTableName);
           continue;
         }
         LOGGER.info("Start generating task configs for table: {} for task: {}", realtimeTableName, taskType);
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType

Review comment:
       Reformat this part of the javadoc (keep each line under 120 characters).
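The window and buffer steps in the generator javadoc can be sketched as follows, assuming the watermark has already been read from the metadata ZNode. `WindowPlanner` and `SegmentTimeRange` are hypothetical names. Segment start/end times are treated as inclusive and the execution window as `[windowStartMs, windowEndMs)`, matching the javadoc.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch of the generator's window computation; not Pinot's implementation.
final class WindowPlanner {
  /** Stand-in for the start/end time (both inclusive) stored in a COMPLETED segment's metadata. */
  static final class SegmentTimeRange {
    final String name;
    final long startMs;
    final long endMs;

    SegmentTimeRange(String name, long startMs, long endMs) {
      this.name = name;
      this.startMs = startMs;
      this.endMs = endMs;
    }
  }

  private WindowPlanner() {
  }

  /**
   * windowStartMs = watermarkMs and windowEndMs = windowStartMs + bucketTimeMs.
   * Returns empty when the window is not yet older than the buffer, i.e. no task should be generated.
   */
  static OptionalLong windowEndMs(long watermarkMs, long bucketTimeMs, long bufferTimeMs, long nowMs) {
    long windowEndMs = watermarkMs + bucketTimeMs;
    if (windowEndMs > nowMs - bufferTimeMs) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(windowEndMs);
  }

  /** Selects segments that contain at least some data in [windowStartMs, windowEndMs). */
  static List<String> segmentsInWindow(List<SegmentTimeRange> segments, long windowStartMs, long windowEndMs) {
    List<String> selected = new ArrayList<>();
    for (SegmentTimeRange segment : segments) {
      if (segment.startMs < windowEndMs && segment.endMs >= windowStartMs) {
        selected.add(segment.name);
      }
    }
    return selected;
  }
}
```

With the default 1d bucket and 2d buffer, a window starting at the watermark becomes eligible once its end is at least two days in the past.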

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type <code>realtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMillis</code> denotes the time (exclusive) up to which tasks have been executed.
+ *
+ * This gets serialized and stored in ZooKeeper under the path MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMillis</code> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMillis, watermarkMillis + bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code> to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RealtimeToOfflineSegmentsTaskMetadata {
+
+  private static final String WATERMARK_KEY = "watermarkMillis";
+
+  private final String _tableNameWithType;
+  private long _watermarkMillis;

Review comment:
       Should we change this to final and make the metadata immutable?
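A minimal sketch of what an immutable variant could look like: both fields are `final`, and moving the watermark produces a new instance instead of mutating the existing one. The class name, the `watermarkMs` key, and the use of a plain `Map` in place of the Helix `ZNRecord` simple fields are assumptions made to keep the example self-contained.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical immutable watermark metadata; a Map stands in for ZNRecord simple fields. */
final class ImmutableWatermarkMetadata {
  private static final String WATERMARK_KEY = "watermarkMs";

  private final String _tableNameWithType;
  private final long _watermarkMs;

  ImmutableWatermarkMetadata(String tableNameWithType, long watermarkMs) {
    _tableNameWithType = Objects.requireNonNull(tableNameWithType);
    _watermarkMs = watermarkMs;
  }

  String getTableNameWithType() {
    return _tableNameWithType;
  }

  long getWatermarkMs() {
    return _watermarkMs;
  }

  /** Returns a copy with the watermark moved forward; this instance is left unchanged. */
  ImmutableWatermarkMetadata withWatermarkMs(long newWatermarkMs) {
    if (newWatermarkMs < _watermarkMs) {
      throw new IllegalArgumentException("Watermark cannot move backwards: " + newWatermarkMs);
    }
    return new ImmutableWatermarkMetadata(_tableNameWithType, newWatermarkMs);
  }

  Map<String, String> toSimpleFields() {
    Map<String, String> fields = new TreeMap<>();
    fields.put(WATERMARK_KEY, Long.toString(_watermarkMs));
    return fields;
  }

  static ImmutableWatermarkMetadata fromSimpleFields(String tableNameWithType, Map<String, String> fields) {
    return new ImmutableWatermarkMetadata(tableNameWithType, Long.parseLong(fields.get(WATERMARK_KEY)));
  }
}
```

The executor would then build a new object holding the window end and write it, rather than calling a setter on the metadata it read.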

##########
File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutor.java
##########
@@ -26,6 +26,12 @@
  */
 public interface PinotTaskExecutor {
 
+  /**
+   * Pre-processing operations to be done at the beginning of task execution
+   */
+  default void preProcess(PinotTaskConfig pinotTaskConfig) {

Review comment:
       I don't think we should put `preProcess()` and `postProcess()` into the interface. You can still keep them in the `RealtimeToOfflineSegmentsTaskExecutor` for better organization of the code.
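One way to follow this suggestion, sketched with hypothetical names: the executor interface only exposes `executeTask()`, and the realtime-to-offline executor calls its own private `preProcess()` / `postProcess()` helpers around the conversion. The returned step log exists only to make the call order observable in this sketch; it is not part of any real Pinot class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical minimal executor contract with no pre/post hooks in the interface. */
interface SimpleTaskExecutor {
  List<String> executeTask(Map<String, String> taskConfigs);
}

/** Hypothetical executor that keeps its hooks private purely for code organization. */
final class RealtimeToOfflineExecutorSketch implements SimpleTaskExecutor {
  @Override
  public List<String> executeTask(Map<String, String> taskConfigs) {
    List<String> steps = new ArrayList<>();
    preProcess(taskConfigs, steps);  // e.g. verify the watermark and cache the ZNode version
    convert(taskConfigs, steps);     // segment conversion and upload
    postProcess(taskConfigs, steps); // e.g. advance the watermark using the cached version
    return steps;
  }

  private void preProcess(Map<String, String> taskConfigs, List<String> steps) {
    steps.add("preProcess");
  }

  private void convert(Map<String, String> taskConfigs, List<String> steps) {
    steps.add("convert");
  }

  private void postProcess(Map<String, String> taskConfigs, List<String> steps) {
    steps.add("postProcess");
  }
}
```

If `preProcess()` throws, neither `convert()` nor `postProcess()` runs, and no other executor type has to know these hooks exist.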

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
##########
@@ -68,4 +69,26 @@
     }
     return runningSegments;
   }
+
+  /**
+   * Gets all the tasks for the provided task type and tableName, which do not have TaskState COMPLETED
+   * @return map containing task name to task state for non-completed tasks
+   */
+  public static Map<String, TaskState> getNonCompletedTasks(String taskType, String tableNameWithType,
+      ClusterInfoProvider clusterInfoProvider) {
+
+    Map<String, TaskState> nonCompletedTasks = new HashMap<>();
+    Map<String, TaskState> taskStates = clusterInfoProvider.getTaskStates(taskType);
+    for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
+      if (entry.getValue() == TaskState.COMPLETED) {
+        continue;
+      }

Review comment:
       Also skip the old tasks?
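A sketch of how `getNonCompletedTasks()` could also skip old tasks. It assumes, hypothetically, that each task name ends with `_<creationTimeMs>` and that a caller-supplied `maxAgeMs` defines "old"; names without a parsable suffix are conservatively kept. A local `TaskState` enum stands in for a subset of `org.apache.helix.task.TaskState`, and the per-table filtering of the original method is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical filter: the task-name format and the maxAgeMs threshold are assumptions.
final class NonCompletedTaskFilter {
  /** Local stand-in for a subset of Helix's TaskState. */
  enum TaskState { NOT_STARTED, IN_PROGRESS, STOPPED, FAILED, TIMED_OUT, COMPLETED }

  private NonCompletedTaskFilter() {
  }

  /** Returns tasks that are not COMPLETED and are not older than maxAgeMs. */
  static Map<String, TaskState> getRecentNonCompletedTasks(Map<String, TaskState> taskStates, long nowMs,
      long maxAgeMs) {
    Map<String, TaskState> result = new HashMap<>();
    for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
      if (entry.getValue() == TaskState.COMPLETED) {
        continue;
      }
      long creationTimeMs = parseCreationTimeMs(entry.getKey());
      if (creationTimeMs >= 0 && nowMs - creationTimeMs > maxAgeMs) {
        continue; // older than maxAgeMs: ignored by this filter
      }
      result.put(entry.getKey(), entry.getValue());
    }
    return result;
  }

  /** Parses the trailing "_<creationTimeMs>" of a task name, or returns -1 if it is absent. */
  static long parseCreationTimeMs(String taskName) {
    int idx = taskName.lastIndexOf('_');
    if (idx < 0 || idx == taskName.length() - 1) {
      return -1L;
    }
    try {
      return Long.parseLong(taskName.substring(idx + 1));
    } catch (NumberFormatException e) {
      return -1L;
    }
  }
}
```

Choosing `maxAgeMs` is a trade-off: if it is too small, a slow but still running task could stop blocking generation and a second task could be scheduled for the same window.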

##########
File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
##########
@@ -162,11 +212,34 @@
     for (File file : outputSegmentsDir.listFiles()) {
       String outputSegmentName = file.getName();
       results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
-          .setTableNameWithType(tableNameWithType).build());
+          .setTableNameWithType(offlineTableName).build());
     }
     return results;
   }
 
+  /**
+   * Fetches the realtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
+   * Checks that the version of the ZNode matches the version cached earlier. If it does, proceeds to update the watermark in the ZNode.
+   * TODO: Having the minion task update the ZK metadata is an anti-pattern; however, we cannot find another way to do it
+   */
+  @Override
+  public void postProcess(PinotTaskConfig pinotTaskConfig) {
+    String realtimeTableName = pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY);
+
+    ZNRecord realtimeToOfflineSegmentsTaskRecord =

Review comment:
       No need to check the version again; `setRealtimeToOfflineSegmentsTaskMetadata()` can perform the version check.
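A version-conditioned write already rejects a stale update, so a separate read-and-compare step before it is redundant (and racy, since the node can change between the read and the write). A minimal in-memory sketch of such a conditional write; `VersionedMetadataStore` is hypothetical and only mimics how a ZooKeeper node version increments on every successful write.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for versioned ZooKeeper nodes. */
final class VersionedMetadataStore {
  private static final class Node {
    final String data;
    final int version;

    Node(String data, int version) {
      this.data = data;
      this.version = version;
    }
  }

  private final Map<String, Node> _nodes = new HashMap<>();

  /** Creates a node at version 0. */
  synchronized void create(String path, String data) {
    if (_nodes.putIfAbsent(path, new Node(data, 0)) != null) {
      throw new IllegalStateException("Node already exists: " + path);
    }
  }

  synchronized String getData(String path) {
    return _nodes.get(path).data;
  }

  synchronized int getVersion(String path) {
    return _nodes.get(path).version;
  }

  /** Writes only if expectedVersion equals the current version; the check and the write are atomic. */
  synchronized boolean set(String path, String data, int expectedVersion) {
    Node current = _nodes.get(path);
    if (current == null || current.version != expectedVersion) {
      return false;
    }
    _nodes.put(path, new Node(data, current.version + 1));
    return true;
  }
}
```

The second write that reuses the same cached version fails, which is exactly the protection the separate version check was meant to provide.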

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping task generation.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      // Get all completed segment metadata.
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new ArrayList<>();
+      Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>();
+      Set<Integer> allPartitions = new HashSet<>();
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata, partitionToLatestCompletedSegmentName,
+          allPartitions);
+      if (completedSegmentsMetadata.isEmpty()) {
+        LOGGER
+            .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      if (!allPartitions.isEmpty()) {
+        LOGGER
+            .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
+                allPartitions, realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: %s", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+      String startTimeStr = taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY);
+      long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr);
+      long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr);

Review comment:
       Same for other `Millis`
   ```suggestion
         long bucketTimeMs = TimeUtils.convertPeriodToMillis(bucketTimeStr);
         long bufferTimeMs = TimeUtils.convertPeriodToMillis(bufferTimeStr);
   ```

##########
File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutor.java
##########
@@ -26,6 +26,12 @@
  */
 public interface PinotTaskExecutor {
 
+  /**
+   * Pre-processing operations to be done at the beginning of task execution
+   */
+  default void preProcess(PinotTaskConfig pinotTaskConfig) {

Review comment:
       > Why not make this a boolean return, so that if the pre-process fails we do not go further in the task execution? Not sure what helix does when we throw exceptions from the task executor -- probably retries the task, but that may not be good for us.
   
   I prefer throwing an exception when something goes wrong, because it forces the caller to handle it; a caller could simply ignore a boolean return value, which could cause unexpected behavior. The minion code can handle the exception and fail the task.
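A sketch of the exception-based style argued for here, applied to the pre-processing check described in the executor javadoc (the watermark in the metadata ZNode should match the task's window start). `WatermarkPreCheck` is a hypothetical name; the point is that a mismatch cannot be silently ignored the way a `false` return value could.

```java
/** Hypothetical pre-processing check that fails loudly instead of returning a boolean. */
final class WatermarkPreCheck {
  private WatermarkPreCheck() {
  }

  /**
   * Throws if the watermark stored in the metadata ZNode does not match this task's window start,
   * which indicates that the task is stale and must not run.
   */
  static void verifyWatermark(String tableNameWithType, long watermarkMs, long windowStartMs) {
    if (watermarkMs != windowStartMs) {
      throw new IllegalStateException(String.format(
          "Watermark: %d does not match windowStartMs: %d for table: %s", watermarkMs, windowStartMs,
          tableNameWithType));
    }
  }
}
```

The caller does not need any branching: a matching watermark returns normally, and a mismatch propagates out of the task so the minion can mark it failed.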

##########
File path: pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
##########
@@ -54,17 +57,60 @@
  * A task to convert segments from a REALTIME table to segments for its corresponding OFFLINE table.
  * The realtime segments could span across multiple time windows. This task extracts data and creates segments for a configured time range.
  * The {@link SegmentProcessorFramework} is used for the segment conversion, which also does
- * 1. time column rollup
- * 2. time window extraction using filter function
+ * 1. time window extraction using filter function
+ * 2. time column rollup
  * 3. partitioning using table config's segmentPartitioningConfig
  * 4. aggregations and rollup
  * 5. data sorting
+ *
+ * Before beginning the task, the <code>watermarkMillis</code> is checked in the minion task metadata ZNode, located at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ * It should match the <code>windowStartMillis</code>.
+ * The version of the ZNode is cached.
+ *
+ * After the segments are uploaded, this task updates the <code>watermarkMillis</code> in the minion task metadata ZNode.
+ * The ZNode version is checked during the update, and the update only succeeds if the version matches the previously cached version.
  */
 public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
   private static final String INPUT_SEGMENTS_DIR = "input_segments";
   private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
+  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+  private int _expectedVersion = -1;

Review comment:
       Put `Integer.MIN_VALUE` as the default? We don't want to overwrite the ZNRecord if this is not correctly set.
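The concern behind this suggestion is that ZooKeeper treats an expected version of `-1` as "match any version", so a default of `-1` that is never overwritten turns the conditional update into an unconditional one. A minimal sketch with a hypothetical `ExpectedVersionHolder` that uses `Integer.MIN_VALUE` as the unset marker and refuses to hand out a version for the write until one has been cached.

```java
/** Hypothetical holder for the ZNode version cached before conversion and used for the final write. */
final class ExpectedVersionHolder {
  // ZooKeeper interprets version -1 as "any version", so -1 must not double as the "not set" marker.
  // ZNode versions start at 0 and only increase, so Integer.MIN_VALUE cannot collide with a real version.
  static final int UNSET = Integer.MIN_VALUE;

  private int _expectedVersion = UNSET;

  void cacheVersion(int version) {
    if (version < 0) {
      throw new IllegalArgumentException("Invalid ZNode version: " + version);
    }
    _expectedVersion = version;
  }

  /** Returns the cached version for a conditional write, or throws if it was never cached. */
  int versionForWrite() {
    if (_expectedVersion == UNSET) {
      throw new IllegalStateException("Expected ZNode version was never cached; refusing to write");
    }
    return _expectedVersion;
  }
}
```

Rejecting negative inputs in `cacheVersion()` also prevents `-1` from being cached by accident.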

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
##########
@@ -0,0 +1,106 @@
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type <code>realtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMillis</code> denotes the time (exclusive) up to which tasks have been executed.
+ *
+ * This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMillis</code> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMillis, watermarkMillis + bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the <code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RealtimeToOfflineSegmentsTaskMetadata {
+
+  private static final String WATERMARK_KEY = "watermarkMillis";

Review comment:
       I would suggest `watermarkMs` to be consistent with other classes
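The metadata class essentially stores one long value under a single key, and the next execution window starts exactly at the (exclusive) watermark. A minimal sketch, assuming a plain `Map<String, String>` in place of a Helix `ZNRecord`'s simple fields and using the `watermarkMs` key name suggested in the review; `WatermarkMetadata` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map of simple fields stands in for a Helix ZNRecord.
final class WatermarkMetadata {
  // Key name follows the review suggestion ("watermarkMs" rather than "watermarkMillis").
  static final String WATERMARK_KEY = "watermarkMs";

  private final String tableNameWithType;
  private final long watermarkMs;

  WatermarkMetadata(String tableNameWithType, long watermarkMs) {
    this.tableNameWithType = tableNameWithType;
    this.watermarkMs = watermarkMs;
  }

  String getTableNameWithType() {
    return tableNameWithType;
  }

  long getWatermarkMs() {
    return watermarkMs;
  }

  // The watermark is exclusive, so the next window is [watermarkMs, watermarkMs + bucketMs).
  long nextWindowEndMs(long bucketMs) {
    return watermarkMs + bucketMs;
  }

  // Serializes the watermark as a string-valued simple field.
  Map<String, String> toSimpleFields() {
    Map<String, String> fields = new HashMap<>();
    fields.put(WATERMARK_KEY, Long.toString(watermarkMs));
    return fields;
  }

  static WatermarkMetadata fromSimpleFields(String tableNameWithType, Map<String, String> fields) {
    return new WatermarkMetadata(tableNameWithType, Long.parseLong(fields.get(WATERMARK_KEY)));
  }
}
```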

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+      LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =

Review comment:
       We can keep the same logic as in `TaskGeneratorUtils.getRunningSegments()` to skip old tasks
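The generator steps listed in the javadoc above can be sketched as plain window arithmetic. This is a hypothetical, dependency-free illustration. `WindowPlanner`, its `Segment` holder, and the small `periodToMs` parser are assumptions (the diff imports Pinot's own `TimeUtils`, presumably for period strings). "Not older than the buffer" is interpreted here as `windowEnd > now - buffer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the generator's window logic; not the Pinot implementation.
final class WindowPlanner {
  // Simplified stand-in for COMPLETED segment metadata (epoch millis, end inclusive).
  static final class Segment {
    final String name;
    final long startMs;
    final long endMs;

    Segment(String name, long startMs, long endMs) {
      this.name = name;
      this.startMs = startMs;
      this.endMs = endMs;
    }
  }

  // Hypothetical parser for periods such as "1d", "6h" or "30m".
  static long periodToMs(String period) {
    long amount = Long.parseLong(period.substring(0, period.length() - 1));
    switch (period.charAt(period.length() - 1)) {
      case 'd': return TimeUnit.DAYS.toMillis(amount);
      case 'h': return TimeUnit.HOURS.toMillis(amount);
      case 'm': return TimeUnit.MINUTES.toMillis(amount);
      default: throw new IllegalArgumentException("Unsupported period: " + period);
    }
  }

  // Cold start: with no stored watermark, begin at the smallest start time of COMPLETED segments.
  static long initialWatermarkMs(List<Segment> completed) {
    long min = Long.MAX_VALUE;
    for (Segment s : completed) {
      min = Math.min(min, s.startMs);
    }
    return min;
  }

  // Returns the segments overlapping [watermark, watermark + bucket),
  // or an empty list when the window is still inside the buffer period.
  static List<String> plan(long watermarkMs, long nowMs, String bucketPeriod, String bufferPeriod,
      List<Segment> completed) {
    long windowStartMs = watermarkMs;
    long windowEndMs = windowStartMs + periodToMs(bucketPeriod);
    List<String> selected = new ArrayList<>();
    if (windowEndMs > nowMs - periodToMs(bufferPeriod)) {
      return selected; // window not yet older than the buffer: no task
    }
    for (Segment s : completed) {
      // Segment [startMs, endMs] overlaps window [windowStartMs, windowEndMs).
      if (s.startMs < windowEndMs && s.endMs >= windowStartMs) {
        selected.add(s.name);
      }
    }
    return selected;
  }
}
```

With the defaults (bucket `1d`, buffer `2d`), a window is only planned once its end is at least two days in the past, which keeps the task away from data that may still be arriving in consuming segments.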






[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#issuecomment-705909906


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=h1) Report
   > Merging [#6124](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) will **decrease** coverage by `2.39%`.
   > The diff coverage is `38.00%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6124      +/-   ##
   ==========================================
   - Coverage   66.44%   64.05%   -2.40%     
   ==========================================
     Files        1075     1229     +154     
     Lines       54773    58023    +3250     
     Branches     8168     8559     +391     
   ==========================================
   + Hits        36396    37166     +770     
   - Misses      15700    18161    +2461     
   - Partials     2677     2696      +19     
   ```
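The totals in the coverage diff above are internally consistent: Lines equals Hits + Misses + Partials in both columns. A quick check, assuming Codecov's ratio is hits / (hits + misses + partials) and that percentages are truncated to two decimals (an assumption that reproduces the 66.44% and 64.05% shown); `CoverageMath` is a hypothetical helper.

```java
// Hypothetical helper: Codecov-style coverage from hit/miss/partial line counts.
final class CoverageMath {
  static long lines(long hits, long misses, long partials) {
    return hits + misses + partials;
  }

  // Coverage in hundredths of a percent, truncated by integer division: 6644 means 66.44%.
  static long coverageHundredths(long hits, long misses, long partials) {
    return hits * 10_000 / lines(hits, misses, partials);
  }
}
```

Rounding to the nearest hundredth instead would report master as 66.45%, since 36396 / 54773 is roughly 0.66449.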
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #unittests | `64.05% <38.00%> (?)` | |
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `53.33% <0.00%> (-3.81%)` | :arrow_down: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [.../org/apache/pinot/client/ResultTableResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFRhYmxlUmVzdWx0U2V0LmphdmE=) | `0.00% <0.00%> (-34.29%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `98.27% <ø> (-1.73%)` | :arrow_down: |
   | ... and [1054 more](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree-more) | |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=footer). Last update [0e1d458...7577085](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r508945680



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Helper methods to fetch/persist ZNRecord for minion task metadata
+ */
+public final class MinionTaskMetadataUtils {
+
+  private MinionTaskMetadataUtils() {
+
+  }
+
+  /**
+   * Fetches the ZNRecord for the given minion task and tableName, from MINION_TASK_METADATA/taskName/tableNameWithType
+   */
+  @Nullable
+  public static ZNRecord fetchMinionTaskMetadataZNRecord(HelixPropertyStore<ZNRecord> propertyStore, String taskType,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType, tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (znRecord != null) {
+      znRecord.setVersion(stat.getVersion());
+    }
+    return znRecord;
+  }
+
+  /**
+   * Fetches the ZNRecord for realtimeToOfflineSegmentsTask for given tableNameWithType from MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+   * and converts it to a {@link RealtimeToOfflineSegmentsTaskMetadata} object
+   */
+  @Nullable
+  public static RealtimeToOfflineSegmentsTaskMetadata getRealtimeToOfflineSegmentsTaskMetadata(
+      HelixPropertyStore<ZNRecord> propertyStore, String taskType, String tableNameWithType) {
+    ZNRecord znRecord = fetchMinionTaskMetadataZNRecord(propertyStore, taskType, tableNameWithType);
+    return znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
+  }
+
+  /**
+   * Persists the provided {@link RealtimeToOfflineSegmentsTaskMetadata} to MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType.
+   * Will fail if expectedVersion does not match.
+   * Set expectedVersion -1 to override version check.
+   */
+  public static void persistRealtimeToOfflineSegmentsTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore,
+      String taskType, RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata,

Review comment:
       MinionConstants is not accessible here, and I didn't want to add a static String variable for it.
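Because `MinionConstants` is not visible from pinot-common, the helpers take the task type as a parameter and build the path `MINION_TASK_METADATA/<taskType>/<tableNameWithType>` from it. A sketch of the fetch half, assuming an in-memory map in place of the Helix property store; the exact path prefix Pinot uses (for example, any leading slash) is an assumption. As in the diff, a missing node yields `null` (the cold-start case) and the stored version is copied onto the returned record. `MinionTaskMetadataSketch`, `MetadataRecord`, and `seed` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory sketch of the fetch side of MinionTaskMetadataUtils.
final class MinionTaskMetadataSketch {
  // Minimal record: simple fields plus the node version copied from the store.
  static final class MetadataRecord {
    final Map<String, String> simpleFields = new HashMap<>();
    int version;
  }

  private final Map<String, MetadataRecord> store = new HashMap<>();

  // Path layout from the javadoc; the task type is a parameter, not a constant.
  static String path(String taskType, String tableNameWithType) {
    return "MINION_TASK_METADATA/" + taskType + "/" + tableNameWithType;
  }

  // Test helper standing in for an earlier persist call.
  void seed(String taskType, String tableNameWithType, String key, String value, int version) {
    MetadataRecord record = new MetadataRecord();
    record.simpleFields.put(key, value);
    record.version = version;
    store.put(path(taskType, tableNameWithType), record);
  }

  // Returns null on cold start; otherwise a copy carrying the stored version,
  // mirroring how the diff copies Stat#getVersion onto the ZNRecord.
  MetadataRecord fetch(String taskType, String tableNameWithType) {
    MetadataRecord stored = store.get(path(taskType, tableNameWithType));
    if (stored == null) {
      return null;
    }
    MetadataRecord copy = new MetadataRecord();
    copy.simpleFields.putAll(stored.simpleFields);
    copy.version = stored.version;
    return copy;
  }
}
```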






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r506778282



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for minion task of type "realtimeToOfflineSegmentsTask"
+ * With every task run, a new segment is created in the offline table for 1 day. Watermark also keeps progressing accordingly.
+ */
+public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+
+  private PinotHelixTaskResourceManager _helixTaskResourceManager;
+  private PinotTaskManager _taskManager;
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+
+  private long _dataSmallestTimeMillis;
+  private long _dateSmallestDays;
+  private String _realtimeTableName;
+  private String _offlineTableName;
+
+  @Override
+  protected TableTaskConfig getTaskConfig() {
+    return new TableTaskConfig(
+        Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>()));
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    // Setup realtime table, and blank offline table
+    super.setUp();
+    addTableConfig(createOfflineTableConfig());
+    startMinion(null, null);
+
+    _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+    _taskManager = _controllerStarter.getTaskManager();
+    _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+
+    _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+    _offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+    List<RealtimeSegmentZKMetadata> realtimeSegmentMetadata =
+        _pinotHelixResourceManager.getRealtimeSegmentMetadata(_realtimeTableName);
+    long minSegmentTime = Long.MAX_VALUE;
+    for (RealtimeSegmentZKMetadata metadata : realtimeSegmentMetadata) {
+      if (metadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
+        if (metadata.getStartTime() < minSegmentTime) {
+          minSegmentTime = metadata.getStartTime();
+        }
+      }
+    }
+    _dataSmallestTimeMillis = minSegmentTime;
+    _dateSmallestDays = minSegmentTime / 86400000;
+  }
+
+  @Test
+  public void testRealtimeToOfflineSegmentsTask() {
+
+    List<OfflineSegmentZKMetadata> offlineSegmentMetadata =
+        _pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+    Assert.assertTrue(offlineSegmentMetadata.isEmpty());
+
+    long expectedWatermark = _dataSmallestTimeMillis;
+    int numOfflineSegments = 0;
+    long offlineSegmentTime = _dateSmallestDays;
+    for (int i = 0; i < 3; i ++) {
+      // Schedule task
+      Assert
+          .assertTrue(_taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
+          PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
+      // Should not generate more tasks
+      Assert
+          .assertFalse(_taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+
+      expectedWatermark = expectedWatermark + 86400000;
+      // Wait at most 600 seconds for all tasks COMPLETED
+      waitForTaskToComplete(expectedWatermark);
+      // check segment is in offline
+      offlineSegmentMetadata = _pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+      Assert.assertEquals(offlineSegmentMetadata.size(), ++numOfflineSegments);
+      Assert.assertEquals(offlineSegmentMetadata.get(i).getStartTime(), offlineSegmentTime);
+      Assert.assertEquals(offlineSegmentMetadata.get(i).getEndTime(), offlineSegmentTime);
+      offlineSegmentTime ++;
+    }
+    testHardcodedSqlQueries();
+  }
+
+  private void waitForTaskToComplete(long expectedWatermark) {
+    TestUtils.waitForCondition(input -> {
+      // Check task state
+      for (TaskState taskState : _helixTaskResourceManager
+          .getTaskStates(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE).values()) {
+        if (taskState != TaskState.COMPLETED) {
+          return false;
+        }
+      }
+      return true;
+    }, 600_000L, "Failed to complete task");
+
+    // Check segment ZK metadata
+    RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
+        _taskManager.getClusterInfoProvider().getMinionRealtimeToOfflineSegmentsTaskMetadata(_realtimeTableName);
+    Assert.assertNotNull(minionTaskMetadata);
+    Assert.assertEquals(minionTaskMetadata.getWatermarkMillis(), expectedWatermark);
+  }
+
+  @Test(enabled = false)
+  public void testSegmentListApi() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerDebugOutput() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerDebugRoutingTableSQL() {
+  }
+
+  @Test(enabled = false)
+  public void testBrokerResponseMetadata() {
+  }
+
+  @Test(enabled = false)
+  public void testDictionaryBasedQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueriesWithMultiValues() {
+  }
+
+  @Test(enabled = false)
+  public void testGeneratedQueriesWithoutMultiValues() {
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testHardcodedSqlQueries() {
+  }
+
+  @Test(enabled = false)
+  public void testInstanceShutdown() {
+  }
+
+  @Test(enabled = false)
+  public void testQueriesFromQueryFile() {
+  }
+
+  @Test(enabled = false)
+  public void testQueryExceptions() {
+  }
+
+  @Test(enabled = false)
+  public void testReload(boolean includeOfflineTable) {

Review comment:
       It doesn't show up for me. You're likely extending the Hybrid or LLC integration test. I have the Realtime integration test as the parent, and it doesn't have the method you mention.
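The loop in `testRealtimeToOfflineSegmentsTask` above expects the watermark to advance by exactly one day per run, and the i-th offline segment to start and end on day `smallestDays + i`. A hypothetical helper reproducing those expectations, assuming (as the test's variable names imply) that segment start times are epoch milliseconds and offline segment times are days since epoch.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the integration test's expected values.
final class ExpectedProgress {
  static final long DAY_MS = TimeUnit.DAYS.toMillis(1); // 86_400_000

  // Watermark expected after `runs` completed task runs (one 1d bucket per run).
  static long watermarkAfterRun(long smallestTimeMs, int runs) {
    return smallestTimeMs + runs * DAY_MS;
  }

  // Days-since-epoch value expected as both start and end time of the segment from run i (0-based).
  static long offlineSegmentDay(long smallestTimeMs, int i) {
    return TimeUnit.MILLISECONDS.toDays(smallestTimeMs) + i;
  }
}
```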






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r506777612



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as windowStartMillis = watermarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping scheduling new task.",
+            nonCompletedTasks.keySet(), realtimeTableName);

Review comment:
       There shouldn't be more than 1 entry in this list, because it only covers 1 table.






[GitHub] [incubator-pinot] codecov-io commented on pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#issuecomment-705909906


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=h1) Report
   > Merging [#6124](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) will **decrease** coverage by `3.33%`.
   > The diff coverage is `38.00%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6124      +/-   ##
   ==========================================
   - Coverage   66.44%   63.11%   -3.34%     
   ==========================================
     Files        1075     1233     +158     
     Lines       54773    60211    +5438     
     Branches     8168     8828     +660     
   ==========================================
   + Hits        36396    38003    +1607     
   - Misses      15700    19393    +3693     
   - Partials     2677     2815     +138     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #unittests | `63.11% <38.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `53.33% <0.00%> (-3.81%)` | :arrow_down: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [.../org/apache/pinot/client/ResultTableResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFRhYmxlUmVzdWx0U2V0LmphdmE=) | `0.00% <0.00%> (-34.29%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `98.27% <ø> (-1.73%)` | :arrow_down: |
   | ... and [1057 more](https://codecov.io/gh/apache/incubator-pinot/pull/6124/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=footer). Last update [0e1d458...bcf0fc4](https://codecov.io/gh/apache/incubator-pinot/pull/6124?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6124: RealtimeToOfflineSegments task generator

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6124:
URL: https://github.com/apache/incubator-pinot/pull/6124#discussion_r503580109



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {

Review comment:
       Breaking this method down into calls to helper methods would improve readability.
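A minimal sketch of one possible decomposition, under stated assumptions: it pulls the cold-start branch of the method (quoted further down in this thread) into small static helpers. The helpers filter DONE segments, use the start-time config if present, and otherwise floor the smallest segment start time to the bucket. `ColdStartWatermarkSketch`, its nested `SegmentMeta` class and all helper names are hypothetical. `SegmentMeta` stands in for `LLCRealtimeSegmentZKMetadata`, its start time is assumed to be already in millis, and the ZNode reads/writes through `ClusterInfoProvider` are left out.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ColdStartWatermarkSketch {
  // Hypothetical stand-in for LLCRealtimeSegmentZKMetadata, keeping only what this step reads.
  static final class SegmentMeta {
    final boolean done;      // status == DONE (completed)
    final long startMillis;  // assumed already converted to millis

    SegmentMeta(boolean done, long startMillis) {
      this.done = done;
      this.startMillis = startMillis;
    }
  }

  // Helper: keep only completed (DONE) segments.
  static List<SegmentMeta> completedSegments(List<SegmentMeta> all) {
    List<SegmentMeta> completed = new ArrayList<>();
    for (SegmentMeta m : all) {
      if (m.done) {
        completed.add(m);
      }
    }
    return completed;
  }

  // Helper: smallest start time, floored to a bucket boundary (e.g. 12:34:59 UTC -> 00:00 UTC for a 1d bucket).
  static long coldStartWatermarkMillis(List<SegmentMeta> completed, long bucketMillis) {
    if (completed.isEmpty()) {
      throw new IllegalStateException("No completed segments");
    }
    long minStartMillis = Long.MAX_VALUE;
    for (SegmentMeta m : completed) {
      minStartMillis = Math.min(minStartMillis, m.startMillis);
    }
    return (minStartMillis / bucketMillis) * bucketMillis;
  }

  // Helper: existing watermark first, then the start-time config, else derive it from the segments.
  static long resolveWatermarkMillis(Long existingWatermarkMillis, String startTimeMillisConfig,
      List<SegmentMeta> completed, long bucketMillis) {
    if (existingWatermarkMillis != null) {
      return existingWatermarkMillis;
    }
    if (startTimeMillisConfig != null) {
      return Long.parseLong(startTimeMillisConfig);
    }
    return coldStartWatermarkMillis(completed, bucketMillis);
  }

  public static void main(String[] args) {
    long dayMillis = 24L * 60 * 60 * 1000;
    long start = Instant.parse("2020-08-13T12:34:59Z").toEpochMilli();
    // The earlier, non-DONE segment is dropped by completedSegments().
    List<SegmentMeta> all = Arrays.asList(new SegmentMeta(true, start), new SegmentMeta(false, start - dayMillis));
    long watermark = resolveWatermarkMillis(null, null, completedSegments(all), dayMillis);
    System.out.println(Instant.ofEpochMilli(watermark)); // 2020-08-13T00:00:00Z
  }
}
```

Each helper can then be unit-tested without a `ClusterInfoProvider`, leaving the main loop with the table-level checks and a few calls.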

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping scheduling new task.",
+            nonCompletedTasks.keySet(), realtimeTableName);

Review comment:
       Maybe log only the first few of these? If for any reason tasks start to get queued, this will be a long list.
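A small sketch of what truncated logging could look like, assuming plain Java with no Pinot dependencies. `TaskListSummary.summarize` is a hypothetical helper, and the task names in `main` are made up.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class TaskListSummary {
  // Renders at most maxShown names plus a count of the rest, so a backlog of queued
  // tasks does not turn into one very long log line.
  static String summarize(Collection<String> names, int maxShown) {
    if (maxShown < 1) {
      throw new IllegalArgumentException("maxShown must be >= 1");
    }
    String shown = names.stream().limit(maxShown).collect(Collectors.joining(", "));
    int remaining = names.size() - Math.min(names.size(), maxShown);
    return remaining > 0 ? "[" + shown + ", ... (" + remaining + " more)]" : "[" + shown + "]";
  }

  public static void main(String[] args) {
    // Made-up task names; TreeSet gives a deterministic iteration order for the demo.
    TreeSet<String> names = new TreeSet<>(Arrays.asList("task_1", "task_2", "task_3", "task_4", "task_5"));
    System.out.println(summarize(names, 3)); // [task_1, task_2, task_3, ... (2 more)]
  }
}
```

In the generator the summary could be passed as a single SLF4J argument, e.g. `LOGGER.warn("Found {} non-completed tasks: {} for table: {}. Skipping scheduling new task.", nonCompletedTasks.size(), TaskListSummary.summarize(nonCompletedTasks.keySet(), 3), realtimeTableName);`. An earlier comment in this thread notes that a single table should normally have at most one such task, so the cap mainly guards against an unexpected backlog.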

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
##########
@@ -62,16 +62,35 @@ private MinionConstants() {
     public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
   }
 
+  /**
+   * Creates segments for the OFFLINE table, using completed segments from the corresponding REALTIME table
+   */
   public static class RealtimeToOfflineSegmentsTask {
     public static final String TASK_TYPE = "realtimeToOfflineSegmentsTask";
-    // window
+
+    /**
+     * The time window size for the task.
+     * e.g. if set to "1d", then task is scheduled to run for a 1 day window
+     */
+    public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+    /**
+     * The time period to wait before picking segments for this task
+     * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days

Review comment:
       ```suggestion
        * e.g. if set to "2d", no task will be scheduled for segments newer than 2 days
   ```
   :)
   Is this interpretation correct?
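One way to check the interpretation against the generator code quoted in this thread: its Javadoc says no task is generated if the execution window is not older than the buffer, and the check is `windowEndMillis > System.currentTimeMillis() - bufferMillis`. Read that way, the buffer is measured from the end of the window rather than from individual segments. With the defaults (1d bucket, 2d buffer), the window [D, D+1d) first becomes schedulable at D+3d. The sketch below restates that check in plain Java. `BufferCheckSketch` and its method names are hypothetical.

```java
final class BufferCheckSketch {
  static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

  // Inverse of the quoted skip condition: schedulable iff windowEnd <= now - buffer.
  static boolean isWindowSchedulable(long windowStartMillis, long bucketMillis, long bufferMillis, long nowMillis) {
    long windowEndMillis = windowStartMillis + bucketMillis;
    return windowEndMillis <= nowMillis - bufferMillis;
  }

  // Earliest "now" at which the window [windowStart, windowStart + bucket) passes the check.
  static long earliestScheduleTimeMillis(long windowStartMillis, long bucketMillis, long bufferMillis) {
    return windowStartMillis + bucketMillis + bufferMillis;
  }

  public static void main(String[] args) {
    long windowStart = 10 * DAY_MILLIS;  // hypothetical watermark on a day boundary
    long bucket = DAY_MILLIS;            // "1d" (default bucket)
    long buffer = 2 * DAY_MILLIS;        // "2d" (default buffer)
    System.out.println(isWindowSchedulable(windowStart, bucket, buffer, 12 * DAY_MILLIS + 1)); // false
    System.out.println(isWindowSchedulable(windowStart, bucket, buffer, 13 * DAY_MILLIS));     // true
  }
}
```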

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping scheduling new task.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      List<LLCRealtimeSegmentZKMetadata> realtimeSegmentsMetadataList =
+          _clusterInfoProvider.getLLCRealtimeSegmentsMetadata(realtimeTableName);
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadataList = new ArrayList<>();
+      for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList) {
+        if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+          completedSegmentsMetadataList.add(metadata);
+        }
+      }
+      if (completedSegmentsMetadataList.isEmpty()) {
+        LOGGER
+            .info("No realtime completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
+                taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for Table: {}", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+      long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr);
+      long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr);
+
+      // Fetch RealtimeToOfflineSegmentsTaskMetadata ZNode for reading watermark
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          _clusterInfoProvider.getMinionRealtimeToOfflineSegmentsTaskMetadata(realtimeTableName);
+
+      if (realtimeToOfflineSegmentsTaskMetadata == null) {
+        // No ZNode exists. Cold-start.
+        long watermarkMillis;
+
+        String startTimeStr = taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY);
+        if (startTimeStr != null) {
+          // Use startTime config if provided in taskConfigs
+          watermarkMillis = Long.parseLong(startTimeStr);
+        } else {
+          // Find the smallest time from all segments
+          RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+          for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadataList) {
+            if (minSegmentZkMetadata == null || realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata
+                .getStartTime()) {
+              minSegmentZkMetadata = realtimeSegmentZKMetadata;
+            }
+          }
+          Preconditions.checkState(minSegmentZkMetadata != null);
+
+          // Convert the segment minTime to millis
+          long minSegmentStartTimeMillis =
+              minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+
+          // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries
+          // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window [20200813, 20200814)
+          watermarkMillis = (minSegmentStartTimeMillis / bucketMillis) * bucketMillis;
+        }
+
+        // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above
+        realtimeToOfflineSegmentsTaskMetadata =
+            new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMillis);
+        _clusterInfoProvider.setRealtimeToOfflineSegmentsTaskMetadata(realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      // WindowStart = watermark. WindowEnd = windowStart + bucket.
+      long windowStartMillis = realtimeToOfflineSegmentsTaskMetadata.getWatermarkMillis();
+      long windowEndMillis = windowStartMillis + bucketMillis;
+
+      // Check that execution window is older than bufferTime
+      if (windowEndMillis > System.currentTimeMillis() - bufferMillis) {
+        LOGGER.info(
+            "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping scheduling task: {}",
+            windowStartMillis, windowEndMillis, bufferMillis, bufferTimeStr, taskType);
+      }
+
+      // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd (exclusive)
+      List<String> segmentNames = new ArrayList<>();

Review comment:
       Instead of two lists, maybe you could keep one list of LLCRealtimeSegmentZKMetadata?
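A minimal sketch of the single-list idea. It assumes a hypothetical `SegmentMeta` stand-in for `LLCRealtimeSegmentZKMetadata`, with start/end times already in millis. The overlap scan keeps the metadata objects themselves, and names (or any other per-segment field) are derived from that one list when the task config is built. The window is treated as [windowStart, windowEnd), as in the generator Javadoc. Treating the segment's time range as inclusive at both ends is an assumption. The hunk above is cut off after `segmentNames`, so the second list is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class SingleListSelectionSketch {
  // Hypothetical stand-in for LLCRealtimeSegmentZKMetadata with times already in millis.
  static final class SegmentMeta {
    final String name;
    final long startMillis;
    final long endMillis;

    SegmentMeta(String name, long startMillis, long endMillis) {
      this.name = name;
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  // Keeps the metadata objects; segment [start, end] overlaps window [windowStart, windowEnd)
  // iff start < windowEnd and end >= windowStart.
  static List<SegmentMeta> overlappingWindow(List<SegmentMeta> completed, long windowStartMillis,
      long windowEndMillis) {
    List<SegmentMeta> selected = new ArrayList<>();
    for (SegmentMeta m : completed) {
      if (m.startMillis < windowEndMillis && m.endMillis >= windowStartMillis) {
        selected.add(m);
      }
    }
    return selected;
  }

  // Derive the names from the single list only when needed.
  static List<String> names(List<SegmentMeta> segments) {
    return segments.stream().map(m -> m.name).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<SegmentMeta> completed = Arrays.asList(
        new SegmentMeta("seg_0", 0, 99),
        new SegmentMeta("seg_1", 100, 199),
        new SegmentMeta("seg_2", 200, 299));
    System.out.println(names(overlappingWindow(completed, 100, 200))); // [seg_1]
  }
}
```

Separately, as quoted above, the buffer check only logs "Skipping scheduling task" and has no `continue`, so the loop would still fall through to this segment scan. Presumably a `continue` was intended there.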

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
##########
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type {@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ *  - The watermarkMillis is read from the {@link RealtimeToOfflineSegmentsTaskMetadata} ZNode found at MINION_TASK_METADATA/realtimeToOfflineSegmentsTask/tableNameWithType
+ *      In case of cold-start, no ZNode will exist.
+ *      A new ZNode will be created, with watermarkMillis as the smallest time found in the COMPLETED segments (or using start time config)
+ *  - The execution window for the task is calculated as, windowStartMillis = waterMarkMillis, windowEndMillis = windowStartMillis + bucketTimeMillis,
+ *      where bucketTime can be provided in the taskConfigs (default 1d)
+ *  - If the execution window is not older than bufferTimeMillis, no task will be generated,
+ *      where bufferTime can be provided in the taskConfigs (default 2d)
+ *  - Segment metadata is scanned for all COMPLETED segments, to pick those containing data in window [windowStartMillis, windowEndMillis)
+ *  - A PinotTaskConfig is created, with segment information, execution window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+  private static final String DEFAULT_BUCKET_PERIOD = "1d";
+  private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Override
+  public String getTaskType() {
+    return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+    String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    for (TableConfig tableConfig : tableConfigs) {
+      String tableName = tableConfig.getTableName();
+
+      if (tableConfig.getTableType() != TableType.REALTIME) {
+        LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", taskType, tableName);
+        continue;
+      }
+
+      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+      // Only schedule 1 task of this type, per table
+      Map<String, TaskState> nonCompletedTasks =
+          TaskGeneratorUtils.getNonCompletedTasks(taskType, realtimeTableName, _clusterInfoProvider);
+      if (!nonCompletedTasks.isEmpty()) {
+        LOGGER.warn("Found non-completed tasks: {} for same table: {}. Skipping scheduling new task.",
+            nonCompletedTasks.keySet(), realtimeTableName);
+        continue;
+      }
+
+      List<LLCRealtimeSegmentZKMetadata> realtimeSegmentsMetadataList =
+          _clusterInfoProvider.getLLCRealtimeSegmentsMetadata(realtimeTableName);
+      List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadataList = new ArrayList<>();
+      for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList) {
+        if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+          completedSegmentsMetadataList.add(metadata);
+        }
+      }
+      if (completedSegmentsMetadataList.isEmpty()) {
+        LOGGER.info("No realtime completed segments found for table: {}, skipping task generation: {}",
+            realtimeTableName, taskType);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkState(tableTaskConfig != null);
+      Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(taskType);
+      Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: %s", tableName);
+
+      // Get the bucket size and buffer
+      String bucketTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
+      String bufferTimeStr =
+          taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
+      long bucketMillis = TimeUtils.convertPeriodToMillis(bucketTimeStr);
+      long bufferMillis = TimeUtils.convertPeriodToMillis(bufferTimeStr);
+
+      // Fetch RealtimeToOfflineSegmentsTaskMetadata ZNode for reading watermark
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          _clusterInfoProvider.getMinionRealtimeToOfflineSegmentsTaskMetadata(realtimeTableName);
+
+      if (realtimeToOfflineSegmentsTaskMetadata == null) {
+        // No ZNode exists. Cold-start.
+        long watermarkMillis;
+
+        String startTimeStr = taskConfigs.get(RealtimeToOfflineSegmentsTask.START_TIME_MILLIS_KEY);
+        if (startTimeStr != null) {
+          // Use startTime config if provided in taskConfigs
+          watermarkMillis = Long.parseLong(startTimeStr);
+        } else {
+          // Find the smallest time from all segments
+          RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+          for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadataList) {
+            if (minSegmentZkMetadata == null
+                || realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata.getStartTime()) {
+              minSegmentZkMetadata = realtimeSegmentZKMetadata;
+            }
+          }
+          Preconditions.checkState(minSegmentZkMetadata != null);
+
+          // Convert the segment minTime to millis
+          long minSegmentStartTimeMillis =
+              minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+
+          // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries
+          // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window [20200813, 20200814)
+          watermarkMillis = (minSegmentStartTimeMillis / bucketMillis) * bucketMillis;
+        }
+
+        // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above
+        realtimeToOfflineSegmentsTaskMetadata =
+            new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMillis);
+        _clusterInfoProvider.setRealtimeToOfflineSegmentsTaskMetadata(realtimeToOfflineSegmentsTaskMetadata);
+      }
+
+      // WindowStart = watermark. WindowEnd = windowStart + bucket.
+      long windowStartMillis = realtimeToOfflineSegmentsTaskMetadata.getWatermarkMillis();
+      long windowEndMillis = windowStartMillis + bucketMillis;
+
+      // Check that execution window is older than bufferTime
+      if (windowEndMillis > System.currentTimeMillis() - bufferMillis) {

Review comment:
       If you get the current time through a method, tests can set the time to any value and exercise the logic in this class.
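A minimal sketch of the reviewer's suggestion, assuming "now" is read through an injectable `java.time.Clock` behind an overridable method. `BufferCheckSketch`, `getCurrentTimeMillis` and `isWindowReady` are hypothetical names, not Pinot's API. Production code could pass `Clock.systemUTC()`, while a test passes `Clock.fixed(...)` to pin the current time and check the buffer condition deterministically.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for the buffer check in the generator,
// reading "now" from an injected Clock instead of System.currentTimeMillis().
public class BufferCheckSketch {
  private final Clock _clock;

  public BufferCheckSketch(Clock clock) {
    _clock = clock;
  }

  // Overridable hook for the current time; tests can inject a fixed clock or override this.
  protected long getCurrentTimeMillis() {
    return _clock.millis();
  }

  // Mirrors the generator's condition: a task is generated only if the window
  // ends at least bufferMillis before now (the diff skips when windowEnd > now - buffer).
  public boolean isWindowReady(long windowEndMillis, long bufferMillis) {
    return windowEndMillis <= getCurrentTimeMillis() - bufferMillis;
  }

  public static void main(String[] args) {
    long dayMillis = TimeUnit.DAYS.toMillis(1);
    // Pin "now" to 2020-08-16T00:00:00Z.
    Clock fixed = Clock.fixed(Instant.parse("2020-08-16T00:00:00Z"), ZoneOffset.UTC);
    BufferCheckSketch sketch = new BufferCheckSketch(fixed);

    long windowStart = Instant.parse("2020-08-13T00:00:00Z").toEpochMilli();
    // Window [08-13, 08-14) ends exactly 2 days before now: ready.
    System.out.println(sketch.isWindowReady(windowStart + dayMillis, 2 * dayMillis));
    // Window [08-14, 08-15) ends only 1 day before now: not ready.
    System.out.println(sketch.isWindowReady(windowStart + 2 * dayMillis, 2 * dayMillis));
  }
}
```

Injecting a `Clock` keeps the production path unchanged while letting a unit test move "now" around the window boundary without sleeping or depending on wall-clock time.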




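The cold-start watermark rounding and execution-window arithmetic in the generator above can be illustrated with a small standalone sketch. It assumes epoch-millis timestamps and a 1d bucket; `WindowSketch`, `initialWatermarkMillis` and `executionWindow` are hypothetical names, not part of Pinot. Because the flooring is done on epoch millis, a 1d bucket aligns windows to UTC midnight.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the cold-start watermark and window computation from the diff.
public class WindowSketch {

  // Floor the earliest completed segment's start time to a multiple of bucketMillis
  // (counted from the epoch), so offline segments align to bucket boundaries.
  static long initialWatermarkMillis(long minSegmentStartTimeMillis, long bucketMillis) {
    return (minSegmentStartTimeMillis / bucketMillis) * bucketMillis;
  }

  // The execution window is [watermark, watermark + bucket).
  static long[] executionWindow(long watermarkMillis, long bucketMillis) {
    return new long[]{watermarkMillis, watermarkMillis + bucketMillis};
  }

  public static void main(String[] args) {
    long bucketMillis = TimeUnit.DAYS.toMillis(1);
    long minStartMillis = Instant.parse("2020-08-13T12:34:59Z").toEpochMilli();
    long watermarkMillis = initialWatermarkMillis(minStartMillis, bucketMillis);
    long[] window = executionWindow(watermarkMillis, bucketMillis);
    // Prints the window bounds as ISO-8601 instants.
    System.out.println(Instant.ofEpochMilli(window[0]) + " -> " + Instant.ofEpochMilli(window[1]));
  }
}
```

This matches the example in the diff's comment: a minimum start time of 20200813T12:34:59 yields a first window of [20200813, 20200814).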