Posted to commits@druid.apache.org by jo...@apache.org on 2019/03/29 05:55:38 UTC

[incubator-druid] branch 0.14.0-incubating updated: [Backport] maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes (#7379)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new 1e50319  [Backport] maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes (#7379)
1e50319 is described below

commit 1e503190037a7e71828fab6ca1f2cdd36985214e
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Thu Mar 28 22:55:32 2019 -0700

    [Backport] maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes (#7379)
    
    * maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes (#7368)
    
    * maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes
    
    * remove unnecessary default values
    
    * remove flaky test
    
    * fix build
    
    * Add comments
    
    * checkstyle fix
    
    * Fix
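
In short: this patch extends the constructor-time validation in DataSourceCompactionConfig so that targetCompactionSizeBytes is rejected not only alongside maxRowsPerSegment but also alongside maxTotalRows from the tuning config. Below is a minimal sketch of the new behavior, mirroring DataSourceCompactionConfigTest further down (the argument order and expected message are taken from that test; the class name and values are otherwise illustrative, and the Druid server module is assumed to be on the classpath):

    import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
    import org.joda.time.Period;

    public class CompactionConfigValidationSketch
    {
      public static void main(String[] args)
      {
        try {
          new DataSourceCompactionConfig(
              "wikipedia",      // dataSource (illustrative)
              null,             // keepSegmentGranularity (use default)
              null,             // taskPriority (use default)
              500L,             // inputSegmentSizeBytes
              10000L,           // targetCompactionSizeBytes
              1000,             // maxRowsPerSegment: conflicts with the setting above
              20,               // maxNumSegmentsToCompact
              new Period(3600), // skipOffsetFromLatest
              null,             // tuningConfig
              null              // taskContext
          );
        }
        catch (IllegalArgumentException e) {
          // Thrown at construction time:
          // "targetCompactionSizeBytes[10000] cannot be used with
          // maxRowsPerSegment[1000] and maxTotalRows[null]"
        }
      }
    }
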
---
 .../druid/tests/hadoop/ITHadoopIndexTest.java      |   2 +-
 .../tests/indexer/AbstractITBatchIndexTest.java    |   6 +-
 .../indexer/AbstractITRealtimeIndexTaskTest.java   |   2 +-
 .../druid/tests/indexer/AbstractIndexerTest.java   |   2 +-
 .../druid/tests/indexer/ITCompactionTaskTest.java  |   4 +-
 .../tests/indexer/ITKafkaIndexingServiceTest.java  |   2 +-
 .../apache/druid/tests/indexer/ITKafkaTest.java    |   3 +-
 .../tests/indexer/ITNestedQueryPushDownTest.java   |   2 +-
 .../druid/tests/indexer/ITUnionQueryTest.java      |   2 +-
 .../druid/client/indexing/ClientCompactQuery.java  |  13 +++
 .../indexing/ClientCompactQueryTuningConfig.java   |  64 +++++------
 .../client/indexing/HttpIndexingServiceClient.java |  29 +++--
 .../coordinator/DataSourceCompactionConfig.java    |  59 ++++++++--
 .../helper/DruidCoordinatorSegmentCompactor.java   |   1 -
 .../DataSourceCompactionConfigTest.java            | 128 +++++++++++++++++++++
 15 files changed, 251 insertions(+), 68 deletions(-)

diff --git a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
index 233add8..8b9183a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
@@ -65,7 +65,7 @@ public class ITHadoopIndexTest extends AbstractIndexerTest
 
     try {
       LOG.info("indexerFile name: [%s]", BATCH_TASK);
-      indexerSpec = getTaskAsString(BATCH_TASK);
+      indexerSpec = getResourceAsString(BATCH_TASK);
       indexerSpec = StringUtils.replace(indexerSpec, "%%HADOOP_TEST_PATH%%", hadoopDir);
     }
     catch (Exception e) {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index a531154..9ac9537 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -56,7 +56,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
   {
     final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
     final String taskSpec = StringUtils.replace(
-        getTaskAsString(indexTaskFilePath),
+        getResourceAsString(indexTaskFilePath),
         "%%DATASOURCE%%",
         fullDatasourceName
     );
@@ -98,7 +98,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
     final String fullReindexDatasourceName = reindexDataSource + config.getExtraDatasourceNameSuffix();
 
     String taskSpec = StringUtils.replace(
-        getTaskAsString(reindexTaskFilePath),
+        getResourceAsString(reindexTaskFilePath),
         "%%DATASOURCE%%",
         fullBaseDatasourceName
     );
@@ -148,7 +148,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
   {
     final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
     final String taskSpec = StringUtils.replace(
-        getTaskAsString(indexTaskFilePath),
+        getResourceAsString(indexTaskFilePath),
         "%%DATASOURCE%%",
         fullDatasourceName
     );
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
index cdf61de..2bf7724 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
@@ -89,7 +89,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
     try (final Closeable closeable = unloader(fullDatasourceName)) {
       // the task will run for 3 minutes and then shutdown itself
       String task = setShutOffTime(
-          getTaskAsString(getTaskResource()),
+          getResourceAsString(getTaskResource()),
           DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
       );
       task = StringUtils.replace(task, "%%DATASOURCE%%", fullDatasourceName);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index b739d79..30b7439 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -112,7 +112,7 @@ public abstract class AbstractIndexerTest
     );
   }
 
-  protected String getTaskAsString(String file) throws IOException
+  protected String getResourceAsString(String file) throws IOException
   {
     final InputStream inputStream = ITRealtimeIndexTaskTest.class.getResourceAsStream(file);
     try {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index db6ebff..264edc2 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -131,7 +131,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
 
   private void loadData() throws Exception
   {
-    String taskSpec = getTaskAsString(INDEX_TASK);
+    String taskSpec = getResourceAsString(INDEX_TASK);
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
@@ -145,7 +145,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
 
   private void compactData(boolean keepSegmentGranularity) throws Exception
   {
-    final String template = getTaskAsString(COMPACTION_TASK);
+    final String template = getResourceAsString(COMPACTION_TASK);
     String taskSpec =
         StringUtils.replace(template, "${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
index 5383ea5..d8283c2 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
@@ -152,7 +152,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
       consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
       addFilteredProperties(consumerProperties);
 
-      spec = getTaskAsString(INDEXER_FILE);
+      spec = getResourceAsString(INDEXER_FILE);
       spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
       spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
       spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
index 10f9aab..0cb56ec 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
@@ -144,7 +144,6 @@ public class ITKafkaTest extends AbstractIndexerTest
     catch (Exception e) {
       throw new ISE(e, "could not create kafka topic");
     }
-
     // set up kafka producer
     Properties properties = new Properties();
     addFilteredProperties(properties);
@@ -211,7 +210,7 @@ public class ITKafkaTest extends AbstractIndexerTest
 
       addFilteredProperties(consumerProperties);
 
-      indexerSpec = getTaskAsString(INDEXER_FILE);
+      indexerSpec = getResourceAsString(INDEXER_FILE);
       indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", fullDatasourceName);
       indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME);
       indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events));
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
index 350e2ab..76f0c84 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
@@ -97,7 +97,7 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest
 
   private void loadData() throws Exception
   {
-    String taskSpec = getTaskAsString(WIKITICKER_INDEX_TASK);
+    String taskSpec = getResourceAsString(WIKITICKER_INDEX_TASK);
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
index 8b65c40..b32a883 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
@@ -93,7 +93,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
     try {
       // Load 4 datasources with same dimensions
       String task = setShutOffTime(
-          getTaskAsString(UNION_TASK_RESOURCE),
+          getResourceAsString(UNION_TASK_RESOURCE),
           DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
       );
       List<String> taskIDs = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index c87ff90..f32b11d 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -99,4 +99,17 @@ public class ClientCompactQuery implements ClientQuery
   {
     return context;
   }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactQuery{" +
+           "dataSource='" + dataSource + '\'' +
+           ", segments=" + segments +
+           ", keepSegmentGranularity=" + keepSegmentGranularity +
+           ", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
+           ", tuningConfig=" + tuningConfig +
+           ", context=" + context +
+           '}';
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
index 9bae161..0680169 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
@@ -29,20 +29,18 @@ import java.util.Objects;
 
 public class ClientCompactQueryTuningConfig
 {
-  // These default values should be synchronized with those of IndexTuningConfig
-  private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000;
-  private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
-  private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
-  private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
-  private static final long DEFAULT_PUSH_TIMEOUT = 0;
-
   @Nullable
   private final Integer maxRowsPerSegment;
-  private final int maxRowsInMemory;
-  private final int maxTotalRows;
+  @Nullable
+  private final Integer maxRowsInMemory;
+  @Nullable
+  private final Integer maxTotalRows;
+  @Nullable
   private final IndexSpec indexSpec;
-  private final int maxPendingPersists;
-  private final long pushTimeout;
+  @Nullable
+  private final Integer maxPendingPersists;
+  @Nullable
+  private final Long pushTimeout;
 
   public static ClientCompactQueryTuningConfig from(
       @Nullable UserCompactTuningConfig userCompactTuningConfig,
@@ -70,11 +68,11 @@ public class ClientCompactQueryTuningConfig
   )
   {
     this.maxRowsPerSegment = maxRowsPerSegment;
-    this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
-    this.maxTotalRows = maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
-    this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
-    this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
-    this.pushTimeout = pushTimeout == null ? DEFAULT_PUSH_TIMEOUT : pushTimeout;
+    this.maxRowsInMemory = maxRowsInMemory;
+    this.maxTotalRows = maxTotalRows;
+    this.indexSpec = indexSpec;
+    this.maxPendingPersists = maxPendingPersists;
+    this.pushTimeout = pushTimeout;
   }
 
   @JsonProperty
@@ -91,31 +89,36 @@ public class ClientCompactQueryTuningConfig
   }
 
   @JsonProperty
-  public int getMaxRowsInMemory()
+  @Nullable
+  public Integer getMaxRowsInMemory()
   {
     return maxRowsInMemory;
   }
 
   @JsonProperty
-  public int getMaxTotalRows()
+  @Nullable
+  public Integer getMaxTotalRows()
   {
     return maxTotalRows;
   }
 
   @JsonProperty
+  @Nullable
   public IndexSpec getIndexSpec()
   {
     return indexSpec;
   }
 
   @JsonProperty
-  public int getMaxPendingPersists()
+  @Nullable
+  public Integer getMaxPendingPersists()
   {
     return maxPendingPersists;
   }
 
   @JsonProperty
-  public long getPushTimeout()
+  @Nullable
+  public Long getPushTimeout()
   {
     return pushTimeout;
   }
@@ -130,25 +133,18 @@ public class ClientCompactQueryTuningConfig
       return false;
     }
     ClientCompactQueryTuningConfig that = (ClientCompactQueryTuningConfig) o;
-    return maxRowsInMemory == that.maxRowsInMemory &&
-           maxTotalRows == that.maxTotalRows &&
-           maxPendingPersists == that.maxPendingPersists &&
-           pushTimeout == that.pushTimeout &&
-           Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
-           Objects.equals(indexSpec, that.indexSpec);
+    return Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
+           Objects.equals(maxRowsInMemory, that.maxRowsInMemory) &&
+           Objects.equals(maxTotalRows, that.maxTotalRows) &&
+           Objects.equals(indexSpec, that.indexSpec) &&
+           Objects.equals(maxPendingPersists, that.maxPendingPersists) &&
+           Objects.equals(pushTimeout, that.pushTimeout);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(
-        maxRowsPerSegment,
-        maxRowsInMemory,
-        maxTotalRows,
-        indexSpec,
-        maxPendingPersists,
-        pushTimeout
-    );
+    return Objects.hash(maxRowsPerSegment, maxRowsInMemory, maxTotalRows, indexSpec, maxPendingPersists, pushTimeout);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 702d78b..2d8c4dc 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -22,6 +22,7 @@ package org.apache.druid.client.indexing;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.inject.Inject;
 import org.apache.druid.discovery.DruidLeaderClient;
@@ -93,7 +94,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       boolean keepSegmentGranularity,
       @Nullable Long targetCompactionSizeBytes,
       int compactionTaskPriority,
-      @Nullable ClientCompactQueryTuningConfig tuningConfig,
+      ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
   )
   {
@@ -125,14 +126,20 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   {
     try {
       final FullResponseHolder response = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(
-              HttpMethod.POST,
-              "/druid/indexer/v1/task"
-          ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
+          druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task")
+                           .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
       );
 
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
-        throw new ISE("Failed to post task[%s]", taskObject);
+        if (!Strings.isNullOrEmpty(response.getContent())) {
+          throw new ISE(
+              "Failed to post task[%s] with error[%s].",
+              taskObject,
+              response.getContent()
+          );
+        } else {
+          throw new ISE("Failed to post task[%s]. Please check overlord log", taskObject);
+        }
       }
 
       final Map<String, Object> resultMap = jsonMapper.readValue(
@@ -154,10 +161,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       final FullResponseHolder response = druidLeaderClient.go(
           druidLeaderClient.makeRequest(
               HttpMethod.POST,
-              StringUtils.format(
-                  "/druid/indexer/v1/task/%s/shutdown",
-                  StringUtils.urlEncode(taskId)
-              )
+              StringUtils.format("/druid/indexer/v1/task/%s/shutdown", StringUtils.urlEncode(taskId))
           )
       );
 
@@ -289,7 +293,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   {
     try {
       final FullResponseHolder responseHolder = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s", taskId))
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format("/druid/indexer/v1/task/%s", StringUtils.urlEncode(taskId))
+          )
       );
 
       return jsonMapper.readValue(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index ef33fb0..867eef1 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -71,10 +71,6 @@ public class DataSourceCompactionConfig
       @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
   )
   {
-    Preconditions.checkArgument(
-        targetCompactionSizeBytes == null || maxRowsPerSegment == null,
-        "targetCompactionSizeBytes and maxRowsPerSegment in tuningConfig can't be used together"
-    );
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.keepSegmentGranularity = keepSegmentGranularity == null
                                   ? DEFAULT_KEEP_SEGMENT_GRANULARITY
@@ -85,11 +81,11 @@ public class DataSourceCompactionConfig
     this.inputSegmentSizeBytes = inputSegmentSizeBytes == null
                                  ? DEFAULT_INPUT_SEGMENT_SIZE_BYTES
                                  : inputSegmentSizeBytes;
-    if (targetCompactionSizeBytes == null && maxRowsPerSegment == null) {
-      this.targetCompactionSizeBytes = DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
-    } else {
-      this.targetCompactionSizeBytes = targetCompactionSizeBytes;
-    }
+    this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(
+        targetCompactionSizeBytes,
+        maxRowsPerSegment,
+        tuningConfig
+    );
     this.maxRowsPerSegment = maxRowsPerSegment;
     this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
                                    ? DEFAULT_NUM_INPUT_SEGMENTS
@@ -104,6 +100,51 @@ public class DataSourceCompactionConfig
     );
   }
 
+  /**
+   * This method is copied from {@code CompactionTask#getValidTargetCompactionSizeBytes}. The only difference is this
+   * method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}.
+   *
+   * Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
+   * the same method, this method must be synced with {@code CompactionTask#getValidTargetCompactionSizeBytes}.
+   */
+  @Nullable
+  private static Long getValidTargetCompactionSizeBytes(
+      @Nullable Long targetCompactionSizeBytes,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable UserCompactTuningConfig tuningConfig
+  )
+  {
+    if (targetCompactionSizeBytes != null) {
+      Preconditions.checkArgument(
+          !hasPartitionConfig(maxRowsPerSegment, tuningConfig),
+          "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s] and maxTotalRows[%s]",
+          targetCompactionSizeBytes,
+          maxRowsPerSegment,
+          tuningConfig == null ? null : tuningConfig.getMaxTotalRows()
+      );
+      return targetCompactionSizeBytes;
+    } else {
+      return hasPartitionConfig(maxRowsPerSegment, tuningConfig) ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
+    }
+  }
+
+  /**
+   * This method is copied from {@code CompactionTask#hasPartitionConfig}. The two differences are
+   * 1) this method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}, and
+   * 2) this method accepts an additional 'maxRowsPerSegment' parameter since it's not supported by
+   * {@link UserCompactTuningConfig}.
+   *
+   * Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
+   * the same method, this method must be synced with {@code CompactionTask#hasPartitionConfig}.
+   */
+  private static boolean hasPartitionConfig(
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable UserCompactTuningConfig tuningConfig
+  )
+  {
+    return maxRowsPerSegment != null || (tuningConfig != null && tuningConfig.getMaxTotalRows() != null);
+  }
+
   @JsonProperty
   public String getDataSource()
   {
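
For reference, the resolution logic added above works out to the following:

  targetCompactionSizeBytes | maxRowsPerSegment or maxTotalRows set | result
  --------------------------+---------------------------------------+--------------------------------------
  set                       | yes                                   | IllegalArgumentException
  set                       | no                                    | the given targetCompactionSizeBytes
  unset                     | yes                                   | null (partition settings take effect)
  unset                     | no                                    | DEFAULT_TARGET_COMPACTION_SIZE_BYTES
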
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 7cbffa5..0258cd0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -175,7 +175,6 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
 
     for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) {
       final List<DataSegment> segmentsToCompact = iterator.next();
-
       final String dataSourceName = segmentsToCompact.get(0).getDataSource();
 
       if (segmentsToCompact.size() > 1) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index eae6043..84a3d0d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -25,7 +25,9 @@ import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig;
 import org.joda.time.Period;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 
@@ -33,6 +35,9 @@ public class DataSourceCompactionConfigTest
 {
   private static final ObjectMapper objectMapper = new DefaultObjectMapper();
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testSerdeBasic() throws IOException
   {
@@ -56,6 +61,7 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(25, fromJson.getTaskPriority());
     Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
     Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
     Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
     Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
     Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@@ -85,6 +91,7 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(25, fromJson.getTaskPriority());
     Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
     Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
     Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
     Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
     Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@@ -101,4 +108,125 @@ public class DataSourceCompactionConfigTest
     final UserCompactTuningConfig fromJson = objectMapper.readValue(json, UserCompactTuningConfig.class);
     Assert.assertEquals(config, fromJson);
   }
+
+  @Test
+  public void testSerdeWithMaxTotalRows() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        null,
+        null,
+        20,
+        new Period(3600),
+        new UserCompactTuningConfig(
+            null,
+            10000,
+            null,
+            null,
+            null
+        ),
+        ImmutableMap.of("key", "val")
+    );
+    final String json = objectMapper.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertTrue(fromJson.isKeepSegmentGranularity());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+  }
+
+  @Test
+  public void testTargetCompactionSizeBytesWithMaxRowsPerSegment()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[1000] and maxTotalRows[null]"
+    );
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        10000L,
+        1000,
+        20,
+        new Period(3600),
+        null,
+        ImmutableMap.of("key", "val")
+    );
+  }
+
+  @Test
+  public void testTargetCompactionSizeBytesWithMaxTotalRows()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[null] and maxTotalRows[10000]"
+    );
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        10000L,
+        null,
+        20,
+        new Period(3600),
+        new UserCompactTuningConfig(
+            null,
+            10000,
+            null,
+            null,
+            null
+        ),
+        ImmutableMap.of("key", "val")
+    );
+  }
+
+  @Test
+  public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        null,
+        10000,
+        20,
+        new Period(3600),
+        new UserCompactTuningConfig(
+            null,
+            10000,
+            null,
+            null,
+            null
+        ),
+        ImmutableMap.of("key", "val")
+    );
+
+    final String json = objectMapper.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertTrue(fromJson.isKeepSegmentGranularity());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org