You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/10/08 03:56:30 UTC

[pinot] branch master updated: Introduce MinionConf, move END_REPLACE_SEGMENTS_TIMEOUT_MS to minion config instead of task config. (#7516)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6969b38  Introduce MinionConf, move END_REPLACE_SEGMENTS_TIMEOUT_MS to minion config instead of task config. (#7516)
6969b38 is described below

commit 6969b38fb03537e77764ca5b9e9524165f97251c
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Thu Oct 7 20:56:11 2021 -0700

    Introduce MinionConf, move END_REPLACE_SEGMENTS_TIMEOUT_MS to minion config instead of task config. (#7516)
---
 .../apache/pinot/core/common/MinionConstants.java  |  1 -
 .../minion/tasks/TestTaskExecutorFactory.java      |  5 ++
 .../org/apache/pinot/minion/BaseMinionStarter.java | 19 +++----
 .../java/org/apache/pinot/minion/MinionConf.java   | 66 ++++++++++++++++++++++
 .../minion/executor/PinotTaskExecutorFactory.java  |  9 +++
 .../executor/TaskExecutorFactoryRegistry.java      |  5 +-
 .../BaseMultipleSegmentsConversionExecutor.java    | 14 +++--
 .../ConvertToRawIndexTaskExecutorFactory.java      |  5 ++
 .../tasks/mergerollup/MergeRollupTaskExecutor.java |  5 ++
 .../MergeRollupTaskExecutorFactory.java            |  9 ++-
 .../mergerollup/MergeRollupTaskGenerator.java      |  3 -
 .../tasks/purge/PurgeTaskExecutorFactory.java      |  5 ++
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |  5 +-
 ...altimeToOfflineSegmentsTaskExecutorFactory.java | 10 +++-
 ...egmentGenerationAndPushTaskExecutorFactory.java |  5 ++
 .../mergerollup/MergeRollupTaskExecutorTest.java   |  3 +-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java | 16 +++---
 17 files changed, 151 insertions(+), 34 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 36341b8..06ff9d5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -49,7 +49,6 @@ public class MinionConstants {
 
   public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
   public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
-  public static final String END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY = "endReplaceSegmentsSocketTimeoutMs";
 
   public static class ConvertToRawIndexTask {
     public static final String TASK_TYPE = "ConvertToRawIndexTask";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
index 149ee19..47d8610 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
@@ -48,6 +49,10 @@ public class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
   }
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+  }
+
+  @Override
   public String getTaskType() {
     return SimpleMinionClusterIntegrationTest.TASK_TYPE;
   }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index 5083261..e08033c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -57,7 +57,6 @@ import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.services.ServiceStartable;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +69,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {
 
   private static final String HTTPS_ENABLED = "enabled";
 
-  protected PinotConfiguration _config;
+  protected MinionConf _config;
   protected String _hostname;
   protected int _port;
   protected String _instanceId;
@@ -83,14 +82,12 @@ public abstract class BaseMinionStarter implements ServiceStartable {
   @Override
   public void init(PinotConfiguration config)
       throws Exception {
-    _config = config;
-    String helixClusterName = _config.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
-    String zkAddress = _config.getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
-    _hostname = _config.getProperty(CommonConstants.Helix.KEY_OF_MINION_HOST,
-        _config.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils
-            .getHostnameOrAddress() : NetUtils.getHostAddress());
-    _port = _config.getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT);
-    _instanceId = _config.getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
+    _config = new MinionConf(config.toMap());
+    String helixClusterName = _config.getHelixClusterName();
+    String zkAddress = _config.getZkAddress();
+    _hostname = _config.getHostName();
+    _port = _config.getPort();
+    _instanceId = _config.getInstanceId();
     if (_instanceId != null) {
       // NOTE: Force all instances to have the same prefix in order to derive the instance type based on the instance id
       Preconditions.checkState(_instanceId.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE),
@@ -102,7 +99,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {
     setupHelixSystemProperties();
     _helixManager = new ZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, zkAddress);
     MinionTaskZkMetadataManager minionTaskZkMetadataManager = new MinionTaskZkMetadataManager(_helixManager);
-    _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager);
+    _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager, _config);
     _eventObserverFactoryRegistry = new EventObserverFactoryRegistry(minionTaskZkMetadataManager);
   }
 
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
new file mode 100644
index 0000000..49d04ea
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.NetUtils;
+
+
+public class MinionConf extends PinotConfiguration {
+  public static final String END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY = "pinot.minion.endReplaceSegments.timeoutMs";
+  public static final int DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 10 * 60 * 1000; // 10 mins
+
+  public MinionConf() {
+    super(new HashMap<>());
+  }
+
+  public MinionConf(Map<String, Object> baseProperties) {
+    super(baseProperties);
+  }
+
+  public String getHelixClusterName() {
+    return getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
+  }
+
+  public String getZkAddress() {
+    return getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
+  }
+
+  public String getHostName()
+      throws Exception {
+    return getProperty(CommonConstants.Helix.KEY_OF_MINION_HOST,
+        getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils
+            .getHostnameOrAddress() : NetUtils.getHostAddress());
+  }
+
+  public int getPort() {
+    return getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT);
+  }
+
+  public String getInstanceId() {
+    return getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
+  }
+
+  public int getEndReplaceSegmentsTimeoutMs() {
+    return getProperty(END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY, DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS);
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
index a51044b..9c7aeed 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.minion.executor;
 
+import org.apache.pinot.minion.MinionConf;
+
+
 /**
  * Factory for {@link PinotTaskExecutor}.
  */
@@ -26,9 +29,15 @@ public interface PinotTaskExecutorFactory {
   /**
    * Initializes the task executor factory.
    */
+  @Deprecated
   void init(MinionTaskZkMetadataManager zkMetadataManager);
 
   /**
+   * Initializes the task executor factory.
+   */
+  void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf);
+
+  /**
    * Returns the task type of the executor.
    */
   String getTaskType();
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 3ba1c40..0d15bdb 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -21,6 +21,7 @@ package org.apache.pinot.minion.executor;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
 import org.apache.pinot.spi.utils.PinotReflectionUtils;
 import org.slf4j.Logger;
@@ -45,7 +46,7 @@ public class TaskExecutorFactoryRegistry {
    * NOTE: In order to plugin a class using reflection, the class should include ".plugin.minion.tasks." in its class
    * path. This convention can significantly reduce the time of class scanning.
    */
-  public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
+  public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
     long startTimeMs = System.currentTimeMillis();
     Set<Class<?>> classes = getTaskExecutorFactoryClasses();
     for (Class<?> clazz : classes) {
@@ -53,7 +54,7 @@ public class TaskExecutorFactoryRegistry {
       if (annotation.enabled()) {
         try {
           PinotTaskExecutorFactory taskExecutorFactory = (PinotTaskExecutorFactory) clazz.newInstance();
-          taskExecutorFactory.init(zkMetadataManager);
+          taskExecutorFactory.init(zkMetadataManager, minionConf);
           registerTaskExecutorFactory(taskExecutorFactory);
         } catch (Exception e) {
           LOGGER.error("Caught exception while initializing and registering task executor factory: {}, skipping it",
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index fd65337..cc9370d 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -57,6 +58,12 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
 
+  protected MinionConf _minionConf;
+
+  public BaseMultipleSegmentsConversionExecutor(MinionConf minionConf) {
+    _minionConf = minionConf;
+  }
+
   /**
    * Converts the segment based on the given {@link PinotTaskConfig}.
    *
@@ -206,12 +213,9 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
 
       // Update the segment lineage to indicate that the segment replacement is done.
       if (replaceSegmentsEnabled) {
-        int endReplaceSegmentsSocketTimeoutMs =
-            configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY) != null
-                ? Integer.parseInt(configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY))
-                : FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS;
         SegmentConversionUtils
-            .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId, endReplaceSegmentsSocketTimeoutMs);
+            .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
+                _minionConf.getEndReplaceSegmentsTimeoutMs());
       }
 
       String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
index 59e805a..0b208c4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.converttorawindex;
 
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -33,6 +34,10 @@ public class ConvertToRawIndexTaskExecutorFactory implements PinotTaskExecutorFa
   }
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+  }
+
+  @Override
   public String getTaskType() {
     return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
   }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
index 47ab740..f054b02 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
 import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
@@ -46,6 +47,10 @@ import org.slf4j.LoggerFactory;
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
 
+  public MergeRollupTaskExecutor(MinionConf minionConf) {
+    super(minionConf);
+  }
+
   @Override
   protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
       File workingDir)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorFactory.java
index 6183f7b..9b086b2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.mergerollup;
 
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -27,18 +28,24 @@ import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
 
 @TaskExecutorFactory
 public class MergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
+  private MinionConf _minionConf;
 
   @Override
   public void init(MinionTaskZkMetadataManager zkMetadataManager) {
   }
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+    _minionConf = minionConf;
+  }
+
+  @Override
   public String getTaskType() {
     return MinionConstants.MergeRollupTask.TASK_TYPE;
   }
 
   @Override
   public PinotTaskExecutor create() {
-    return new MergeRollupTaskExecutor();
+    return new MergeRollupTaskExecutor(_minionConf);
   }
 }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 5c8ec8f..c1ab565 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -94,7 +94,6 @@ import org.slf4j.LoggerFactory;
 public class MergeRollupTaskGenerator implements PinotTaskGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
 
-  public static final int END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 30 * 60 * 1000; // 30 mins
   private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
   private static final String REFRESH = "REFRESH";
 
@@ -486,8 +485,6 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
       configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
           MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + System.currentTimeMillis() + "_" + i + "_"
               + TableNameBuilder.extractRawTableName(offlineTableName));
-      configs.put(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY,
-          String.valueOf(END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS));
       pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
     }
 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorFactory.java
index a76da57..036ed02 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.purge;
 
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -33,6 +34,10 @@ public class PurgeTaskExecutorFactory implements PinotTaskExecutorFactory {
   }
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+  }
+
+  @Override
   public String getTaskType() {
     return MinionConstants.PurgeTask.TASK_TYPE;
   }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
index dfa3a8f..c957320 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.segment.processing.framework.MergeType;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
 import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
@@ -72,7 +73,9 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
   private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
   private int _expectedVersion = Integer.MIN_VALUE;
 
-  public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
+  public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager,
+      MinionConf minionConf) {
+    super(minionConf);
     _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
   }
 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorFactory.java
index ee70a45..2c705df 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
 
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -31,6 +32,7 @@ import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
 @TaskExecutorFactory
 public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
   private MinionTaskZkMetadataManager _zkMetadataManager;
+  private MinionConf _minionConf;
 
   @Override
   public void init(MinionTaskZkMetadataManager zkMetadataManager) {
@@ -38,12 +40,18 @@ public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskEx
   }
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+    _zkMetadataManager = zkMetadataManager;
+    _minionConf = minionConf;
+  }
+
+  @Override
   public String getTaskType() {
     return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE;
   }
 
   @Override
   public PinotTaskExecutor create() {
-    return new RealtimeToOfflineSegmentsTaskExecutor(_zkMetadataManager);
+    return new RealtimeToOfflineSegmentsTaskExecutor(_zkMetadataManager, _minionConf);
   }
 }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutorFactory.java
index 383f4e0..0a821e7 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush;
 
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -33,6 +34,10 @@ public class SegmentGenerationAndPushTaskExecutorFactory implements PinotTaskExe
   }
 
   @Override
+  public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+  }
+
+  @Override
   public String getTaskType() {
     return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
   }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
index 3b3f404..69a4541 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -104,7 +105,7 @@ public class MergeRollupTaskExecutorTest {
   @Test
   public void testConvert()
       throws Exception {
-    MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor();
+    MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor(new MinionConf());
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
     configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index 1f88a43..4fcdbf6 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -215,7 +215,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor(null);
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -242,7 +242,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor(null);
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -270,7 +270,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor(null);
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -299,7 +299,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor(null);
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -332,7 +332,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor(null);
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_PARTITIONING);
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600468000000");
@@ -364,7 +364,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor(null);
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_SORTED_COL);
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -392,7 +392,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor(null);
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_EPOCH_HOURS);
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -421,7 +421,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     FileUtils.deleteQuietly(WORKING_DIR);
 
     RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
-        new RealtimeToOfflineSegmentsTaskExecutor(null);
+        new RealtimeToOfflineSegmentsTaskExecutor(null, null);
     Map<String, String> configs = new HashMap<>();
     configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_SDF);
     configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org