You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2020/10/08 04:58:18 UTC

[druid] branch 0.20.0 updated: Fix compaction task slot computation in auto compaction (#10479) (#10488)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.20.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.20.0 by this push:
     new 8a651ee  Fix compaction task slot computation in auto compaction (#10479) (#10488)
8a651ee is described below

commit 8a651ee7f077d4682a3b1a3a4a50f9d874e00b1d
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Wed Oct 7 21:58:06 2020 -0700

    Fix compaction task slot computation in auto compaction (#10479) (#10488)
    
    * Fix compaction task slot computation in auto compaction
    
    * add tests for task counting
    
    Co-authored-by: Jihoon Son <ji...@apache.org>
---
 .../parallel/ParallelIndexSupervisorTask.java      |   4 +
 .../parallel/ParallelIndexSupervisorTaskTest.java  |   2 +-
 .../server/coordinator/duty/CompactSegments.java   |  63 +++++--
 .../coordinator/duty/CompactSegmentsTest.java      | 203 +++++++++++++++++----
 4 files changed, 215 insertions(+), 57 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 4a218a0..acac279 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -466,6 +466,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
   }
 
+  /**
+   * Returns true if this task can run in the parallel mode with the given inputSource and tuningConfig.
+   * This method should be synchronized with CompactSegments.isParallelMode(ClientCompactionTaskQueryTuningConfig).
+   */
   public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
   {
     if (null == tuningConfig) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index ef1db8a..710df1d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -249,7 +249,7 @@ public class ParallelIndexSupervisorTaskTest
     }
   }
 
-  public static class staticUtilsTest
+  public static class StaticUtilsTest
   {
     @Test
     public void testIsParallelModeFalse_nullTuningConfig()
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 50b31ca..3b7ee31 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
@@ -27,6 +28,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -123,8 +125,9 @@ public class CompactSegments implements CoordinatorDuty
             final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload();
             final Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
             compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
-            final int numSubTasks = findNumMaxConcurrentSubTasks(compactionTaskQuery.getTuningConfig());
-            numEstimatedNonCompleteCompactionTasks += numSubTasks + 1; // count the compaction task itself
+            numEstimatedNonCompleteCompactionTasks += findMaxNumTaskSlotsUsedByOneCompactionTask(
+                compactionTaskQuery.getTuningConfig()
+            );
           } else {
             throw new ISE("task[%s] is not a compactionTask", status.getId());
           }
@@ -160,7 +163,12 @@ public class CompactSegments implements CoordinatorDuty
 
         if (numAvailableCompactionTaskSlots > 0) {
           stats.accumulate(
-              doRun(compactionConfigs, currentRunAutoCompactionSnapshotBuilders, numAvailableCompactionTaskSlots, iterator)
+              doRun(
+                  compactionConfigs,
+                  currentRunAutoCompactionSnapshotBuilders,
+                  numAvailableCompactionTaskSlots,
+                  iterator
+              )
           );
         } else {
           stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator));
@@ -180,24 +188,43 @@ public class CompactSegments implements CoordinatorDuty
   }
 
   /**
-   * Each compaction task can run a parallel indexing task. When we count the number of current running
-   * compaction tasks, we should count the sub tasks of the parallel indexing task as well. However, we currently
-   * don't have a good way to get the number of current running sub tasks except poking each supervisor task,
-   * which is complex to handle all kinds of failures. Here, we simply return {@code maxNumConcurrentSubTasks} instead
-   * to estimate the number of sub tasks conservatively. This should be ok since it won't affect to the performance of
-   * other ingestion types.
+   * Returns the maximum number of task slots used by one compaction task at any time when the task is issued with
+   * the given tuningConfig.
    */
-  private int findNumMaxConcurrentSubTasks(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig)
+  @VisibleForTesting
+  static int findMaxNumTaskSlotsUsedByOneCompactionTask(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig)
   {
-    if (tuningConfig != null && tuningConfig.getMaxNumConcurrentSubTasks() != null) {
-      // The actual number of subtasks might be smaller than the configured max.
-      // However, we use the max to simplify the estimation here.
-      return tuningConfig.getMaxNumConcurrentSubTasks();
+    if (isParallelMode(tuningConfig)) {
+      @Nullable Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
+      // Max number of task slots used in parallel mode = maxNumConcurrentSubTasks + 1 (supervisor task)
+      return (maxNumConcurrentSubTasks == null ? 1 : maxNumConcurrentSubTasks) + 1;
     } else {
-      return 0;
+      return 1;
     }
   }
 
+  /**
+   * Returns true if the compaction task can run in the parallel mode with the given tuningConfig.
+   * This method should be synchronized with ParallelIndexSupervisorTask.isParallelMode(InputSource, ParallelIndexTuningConfig).
+   */
+  @VisibleForTesting
+  static boolean isParallelMode(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig)
+  {
+    if (null == tuningConfig) {
+      return false;
+    }
+    boolean useRangePartitions = useRangePartitions(tuningConfig);
+    int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
+    return tuningConfig.getMaxNumConcurrentSubTasks() != null
+           && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
+  }
+
+  private static boolean useRangePartitions(ClientCompactionTaskQueryTuningConfig tuningConfig)
+  {
+    // dynamic partitionsSpec will be used if getPartitionsSpec() returns null
+    return tuningConfig.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+  }
+
   private void updateAutoCompactionSnapshot(
       List<DataSourceCompactionConfig> compactionConfigList,
       Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders)
@@ -248,8 +275,9 @@ public class CompactSegments implements CoordinatorDuty
   )
   {
     int numSubmittedTasks = 0;
+    int numCompactionTasksAndSubtasks = 0;
 
-    for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots;) {
+    while (iterator.hasNext() && numCompactionTasksAndSubtasks < numAvailableCompactionTaskSlots) {
       final List<DataSegment> segmentsToCompact = iterator.next();
 
       if (!segmentsToCompact.isEmpty()) {
@@ -277,7 +305,8 @@ public class CompactSegments implements CoordinatorDuty
         );
         LOG.infoSegments(segmentsToCompact, "Compacting segments");
         // Count the compaction task itself + its sub tasks
-        numSubmittedTasks += findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1;
+        numSubmittedTasks++;
+        numCompactionTasksAndSubtasks += findMaxNumTaskSlotsUsedByOneCompactionTask(config.getTuningConfig());
       } else {
         throw new ISE("segmentsToCompact is empty?");
       }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 8d9b294..0c17cf9 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import junitparams.converters.Nullable;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
@@ -80,6 +81,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.net.URL;
@@ -288,8 +290,8 @@ public class CompactSegmentsTest
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
     Assert.assertEquals(0, autoCompactionSnapshots.size());
 
-    for (int compaction_run_count = 0; compaction_run_count < 11; compaction_run_count++) {
-      assertCompactSegmentStatistics(compactSegments, compaction_run_count);
+    for (int compactionRunCount = 0; compactionRunCount < 11; compactionRunCount++) {
+      assertCompactSegmentStatistics(compactSegments, compactionRunCount);
     }
     // Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
     final CoordinatorStats stats = doCompactSegments(compactSegments);
@@ -381,7 +383,7 @@ public class CompactSegmentsTest
     Assert.assertEquals(0, autoCompactionSnapshots.size());
 
     // 3 intervals, 120 byte, 12 segments already compacted before the run
-    for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
+    for (int compactionRunCount = 0; compactionRunCount < 8; compactionRunCount++) {
       // Do a cycle of auto compaction which creates one compaction task
       final CoordinatorStats stats = doCompactSegments(compactSegments);
       Assert.assertEquals(
@@ -393,17 +395,17 @@ public class CompactSegmentsTest
           compactSegments,
           AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
           dataSourceName,
-          TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
-          120 + 40 * (compaction_run_count + 1),
+          TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compactionRunCount + 1),
+          120 + 40 * (compactionRunCount + 1),
           0,
-          TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
-          3 + (compaction_run_count + 1),
+          TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compactionRunCount + 1),
+          3 + (compactionRunCount + 1),
           0,
-          TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
+          TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compactionRunCount + 1),
           // 12 segments was compressed before any auto compaction
           // 4 segments was compressed in this run of auto compaction
           // Each previous auto compaction run resulted in 2 compacted segments (4 segments compacted into 2 segments)
-          12 + 4 + 2 * (compaction_run_count),
+          12 + 4 + 2 * (compactionRunCount),
           0
       );
     }
@@ -474,7 +476,7 @@ public class CompactSegmentsTest
     Assert.assertEquals(0, autoCompactionSnapshots.size());
 
     // 3 intervals, 1200 byte (each segment is 100 bytes), 12 segments will be skipped by auto compaction
-    for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
+    for (int compactionRunCount = 0; compactionRunCount < 8; compactionRunCount++) {
       // Do a cycle of auto compaction which creates one compaction task
       final CoordinatorStats stats = doCompactSegments(compactSegments);
       Assert.assertEquals(
@@ -487,14 +489,14 @@ public class CompactSegmentsTest
           AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
           dataSourceName,
           // Minus 120 bytes accounting for the three skipped segments' original size
-          TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
-          40 * (compaction_run_count + 1),
+          TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compactionRunCount + 1),
+          40 * (compactionRunCount + 1),
           1200,
-          TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
-          (compaction_run_count + 1),
+          TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compactionRunCount + 1),
+          (compactionRunCount + 1),
           3,
-          TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
-          4 + 2 * (compaction_run_count),
+          TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compactionRunCount + 1),
+          4 + 2 * (compactionRunCount),
           12
       );
     }
@@ -522,6 +524,34 @@ public class CompactSegmentsTest
     );
   }
 
+  @Test
+  public void testRunMultipleCompactionTaskSlots()
+  {
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+    final CoordinatorStats stats = doCompactSegments(compactSegments, 3);
+    Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
+    Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
+    Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+  }
+
+  @Test
+  public void testRunParallelCompactionMultipleCompactionTaskSlots()
+  {
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+
+    final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
+    Assert.assertEquals(4, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
+    Assert.assertEquals(4, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
+    Assert.assertEquals(2, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+  }
+
   private void verifySnapshot(
       CompactSegments compactSegments,
       AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
@@ -552,7 +582,7 @@ public class CompactSegmentsTest
     Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
   }
 
-  private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compaction_run_count)
+  private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compactionRunCount)
   {
     for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
       // One compaction task triggered
@@ -574,14 +604,14 @@ public class CompactSegmentsTest
               compactSegments,
               AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
               DATA_SOURCE_PREFIX + i,
-              TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
-              40 * (compaction_run_count + 1),
+              TOTAL_BYTE_PER_DATASOURCE - 40 * (compactionRunCount + 1),
+              40 * (compactionRunCount + 1),
               0,
-              TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
-              (compaction_run_count + 1),
+              TOTAL_INTERVAL_PER_DATASOURCE - (compactionRunCount + 1),
+              (compactionRunCount + 1),
               0,
-              TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
-              2 * (compaction_run_count + 1),
+              TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compactionRunCount + 1),
+              2 * (compactionRunCount + 1),
               0
           );
         } else {
@@ -589,14 +619,14 @@ public class CompactSegmentsTest
               compactSegments,
               AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
               DATA_SOURCE_PREFIX + i,
-              TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
-              40 * (compaction_run_count + 1),
+              TOTAL_BYTE_PER_DATASOURCE - 40 * (compactionRunCount + 1),
+              40 * (compactionRunCount + 1),
               0,
-              TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
-              (compaction_run_count + 1),
+              TOTAL_INTERVAL_PER_DATASOURCE - (compactionRunCount + 1),
+              (compactionRunCount + 1),
               0,
-              TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
-              2 * compaction_run_count + 4,
+              TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compactionRunCount + 1),
+              2 * compactionRunCount + 4,
               0
           );
         }
@@ -608,14 +638,14 @@ public class CompactSegmentsTest
             compactSegments,
             AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
             DATA_SOURCE_PREFIX + i,
-            TOTAL_BYTE_PER_DATASOURCE - 40 * compaction_run_count,
-            40 * compaction_run_count,
+            TOTAL_BYTE_PER_DATASOURCE - 40 * compactionRunCount,
+            40 * compactionRunCount,
             0,
-            TOTAL_INTERVAL_PER_DATASOURCE - compaction_run_count,
-            compaction_run_count,
+            TOTAL_INTERVAL_PER_DATASOURCE - compactionRunCount,
+            compactionRunCount,
             0,
-            TOTAL_SEGMENT_PER_DATASOURCE - 4 * compaction_run_count,
-            2 * compaction_run_count,
+            TOTAL_SEGMENT_PER_DATASOURCE - 4 * compactionRunCount,
+            2 * compactionRunCount,
             0
         );
       }
@@ -624,15 +654,38 @@ public class CompactSegmentsTest
 
   private CoordinatorStats doCompactSegments(CompactSegments compactSegments)
   {
-    return doCompactSegments(compactSegments, createCompactionConfigs());
+    return doCompactSegments(compactSegments, (Integer) null);
+  }
+
+  private CoordinatorStats doCompactSegments(CompactSegments compactSegments, @Nullable Integer numCompactionTaskSlots)
+  {
+    return doCompactSegments(compactSegments, createCompactionConfigs(), numCompactionTaskSlots);
   }
 
-  private CoordinatorStats doCompactSegments(CompactSegments compactSegments, List<DataSourceCompactionConfig> compactionConfigs)
+  private CoordinatorStats doCompactSegments(
+      CompactSegments compactSegments,
+      List<DataSourceCompactionConfig> compactionConfigs
+  )
+  {
+    return doCompactSegments(compactSegments, compactionConfigs, null);
+  }
+
+  private CoordinatorStats doCompactSegments(
+      CompactSegments compactSegments,
+      List<DataSourceCompactionConfig> compactionConfigs,
+      @Nullable Integer numCompactionTaskSlots
+  )
   {
     DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
         .newBuilder()
         .withUsedSegmentsTimelinesPerDataSourceInTest(dataSources)
-        .withCompactionConfig(CoordinatorCompactionConfig.from(compactionConfigs))
+        .withCompactionConfig(
+            new CoordinatorCompactionConfig(
+                compactionConfigs,
+                numCompactionTaskSlots == null ? null : 100., // 100% when numCompactionTaskSlots is not null
+                numCompactionTaskSlots
+            )
+        )
         .build();
     return compactSegments.run(params).getCoordinatorStats();
   }
@@ -744,6 +797,11 @@ public class CompactSegmentsTest
 
   private List<DataSourceCompactionConfig> createCompactionConfigs()
   {
+    return createCompactionConfigs(null);
+  }
+
+  private List<DataSourceCompactionConfig> createCompactionConfigs(@Nullable Integer maxNumConcurrentSubTasks)
+  {
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       final String dataSource = DATA_SOURCE_PREFIX + i;
@@ -765,7 +823,7 @@ public class CompactSegmentsTest
                   null,
                   null,
                   null,
-                  null,
+                  maxNumConcurrentSubTasks,
                   null,
                   null,
                   null,
@@ -955,4 +1013,71 @@ public class CompactSegmentsTest
       return EasyMock.niceMock(DruidNodeDiscovery.class);
     }
   }
+
+  public static class StaticUtilsTest
+  {
+    @Test
+    public void testIsParalleModeNullTuningConfigReturnFalse()
+    {
+      Assert.assertFalse(CompactSegments.isParallelMode(null));
+    }
+
+    @Test
+    public void testIsParallelModeNullPartitionsSpecReturnFalse()
+    {
+      ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+      Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(null);
+      Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
+    }
+
+    @Test
+    public void testIsParallelModeNonRangePartitionVaryingMaxNumConcurrentSubTasks()
+    {
+      ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+      Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
+
+      Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(null);
+      Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
+
+      Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
+      Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
+
+      Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
+      Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
+    }
+
+    @Test
+    public void testIsParallelModeRangePartitionVaryingMaxNumConcurrentSubTasks()
+    {
+      ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+      Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(SingleDimensionPartitionsSpec.class));
+
+      Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(null);
+      Assert.assertFalse(CompactSegments.isParallelMode(tuningConfig));
+
+      Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
+      Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
+
+      Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
+      Assert.assertTrue(CompactSegments.isParallelMode(tuningConfig));
+    }
+
+    @Test
+    public void testFindMaxNumTaskSlotsUsedByOneCompactionTaskWhenIsParallelMode()
+    {
+      ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+      Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
+      Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(2);
+      Assert.assertEquals(3, CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(tuningConfig));
+    }
+
+    @Test
+    public void testFindMaxNumTaskSlotsUsedByOneCompactionTaskWhenIsSequentialMode()
+    {
+      ClientCompactionTaskQueryTuningConfig tuningConfig = Mockito.mock(ClientCompactionTaskQueryTuningConfig.class);
+      Mockito.when(tuningConfig.getPartitionsSpec()).thenReturn(Mockito.mock(PartitionsSpec.class));
+      Mockito.when(tuningConfig.getMaxNumConcurrentSubTasks()).thenReturn(1);
+      Assert.assertEquals(1, CompactSegments.findMaxNumTaskSlotsUsedByOneCompactionTask(tuningConfig));
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org