You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/03/29 05:55:38 UTC
[incubator-druid] branch 0.14.0-incubating updated: [Backport]
maxTotalRows should be checked in DataSourceCompactionConfig before setting
targetCompactionSizeBytes (#7379)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
new 1e50319 [Backport] maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes (#7379)
1e50319 is described below
commit 1e503190037a7e71828fab6ca1f2cdd36985214e
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Thu Mar 28 22:55:32 2019 -0700
[Backport] maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes (#7379)
* maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes (#7368)
* maxTotalRows should be checked in DataSourceCompactionConfig before setting targetCompactionSizeBytes
* remove unnecessary default values
* remove flaky test
* fix build
* Add comments
* checkstyle fix
* Fix
---
.../druid/tests/hadoop/ITHadoopIndexTest.java | 2 +-
.../tests/indexer/AbstractITBatchIndexTest.java | 6 +-
.../indexer/AbstractITRealtimeIndexTaskTest.java | 2 +-
.../druid/tests/indexer/AbstractIndexerTest.java | 2 +-
.../druid/tests/indexer/ITCompactionTaskTest.java | 4 +-
.../tests/indexer/ITKafkaIndexingServiceTest.java | 2 +-
.../apache/druid/tests/indexer/ITKafkaTest.java | 3 +-
.../tests/indexer/ITNestedQueryPushDownTest.java | 2 +-
.../druid/tests/indexer/ITUnionQueryTest.java | 2 +-
.../druid/client/indexing/ClientCompactQuery.java | 13 +++
.../indexing/ClientCompactQueryTuningConfig.java | 64 +++++------
.../client/indexing/HttpIndexingServiceClient.java | 29 +++--
.../coordinator/DataSourceCompactionConfig.java | 59 ++++++++--
.../helper/DruidCoordinatorSegmentCompactor.java | 1 -
.../DataSourceCompactionConfigTest.java | 128 +++++++++++++++++++++
15 files changed, 251 insertions(+), 68 deletions(-)
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
index 233add8..8b9183a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
@@ -65,7 +65,7 @@ public class ITHadoopIndexTest extends AbstractIndexerTest
try {
LOG.info("indexerFile name: [%s]", BATCH_TASK);
- indexerSpec = getTaskAsString(BATCH_TASK);
+ indexerSpec = getResourceAsString(BATCH_TASK);
indexerSpec = StringUtils.replace(indexerSpec, "%%HADOOP_TEST_PATH%%", hadoopDir);
}
catch (Exception e) {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index a531154..9ac9537 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -56,7 +56,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
{
final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
final String taskSpec = StringUtils.replace(
- getTaskAsString(indexTaskFilePath),
+ getResourceAsString(indexTaskFilePath),
"%%DATASOURCE%%",
fullDatasourceName
);
@@ -98,7 +98,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
final String fullReindexDatasourceName = reindexDataSource + config.getExtraDatasourceNameSuffix();
String taskSpec = StringUtils.replace(
- getTaskAsString(reindexTaskFilePath),
+ getResourceAsString(reindexTaskFilePath),
"%%DATASOURCE%%",
fullBaseDatasourceName
);
@@ -148,7 +148,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
{
final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
final String taskSpec = StringUtils.replace(
- getTaskAsString(indexTaskFilePath),
+ getResourceAsString(indexTaskFilePath),
"%%DATASOURCE%%",
fullDatasourceName
);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
index cdf61de..2bf7724 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java
@@ -89,7 +89,7 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
try (final Closeable closeable = unloader(fullDatasourceName)) {
// the task will run for 3 minutes and then shutdown itself
String task = setShutOffTime(
- getTaskAsString(getTaskResource()),
+ getResourceAsString(getTaskResource()),
DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
);
task = StringUtils.replace(task, "%%DATASOURCE%%", fullDatasourceName);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index b739d79..30b7439 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -112,7 +112,7 @@ public abstract class AbstractIndexerTest
);
}
- protected String getTaskAsString(String file) throws IOException
+ protected String getResourceAsString(String file) throws IOException
{
final InputStream inputStream = ITRealtimeIndexTaskTest.class.getResourceAsStream(file);
try {
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index db6ebff..264edc2 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -131,7 +131,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private void loadData() throws Exception
{
- String taskSpec = getTaskAsString(INDEX_TASK);
+ String taskSpec = getResourceAsString(INDEX_TASK);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
@@ -145,7 +145,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private void compactData(boolean keepSegmentGranularity) throws Exception
{
- final String template = getTaskAsString(COMPACTION_TASK);
+ final String template = getResourceAsString(COMPACTION_TASK);
String taskSpec =
StringUtils.replace(template, "${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
index 5383ea5..d8283c2 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
@@ -152,7 +152,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
addFilteredProperties(consumerProperties);
- spec = getTaskAsString(INDEXER_FILE);
+ spec = getResourceAsString(INDEXER_FILE);
spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
index 10f9aab..0cb56ec 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java
@@ -144,7 +144,6 @@ public class ITKafkaTest extends AbstractIndexerTest
catch (Exception e) {
throw new ISE(e, "could not create kafka topic");
}
-
// set up kafka producer
Properties properties = new Properties();
addFilteredProperties(properties);
@@ -211,7 +210,7 @@ public class ITKafkaTest extends AbstractIndexerTest
addFilteredProperties(consumerProperties);
- indexerSpec = getTaskAsString(INDEXER_FILE);
+ indexerSpec = getResourceAsString(INDEXER_FILE);
indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", fullDatasourceName);
indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME);
indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events));
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
index 350e2ab..76f0c84 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java
@@ -97,7 +97,7 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest
private void loadData() throws Exception
{
- String taskSpec = getTaskAsString(WIKITICKER_INDEX_TASK);
+ String taskSpec = getResourceAsString(WIKITICKER_INDEX_TASK);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
index 8b65c40..b32a883 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java
@@ -93,7 +93,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
try {
// Load 4 datasources with same dimensions
String task = setShutOffTime(
- getTaskAsString(UNION_TASK_RESOURCE),
+ getResourceAsString(UNION_TASK_RESOURCE),
DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
);
List<String> taskIDs = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index c87ff90..f32b11d 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -99,4 +99,17 @@ public class ClientCompactQuery implements ClientQuery
{
return context;
}
+
+ @Override
+ public String toString()
+ {
+ return "ClientCompactQuery{" +
+ "dataSource='" + dataSource + '\'' +
+ ", segments=" + segments +
+ ", keepSegmentGranularity=" + keepSegmentGranularity +
+ ", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
+ ", tuningConfig=" + tuningConfig +
+ ", context=" + context +
+ '}';
+ }
}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
index 9bae161..0680169 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
@@ -29,20 +29,18 @@ import java.util.Objects;
public class ClientCompactQueryTuningConfig
{
- // These default values should be synchronized with those of IndexTuningConfig
- private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000;
- private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
- private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
- private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
- private static final long DEFAULT_PUSH_TIMEOUT = 0;
-
@Nullable
private final Integer maxRowsPerSegment;
- private final int maxRowsInMemory;
- private final int maxTotalRows;
+ @Nullable
+ private final Integer maxRowsInMemory;
+ @Nullable
+ private final Integer maxTotalRows;
+ @Nullable
private final IndexSpec indexSpec;
- private final int maxPendingPersists;
- private final long pushTimeout;
+ @Nullable
+ private final Integer maxPendingPersists;
+ @Nullable
+ private final Long pushTimeout;
public static ClientCompactQueryTuningConfig from(
@Nullable UserCompactTuningConfig userCompactTuningConfig,
@@ -70,11 +68,11 @@ public class ClientCompactQueryTuningConfig
)
{
this.maxRowsPerSegment = maxRowsPerSegment;
- this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
- this.maxTotalRows = maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
- this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
- this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
- this.pushTimeout = pushTimeout == null ? DEFAULT_PUSH_TIMEOUT : pushTimeout;
+ this.maxRowsInMemory = maxRowsInMemory;
+ this.maxTotalRows = maxTotalRows;
+ this.indexSpec = indexSpec;
+ this.maxPendingPersists = maxPendingPersists;
+ this.pushTimeout = pushTimeout;
}
@JsonProperty
@@ -91,31 +89,36 @@ public class ClientCompactQueryTuningConfig
}
@JsonProperty
- public int getMaxRowsInMemory()
+ @Nullable
+ public Integer getMaxRowsInMemory()
{
return maxRowsInMemory;
}
@JsonProperty
- public int getMaxTotalRows()
+ @Nullable
+ public Integer getMaxTotalRows()
{
return maxTotalRows;
}
@JsonProperty
+ @Nullable
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@JsonProperty
- public int getMaxPendingPersists()
+ @Nullable
+ public Integer getMaxPendingPersists()
{
return maxPendingPersists;
}
@JsonProperty
- public long getPushTimeout()
+ @Nullable
+ public Long getPushTimeout()
{
return pushTimeout;
}
@@ -130,25 +133,18 @@ public class ClientCompactQueryTuningConfig
return false;
}
ClientCompactQueryTuningConfig that = (ClientCompactQueryTuningConfig) o;
- return maxRowsInMemory == that.maxRowsInMemory &&
- maxTotalRows == that.maxTotalRows &&
- maxPendingPersists == that.maxPendingPersists &&
- pushTimeout == that.pushTimeout &&
- Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
- Objects.equals(indexSpec, that.indexSpec);
+ return Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
+ Objects.equals(maxRowsInMemory, that.maxRowsInMemory) &&
+ Objects.equals(maxTotalRows, that.maxTotalRows) &&
+ Objects.equals(indexSpec, that.indexSpec) &&
+ Objects.equals(maxPendingPersists, that.maxPendingPersists) &&
+ Objects.equals(pushTimeout, that.pushTimeout);
}
@Override
public int hashCode()
{
- return Objects.hash(
- maxRowsPerSegment,
- maxRowsInMemory,
- maxTotalRows,
- indexSpec,
- maxPendingPersists,
- pushTimeout
- );
+ return Objects.hash(maxRowsPerSegment, maxRowsInMemory, maxTotalRows, indexSpec, maxPendingPersists, pushTimeout);
}
@Override
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 702d78b..2d8c4dc 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -22,6 +22,7 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.discovery.DruidLeaderClient;
@@ -93,7 +94,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
boolean keepSegmentGranularity,
@Nullable Long targetCompactionSizeBytes,
int compactionTaskPriority,
- @Nullable ClientCompactQueryTuningConfig tuningConfig,
+ ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
)
{
@@ -125,14 +126,20 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
{
try {
final FullResponseHolder response = druidLeaderClient.go(
- druidLeaderClient.makeRequest(
- HttpMethod.POST,
- "/druid/indexer/v1/task"
- ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
+ druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/task")
+ .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskObject))
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
- throw new ISE("Failed to post task[%s]", taskObject);
+ if (!Strings.isNullOrEmpty(response.getContent())) {
+ throw new ISE(
+ "Failed to post task[%s] with error[%s].",
+ taskObject,
+ response.getContent()
+ );
+ } else {
+ throw new ISE("Failed to post task[%s]. Please check overlord log", taskObject);
+ }
}
final Map<String, Object> resultMap = jsonMapper.readValue(
@@ -154,10 +161,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
final FullResponseHolder response = druidLeaderClient.go(
druidLeaderClient.makeRequest(
HttpMethod.POST,
- StringUtils.format(
- "/druid/indexer/v1/task/%s/shutdown",
- StringUtils.urlEncode(taskId)
- )
+ StringUtils.format("/druid/indexer/v1/task/%s/shutdown", StringUtils.urlEncode(taskId))
)
);
@@ -289,7 +293,10 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
{
try {
final FullResponseHolder responseHolder = druidLeaderClient.go(
- druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s", taskId))
+ druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ StringUtils.format("/druid/indexer/v1/task/%s", StringUtils.urlEncode(taskId))
+ )
);
return jsonMapper.readValue(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index ef33fb0..867eef1 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -71,10 +71,6 @@ public class DataSourceCompactionConfig
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
)
{
- Preconditions.checkArgument(
- targetCompactionSizeBytes == null || maxRowsPerSegment == null,
- "targetCompactionSizeBytes and maxRowsPerSegment in tuningConfig can't be used together"
- );
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.keepSegmentGranularity = keepSegmentGranularity == null
? DEFAULT_KEEP_SEGMENT_GRANULARITY
@@ -85,11 +81,11 @@ public class DataSourceCompactionConfig
this.inputSegmentSizeBytes = inputSegmentSizeBytes == null
? DEFAULT_INPUT_SEGMENT_SIZE_BYTES
: inputSegmentSizeBytes;
- if (targetCompactionSizeBytes == null && maxRowsPerSegment == null) {
- this.targetCompactionSizeBytes = DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
- } else {
- this.targetCompactionSizeBytes = targetCompactionSizeBytes;
- }
+ this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(
+ targetCompactionSizeBytes,
+ maxRowsPerSegment,
+ tuningConfig
+ );
this.maxRowsPerSegment = maxRowsPerSegment;
this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
? DEFAULT_NUM_INPUT_SEGMENTS
@@ -104,6 +100,51 @@ public class DataSourceCompactionConfig
);
}
+ /**
+ * This method is copied from {@code CompactionTask#getValidTargetCompactionSizeBytes}. The only difference is this
+ * method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}.
+ *
+ * Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
+ * the same method, this method must be synced with {@code CompactionTask#getValidTargetCompactionSizeBytes}.
+ */
+ @Nullable
+ private static Long getValidTargetCompactionSizeBytes(
+ @Nullable Long targetCompactionSizeBytes,
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable UserCompactTuningConfig tuningConfig
+ )
+ {
+ if (targetCompactionSizeBytes != null) {
+ Preconditions.checkArgument(
+ !hasPartitionConfig(maxRowsPerSegment, tuningConfig),
+ "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s] and maxTotalRows[%s]",
+ targetCompactionSizeBytes,
+ maxRowsPerSegment,
+ tuningConfig == null ? null : tuningConfig.getMaxTotalRows()
+ );
+ return targetCompactionSizeBytes;
+ } else {
+ return hasPartitionConfig(maxRowsPerSegment, tuningConfig) ? null : DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
+ }
+ }
+
+ /**
+ * This method is copied from {@code CompactionTask#hasPartitionConfig}. The two differences are
+ * 1) this method doesn't check 'numShards' which is not supported by {@link UserCompactTuningConfig}, and
+ * 2) this method accepts an additional 'maxRowsPerSegment' parameter since it's not supported by
+ * {@link UserCompactTuningConfig}.
+ *
+ * Currently, we can't use the same method here because it's in a different module. Until we figure out how to reuse
+ * the same method, this method must be synced with {@code CompactionTask#hasPartitionConfig}.
+ */
+ private static boolean hasPartitionConfig(
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable UserCompactTuningConfig tuningConfig
+ )
+ {
+ return maxRowsPerSegment != null || (tuningConfig != null && tuningConfig.getMaxTotalRows() != null);
+ }
+
@JsonProperty
public String getDataSource()
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 7cbffa5..0258cd0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -175,7 +175,6 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
for (; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks++) {
final List<DataSegment> segmentsToCompact = iterator.next();
-
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
if (segmentsToCompact.size() > 1) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index eae6043..84a3d0d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -25,7 +25,9 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig;
import org.joda.time.Period;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.IOException;
@@ -33,6 +35,9 @@ public class DataSourceCompactionConfigTest
{
private static final ObjectMapper objectMapper = new DefaultObjectMapper();
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
@Test
public void testSerdeBasic() throws IOException
{
@@ -56,6 +61,7 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
+ Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@@ -85,6 +91,7 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(25, fromJson.getTaskPriority());
Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+ Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
@@ -101,4 +108,125 @@ public class DataSourceCompactionConfigTest
final UserCompactTuningConfig fromJson = objectMapper.readValue(json, UserCompactTuningConfig.class);
Assert.assertEquals(config, fromJson);
}
+
+ @Test
+ public void testSerdeWithMaxTotalRows() throws IOException
+ {
+ final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ null,
+ 500L,
+ null,
+ null,
+ 20,
+ new Period(3600),
+ new UserCompactTuningConfig(
+ null,
+ 10000,
+ null,
+ null,
+ null
+ ),
+ ImmutableMap.of("key", "val")
+ );
+ final String json = objectMapper.writeValueAsString(config);
+ final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
+
+ Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+ Assert.assertTrue(fromJson.isKeepSegmentGranularity());
+ Assert.assertEquals(25, fromJson.getTaskPriority());
+ Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+ Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+ Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+ Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
+ Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+ Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+ Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+ }
+
+ @Test
+ public void testTargetCompactionSizeBytesWithMaxRowsPerSegment()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[1000] and maxTotalRows[null]"
+ );
+ final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ null,
+ 500L,
+ 10000L,
+ 1000,
+ 20,
+ new Period(3600),
+ null,
+ ImmutableMap.of("key", "val")
+ );
+ }
+
+ @Test
+ public void testTargetCompactionSizeBytesWithMaxTotalRows()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "targetCompactionSizeBytes[10000] cannot be used with maxRowsPerSegment[null] and maxTotalRows[10000]"
+ );
+ final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ null,
+ 500L,
+ 10000L,
+ null,
+ 20,
+ new Period(3600),
+ new UserCompactTuningConfig(
+ null,
+ 10000,
+ null,
+ null,
+ null
+ ),
+ ImmutableMap.of("key", "val")
+ );
+ }
+
+ @Test
+ public void testSerdeMaxTotalRowsWithMaxRowsPerSegment() throws IOException
+ {
+ final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ null,
+ 500L,
+ null,
+ 10000,
+ 20,
+ new Period(3600),
+ new UserCompactTuningConfig(
+ null,
+ 10000,
+ null,
+ null,
+ null
+ ),
+ ImmutableMap.of("key", "val")
+ );
+
+ final String json = objectMapper.writeValueAsString(config);
+ final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
+
+ Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+ Assert.assertTrue(fromJson.isKeepSegmentGranularity());
+ Assert.assertEquals(25, fromJson.getTaskPriority());
+ Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+ Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+ Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+ Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
+ Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+ Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+ Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org