You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/10/08 03:56:30 UTC
[pinot] branch master updated: Introduce MinionConf, move END_REPLACE_SEGMENTS_TIMEOUT_MS to minion config instead of task config. (#7516)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6969b38 Introduce MinionConf, move END_REPLACE_SEGMENTS_TIMEOUT_MS to minion config instead of task config. (#7516)
6969b38 is described below
commit 6969b38fb03537e77764ca5b9e9524165f97251c
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Thu Oct 7 20:56:11 2021 -0700
Introduce MinionConf, move END_REPLACE_SEGMENTS_TIMEOUT_MS to minion config instead of task config. (#7516)
---
.../apache/pinot/core/common/MinionConstants.java | 1 -
.../minion/tasks/TestTaskExecutorFactory.java | 5 ++
.../org/apache/pinot/minion/BaseMinionStarter.java | 19 +++----
.../java/org/apache/pinot/minion/MinionConf.java | 66 ++++++++++++++++++++++
.../minion/executor/PinotTaskExecutorFactory.java | 9 +++
.../executor/TaskExecutorFactoryRegistry.java | 5 +-
.../BaseMultipleSegmentsConversionExecutor.java | 14 +++--
.../ConvertToRawIndexTaskExecutorFactory.java | 5 ++
.../tasks/mergerollup/MergeRollupTaskExecutor.java | 5 ++
.../MergeRollupTaskExecutorFactory.java | 9 ++-
.../mergerollup/MergeRollupTaskGenerator.java | 3 -
.../tasks/purge/PurgeTaskExecutorFactory.java | 5 ++
.../RealtimeToOfflineSegmentsTaskExecutor.java | 5 +-
...altimeToOfflineSegmentsTaskExecutorFactory.java | 10 +++-
...egmentGenerationAndPushTaskExecutorFactory.java | 5 ++
.../mergerollup/MergeRollupTaskExecutorTest.java | 3 +-
.../RealtimeToOfflineSegmentsTaskExecutorTest.java | 16 +++---
17 files changed, 151 insertions(+), 34 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 36341b8..06ff9d5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -49,7 +49,6 @@ public class MinionConstants {
public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
- public static final String END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY = "endReplaceSegmentsSocketTimeoutMs";
public static class ConvertToRawIndexTask {
public static final String TASK_TYPE = "ConvertToRawIndexTask";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
index 149ee19..47d8610 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
@@ -48,6 +49,10 @@ public class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
}
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+ }
+
+ @Override
public String getTaskType() {
return SimpleMinionClusterIntegrationTest.TASK_TYPE;
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
index 5083261..e08033c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java
@@ -57,7 +57,6 @@ import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +69,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {
private static final String HTTPS_ENABLED = "enabled";
- protected PinotConfiguration _config;
+ protected MinionConf _config;
protected String _hostname;
protected int _port;
protected String _instanceId;
@@ -83,14 +82,12 @@ public abstract class BaseMinionStarter implements ServiceStartable {
@Override
public void init(PinotConfiguration config)
throws Exception {
- _config = config;
- String helixClusterName = _config.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
- String zkAddress = _config.getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
- _hostname = _config.getProperty(CommonConstants.Helix.KEY_OF_MINION_HOST,
- _config.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils
- .getHostnameOrAddress() : NetUtils.getHostAddress());
- _port = _config.getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT);
- _instanceId = _config.getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
+ _config = new MinionConf(config.toMap());
+ String helixClusterName = _config.getHelixClusterName();
+ String zkAddress = _config.getZkAddress();
+ _hostname = _config.getHostName();
+ _port = _config.getPort();
+ _instanceId = _config.getInstanceId();
if (_instanceId != null) {
// NOTE: Force all instances to have the same prefix in order to derive the instance type based on the instance id
Preconditions.checkState(_instanceId.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE),
@@ -102,7 +99,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {
setupHelixSystemProperties();
_helixManager = new ZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, zkAddress);
MinionTaskZkMetadataManager minionTaskZkMetadataManager = new MinionTaskZkMetadataManager(_helixManager);
- _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager);
+ _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager, _config);
_eventObserverFactoryRegistry = new EventObserverFactoryRegistry(minionTaskZkMetadataManager);
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
new file mode 100644
index 0000000..49d04ea
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.NetUtils;
+
+
+public class MinionConf extends PinotConfiguration {
+ public static final String END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY = "pinot.minion.endReplaceSegments.timeoutMs";
+ public static final int DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 10 * 60 * 1000; // 10 mins
+
+ public MinionConf() {
+ super(new HashMap<>());
+ }
+
+ public MinionConf(Map<String, Object> baseProperties) {
+ super(baseProperties);
+ }
+
+ public String getHelixClusterName() {
+ return getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
+ }
+
+ public String getZkAddress() {
+ return getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
+ }
+
+ public String getHostName()
+ throws Exception {
+ return getProperty(CommonConstants.Helix.KEY_OF_MINION_HOST,
+ getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils
+ .getHostnameOrAddress() : NetUtils.getHostAddress());
+ }
+
+ public int getPort() {
+ return getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT);
+ }
+
+ public String getInstanceId() {
+ return getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
+ }
+
+ public int getEndReplaceSegmentsTimeoutMs() {
+ return getProperty(END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY, DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS);
+ }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
index a51044b..9c7aeed 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/PinotTaskExecutorFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.minion.executor;
+import org.apache.pinot.minion.MinionConf;
+
+
/**
* Factory for {@link PinotTaskExecutor}.
*/
@@ -26,9 +29,15 @@ public interface PinotTaskExecutorFactory {
/**
* Initializes the task executor factory.
*/
+ @Deprecated
void init(MinionTaskZkMetadataManager zkMetadataManager);
/**
+ * Initializes the task executor factory.
+ */
+ void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf);
+
+ /**
* Returns the task type of the executor.
*/
String getTaskType();
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 3ba1c40..0d15bdb 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -21,6 +21,7 @@ package org.apache.pinot.minion.executor;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
import org.apache.pinot.spi.utils.PinotReflectionUtils;
import org.slf4j.Logger;
@@ -45,7 +46,7 @@ public class TaskExecutorFactoryRegistry {
* NOTE: In order to plugin a class using reflection, the class should include ".plugin.minion.tasks." in its class
* path. This convention can significantly reduce the time of class scanning.
*/
- public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
+ public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
long startTimeMs = System.currentTimeMillis();
Set<Class<?>> classes = getTaskExecutorFactoryClasses();
for (Class<?> clazz : classes) {
@@ -53,7 +54,7 @@ public class TaskExecutorFactoryRegistry {
if (annotation.enabled()) {
try {
PinotTaskExecutorFactory taskExecutorFactory = (PinotTaskExecutorFactory) clazz.newInstance();
- taskExecutorFactory.init(zkMetadataManager);
+ taskExecutorFactory.init(zkMetadataManager, minionConf);
registerTaskExecutorFactory(taskExecutorFactory);
} catch (Exception e) {
LOGGER.error("Caught exception while initializing and registering task executor factory: {}, skipping it",
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index fd65337..cc9370d 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -57,6 +58,12 @@ import org.slf4j.LoggerFactory;
public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
+ protected MinionConf _minionConf;
+
+ public BaseMultipleSegmentsConversionExecutor(MinionConf minionConf) {
+ _minionConf = minionConf;
+ }
+
/**
* Converts the segment based on the given {@link PinotTaskConfig}.
*
@@ -206,12 +213,9 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
// Update the segment lineage to indicate that the segment replacement is done.
if (replaceSegmentsEnabled) {
- int endReplaceSegmentsSocketTimeoutMs =
- configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY) != null
- ? Integer.parseInt(configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY))
- : FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS;
SegmentConversionUtils
- .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId, endReplaceSegmentsSocketTimeoutMs);
+ .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
+ _minionConf.getEndReplaceSegmentsTimeoutMs());
}
String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
index 59e805a..0b208c4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutorFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.converttorawindex;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -33,6 +34,10 @@ public class ConvertToRawIndexTaskExecutorFactory implements PinotTaskExecutorFa
}
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+ }
+
+ @Override
public String getTaskType() {
return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
index 47ab740..f054b02 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
@@ -46,6 +47,10 @@ import org.slf4j.LoggerFactory;
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ public MergeRollupTaskExecutor(MinionConf minionConf) {
+ super(minionConf);
+ }
+
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorFactory.java
index 6183f7b..9b086b2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.mergerollup;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -27,18 +28,24 @@ import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
@TaskExecutorFactory
public class MergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
+ private MinionConf _minionConf;
@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
}
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+ _minionConf = minionConf;
+ }
+
+ @Override
public String getTaskType() {
return MinionConstants.MergeRollupTask.TASK_TYPE;
}
@Override
public PinotTaskExecutor create() {
- return new MergeRollupTaskExecutor();
+ return new MergeRollupTaskExecutor(_minionConf);
}
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 5c8ec8f..c1ab565 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -94,7 +94,6 @@ import org.slf4j.LoggerFactory;
public class MergeRollupTaskGenerator implements PinotTaskGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
- public static final int END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 30 * 60 * 1000; // 30 mins
private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
private static final String REFRESH = "REFRESH";
@@ -486,8 +485,6 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + System.currentTimeMillis() + "_" + i + "_"
+ TableNameBuilder.extractRawTableName(offlineTableName));
- configs.put(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY,
- String.valueOf(END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS));
pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorFactory.java
index a76da57..036ed02 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.purge;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -33,6 +34,10 @@ public class PurgeTaskExecutorFactory implements PinotTaskExecutorFactory {
}
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+ }
+
+ @Override
public String getTaskType() {
return MinionConstants.PurgeTask.TASK_TYPE;
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
index dfa3a8f..c957320 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.MergeType;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
@@ -72,7 +73,9 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
private int _expectedVersion = Integer.MIN_VALUE;
- public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
+ public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager,
+ MinionConf minionConf) {
+ super(minionConf);
_minionTaskZkMetadataManager = minionTaskZkMetadataManager;
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorFactory.java
index ee70a45..2c705df 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -31,6 +32,7 @@ import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
@TaskExecutorFactory
public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
private MinionTaskZkMetadataManager _zkMetadataManager;
+ private MinionConf _minionConf;
@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
@@ -38,12 +40,18 @@ public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskEx
}
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+ _zkMetadataManager = zkMetadataManager;
+ _minionConf = minionConf;
+ }
+
+ @Override
public String getTaskType() {
return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE;
}
@Override
public PinotTaskExecutor create() {
- return new RealtimeToOfflineSegmentsTaskExecutor(_zkMetadataManager);
+ return new RealtimeToOfflineSegmentsTaskExecutor(_zkMetadataManager, _minionConf);
}
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutorFactory.java
index 383f4e0..0a821e7 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutorFactory.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutorFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
@@ -33,6 +34,10 @@ public class SegmentGenerationAndPushTaskExecutorFactory implements PinotTaskExe
}
@Override
+ public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
+ }
+
+ @Override
public String getTaskType() {
return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
index 3b3f404..69a4541 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -104,7 +105,7 @@ public class MergeRollupTaskExecutorTest {
@Test
public void testConvert()
throws Exception {
- MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor();
+ MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor(new MinionConf());
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index 1f88a43..4fcdbf6 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -215,7 +215,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor(null);
+ new RealtimeToOfflineSegmentsTaskExecutor(null, null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -242,7 +242,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor(null);
+ new RealtimeToOfflineSegmentsTaskExecutor(null, null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -270,7 +270,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor(null);
+ new RealtimeToOfflineSegmentsTaskExecutor(null, null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -299,7 +299,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor(null);
+ new RealtimeToOfflineSegmentsTaskExecutor(null, null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -332,7 +332,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor(null);
+ new RealtimeToOfflineSegmentsTaskExecutor(null, null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_PARTITIONING);
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600468000000");
@@ -364,7 +364,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor(null);
+ new RealtimeToOfflineSegmentsTaskExecutor(null, null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_SORTED_COL);
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -392,7 +392,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor(null);
+ new RealtimeToOfflineSegmentsTaskExecutor(null, null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_EPOCH_HOURS);
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
@@ -421,7 +421,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor(null);
+ new RealtimeToOfflineSegmentsTaskExecutor(null, null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_SDF);
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY, "1600473600000");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org