You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2020/10/08 04:58:18 UTC
[druid] branch 0.20.0 updated: Fix compaction task slot computation
in auto compaction (#10479) (#10488)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.20.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.20.0 by this push:
new 8a651ee Fix compaction task slot computation in auto compaction (#10479) (#10488)
8a651ee is described below
commit 8a651ee7f077d4682a3b1a3a4a50f9d874e00b1d
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Wed Oct 7 21:58:06 2020 -0700
Fix compaction task slot computation in auto compaction (#10479) (#10488)
* Fix compaction task slot computation in auto compaction
* add tests for task counting
Co-authored-by: Jihoon Son <ji...@apache.org>
---
.../parallel/ParallelIndexSupervisorTask.java | 4 +
.../parallel/ParallelIndexSupervisorTaskTest.java | 2 +-
.../server/coordinator/duty/CompactSegments.java | 63 +++++--
.../coordinator/duty/CompactSegmentsTest.java | 203 +++++++++++++++++----
4 files changed, 215 insertions(+), 57 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 4a218a0..acac279 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -466,6 +466,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
}
+ /**
+ * Returns true if this task can run in the parallel mode with the given inputSource and tuningConfig.
+ * This method should be synchronized with CompactSegments.isParallelMode(ClientCompactionTaskQueryTuningConfig).
+ */
public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
{
if (null == tuningConfig) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index ef1db8a..710df1d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -249,7 +249,7 @@ public class ParallelIndexSupervisorTaskTest
}
}
- public static class staticUtilsTest
+ public static class StaticUtilsTest
{
@Test
public void testIsParallelModeFalse_nullTuningConfig()
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 50b31ca..3b7ee31 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
@@ -27,6 +28,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -123,8 +125,9 @@ public class CompactSegments implements CoordinatorDuty
final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload();
final Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
- final int numSubTasks = findNumMaxConcurrentSubTasks(compactionTaskQuery.getTuningConfig());
- numEstimatedNonCompleteCompactionTasks += numSubTasks + 1; // count the compaction task itself
+ numEstimatedNonCompleteCompactionTasks += findMaxNumTaskSlotsUsedByOneCompactionTask(
+ compactionTaskQuery.getTuningConfig()
+ );
} else {
throw new ISE("task[%s] is not a compactionTask", status.getId());
}
@@ -160,7 +163,12 @@ public class CompactSegments implements CoordinatorDuty
if (numAvailableCompactionTaskSlots > 0) {
stats.accumulate(
- doRun(compactionConfigs, currentRunAutoCompactionSnapshotBuilders, numAvailableCompactionTaskSlots, iterator)
+ doRun(
+ compactionConfigs,
+ currentRunAutoCompactionSnapshotBuilders,
+ numAvailableCompactionTaskSlots,
+ iterator
+ )
);
} else {
stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator));
@@ -180,24 +188,43 @@ public class CompactSegments implements CoordinatorDuty
}
/**
- * Each compaction task can run a parallel indexing task. When we count the number of current running
- * compaction tasks, we should count the sub tasks of the parallel indexing task as well. However, we currently
- * don't have a good way to get the number of current running sub tasks except poking each supervisor task,
- * which is complex to handle all kinds of failures. Here, we simply return {@code maxNumConcurrentSubTasks} instead
- * to estimate the number of sub tasks conservatively. This should be ok since it won't affect to the performance of
- * other ingestion types.
+ * Returns the maximum number of task slots used by one compaction task at any time when the task is issued with
+ * the given tuningConfig.
*/
- private int findNumMaxConcurrentSubTasks(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig)
+ @VisibleForTesting
+ static int findMaxNumTaskSlotsUsedByOneCompactionTask(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig)
{
- if (tuningConfig != null && tuningConfig.getMaxNumConcurrentSubTasks() != null) {
- // The actual number of subtasks might be smaller than the configured max.
- // However, we use the max to simplify the estimation here.
- return tuningConfig.getMaxNumConcurrentSubTasks();
+ if (isParallelMode(tuningConfig)) {
+ @Nullable Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
+ // Max number of task slots used in parallel mode = maxNumConcurrentSubTasks + 1 (supervisor task)
+ return (maxNumConcurrentSubTasks == null ? 1 : maxNumConcurrentSubTasks) + 1;
} else {
- return 0;
+ return 1;
}
}
+ /**
+ * Returns true if the compaction task can run in the parallel mode with the given tuningConfig.
+ * This method should be synchronized with ParallelIndexSupervisorTask.isParallelMode(InputSource, ParallelIndexTuningConfig).
+ */
+ @VisibleForTesting
+ static boolean isParallelMode(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig)
+ {
+ if (null == tuningConfig) {
+ return false;
+ }
+ boolean useRangePartitions = useRangePartitions(tuningConfig);
+ int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
+ return tuningConfig.getMaxNumConcurrentSubTasks() != null
+ && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
+ }
+
+ private static boolean useRangePartitions(ClientCompactionTaskQueryTuningConfig tuningConfig)
+ {
+ // dynamic partitionsSpec will be used if getPartitionsSpec() returns null
+ return tuningConfig.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+ }
+
private void updateAutoCompactionSnapshot(
List<DataSourceCompactionConfig> compactionConfigList,
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders)
@@ -248,8 +275,9 @@ public class CompactSegments implements CoordinatorDuty
)
{
int numSubmittedTasks = 0;
+ int numCompactionTasksAndSubtasks = 0;
- for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots;) {
+ while (iterator.hasNext() && numCompactionTasksAndSubtasks < numAvailableCompactionTaskSlots) {
final List<DataSegment> segmentsToCompact = iterator.next();
if (!segmentsToCompact.isEmpty()) {
@@ -277,7 +305,8 @@ public class CompactSegments implements CoordinatorDuty
);
LOG.infoSegments(segmentsToCompact, "Compacting segments");
// Count the compaction task itself + its sub tasks
- numSubmittedTasks += findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1;
+ numSubmittedTasks++;
+ numCompactionTasksAndSubtasks += findMaxNumTaskSlotsUsedByOneCompactionTask(config.getTuningConfig());
} else {
throw new ISE("segmentsToCompact is empty?");
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 8d9b294..0c17cf9 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import junitparams.converters.Nullable;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
@@ -80,6 +81,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
import java.io.IOException;
import java.net.URL;
@@ -288,8 +290,8 @@ public class CompactSegmentsTest
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
- for (int compaction_run_count = 0; compaction_run_count < 11; compaction_run_count++) {
- assertCompactSegmentStatistics(compactSegments, compaction_run_count);
+ for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
+ assertCompactSegmentStatistics(compactSegments, compactionRunCount);
}
// Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
final CoordinatorStats stats = doCompactSegments(compactSegments);
@@ -381,7 +383,7 @@ public class CompactSegmentsTest
Assert.assertEquals(0, autoCompactionSnapshots.size());
// 3 intervals, 120 byte, 12 segments already compacted before the run
- for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
+ for (int compactionRunCount = 0; compactionRunCount < 8; compactionRunCount++) {
// Do a cycle of auto compaction which creates one compaction task
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
@@ -393,17 +395,17 @@ public class CompactSegmentsTest
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
- TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
- 120 + 40 * (compaction_run_count + 1),
+ TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compactionRunCount + 1),
+ 120 + 40 * (compactionRunCount + 1),
0,
- TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
- 3 + (compaction_run_count + 1),
+ TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compactionRunCount + 1),
+ 3 + (compactionRunCount + 1),
0,
- TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
+ TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compactionRunCount + 1),
// 12 segments was compressed before any auto compaction
// 4 segments was compressed in this run of auto compaction
// Each previous auto compaction run resulted in 2 compacted segments (4 segments compacted into 2 segments)
- 12 + 4 + 2 * (compaction_run_count),
+ 12 + 4 + 2 * (compactionRunCount),
0
);
}
@@ -474,7 +476,7 @@ public class CompactSegmentsTest
Assert.assertEquals(0, autoCompactionSnapshots.size());
// 3 intervals, 1200 byte (each segment is 100 bytes), 12 segments will be skipped by auto compaction
- for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
+ for (int compactionRunCount = 0; compactionRunCount < 8; compactionRunCount++) {
// Do a cycle of auto compaction which creates one compaction task
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
@@ -487,14 +489,14 @@ public class CompactSegmentsTest
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
// Minus 120 bytes accounting for the three skipped segments' original size
- TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
- 40 * (compaction_run_count + 1),
+ TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compactionRunCount + 1),
+ 40 * (compactionRunCount + 1),
1200,
- TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
- (compaction_run_count + 1),
+ TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compactionRunCount + 1),
+ (compactionRunCount + 1),
3,
- TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
- 4 + 2 * (compaction_run_count),
+ TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compactionRunCount + 1),
+ 4 + 2 * (compactionRunCount),
12
);
}
@@ -522,6 +524,34 @@ public class CompactSegmentsTest
);
}
+ @Test
+ public void testRunMultipleCompactionTaskSlots()
+ {
+ final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+ final CoordinatorStats stats = doCompactSegments(compactSegments, 3);
+ Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
+ Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
+ Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+ }
+
+ @Test
+ public void testRunParallelCompactionMultipleCompactionTaskSlots()
+ {
+ final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+ final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
+ Assert.assertEquals(4, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
+ Assert.assertEquals(4, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
+ Assert.assertEquals(2, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+ }
+
private void verifySnapshot(
CompactSegments compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
@@ -552,7 +582,7 @@ public class CompactSegmentsTest
Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
}
- private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compaction_run_count)
+ private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
{
for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
// One compaction task triggered
@@ -574,14 +604,14 @@ public class CompactSegmentsTest
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
- TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
- 40 * (compaction_run_count + 1),
+ TOTAL_BYTE_PER_DATASOURCE - 40 * (compactionRunCount + 1),
+ 40 * (compactionRunCount + 1),
0,
- TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
- (compaction_run_count + 1),
+ TOTAL_INTERVAL_PER_DATASOURCE - (compactionRunCount + 1),
+ (compactionRunCount + 1),
0,
- TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
- 2 * (compaction_run_count + 1),
+ TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compactionRunCount + 1),
+ 2 * (compactionRunCount + 1),
0
);
} else {
@@ -589,14 +619,14 @@ public class CompactSegmentsTest
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
- TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
- 40 * (compaction_run_count + 1),
+ TOTAL_BYTE_PER_DATASOURCE - 40 * (compactionRunCount + 1),
+ 40 * (compactionRunCount + 1),
0,
- TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
- (compaction_run_count + 1),
+ TOTAL_INTERVAL_PER_DATASOURCE - (compactionRunCount + 1),
+ (compactionRunCount + 1),
0,
- TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
- 2 * compaction_run_count + 4,
+ TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compactionRunCount + 1),
+ 2 * compactionRunCount + 4,
0
);
}
@@ -608,14 +638,14 @@ public class CompactSegmentsTest
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
- TOTAL_BYTE_PER_DATASOURCE - 40 * compaction_run_count,
- 40 * compaction_run_count,
+ TOTAL_BYTE_PER_DATASOURCE - 40 * compactionRunCount,
+ 40 * compactionRunCount,
0,
- TOTAL_INTERVAL_PER_DATASOURCE - compaction_run_count,
- compaction_run_count,
+ TOTAL_INTERVAL_PER_DATASOURCE - compactionRunCount,
+ compactionRunCount,
0,
- TOTAL_SEGMENT_PER_DATASOURCE - 4 * compaction_run_count,
- 2 * compaction_run_count,
+ TOTAL_SEGMENT_PER_DATASOURCE - 4 * compactionRunCount,
+ 2 * compactionRunCount,
0
);
}
@@ -624,15 +654,38 @@ public class CompactSegmentsTest
private CoordinatorStats doCompactSegments(CompactSegments compactSegments)
{
- return doCompactSegments(compactSegments, createCompactionConfigs());
+ return doCompactSegments(compactSegments, (Integer) null);
+ }
+
+ private CoordinatorStats doCompactSegments(CompactSegments compactSegments, @Nullable Integer numCompactionTaskSlots)
+ {
+ return doCompactSegments(compactSegments, createCompactionConfigs(), numCompactionTaskSlots);
}
- private CoordinatorStats doCompactSegments(CompactSegments compactSegments, List<DataSourceCompactionConfig> compactionConfigs)
+ private CoordinatorStats doCompactSegments(
+ CompactSegments compactSegments,
+ List<DataSourceCompactionConfig> compactionConfigs
+ )
+ {
+ return doCompactSegments(compactSegments, compactionConfigs, null);
+ }
+
+ private CoordinatorStats doCompactSegments(
+ CompactSegments compactSegments,
+ List<DataSourceCompactionConfig> compactionConfigs,
+ @Nullable Integer numCompactionTaskSlots
+ )
{
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withUsedSegmentsTimelinesPerDataSourceInTest(dataSources)
- .withCompactionConfig(CoordinatorCompactionConfig.from(compactionConfigs))
+ .withCompactionConfig(
+ new CoordinatorCompactionConfig(
+ compactionConfigs,
+ numCompactionTaskSlots == null ? null : 100., // 100% when numCompactionTaskSlots is not null
+ numCompactionTaskSlots
+ )
+ )
.build();
return compactSegments.run(params).getCoordinatorStats();
}
@@ -744,6 +797,11 @@ public class CompactSegmentsTest
private List<DataSourceCompactionConfig> createCompactionConfigs()
{
+ return createCompactionConfigs(null);
+ }
+
+ private List<DataSourceCompactionConfig> createCompactionConfigs(@Nullable Integer maxNumConcurrentSubTasks)
+ {
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
@@ -765,7 +823,7 @@ public class CompactSegmentsTest
null,
null,
null,
- null,
+ maxNumConcurrentSubTasks,
null,
null,
null,
@@ -955,4 +1013,71 @@ public class CompactSegmentsTest
return EasyMock.niceMock(DruidNodeDiscovery.class);
}
}
+
+ public static class StaticUtilsTest
+ {
+ @Test
+ public void testIsParalleModeNullTuningConfigReturnFalse()
+ {
+ Assert.assertFalse(CompactSegments.isParallelMode(null));
+ }
+
+ @Test
+ public void testIsParallelModeNullPartitionsSpecReturnFalse()
+ {
+ ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+ Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(null);
+ Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
+ }
+
+ @Test
+ public void testIsParallelModeNonRangePartitionVaryingMaxNumConcurrentSubTasks()
+ {
+ ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+ Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
+
+ Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(null);
+ Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
+
+ Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
+ Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
+
+ Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
+ Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
+ }
+
+ @Test
+ public void testIsParallelModeRangePartitionVaryingMaxNumConcurrentSubTasks()
+ {
+ ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+ Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(SingleDimensionPartitionsSpec.class));
+
+ Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(null);
+ Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
+
+ Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
+ Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
+
+ Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
+ Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
+ }
+
+ @Test
+ public void testFindMaxNumTaskSlotsUsedByOneCompactionTaskWhenIsParallelMode()
+ {
+ ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+ Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
+ Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
+ Assert.assertEquals(3, CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(tuningConfig));
+ }
+
+ @Test
+ public void testFindMaxNumTaskSlotsUsedByOneCompactionTaskWhenIsSequentialMode()
+ {
+ ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+ Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
+ Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
+ Assert.assertEquals(1, CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(tuningConfig));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org