You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2021/05/27 23:35:05 UTC
[druid] branch master updated: Fix bug: 502 bad gateway thrown when
we edit/delete any auto compaction config created 0.21.0 or before (#11311)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e5633d7 Fix bug: 502 bad gateway thrown when we edit/delete any auto compaction config created 0.21.0 or before (#11311)
e5633d7 is described below
commit e5633d7842f44464ed463a8718051c7542a81ccb
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Thu May 27 16:34:32 2021 -0700
Fix bug: 502 bad gateway thrown when we edit/delete any auto compaction config created 0.21.0 or before (#11311)
* fix bug
* add test
* fix IT
* fix checkstyle
* address comments
---
.travis.yml | 6 +-
.../apache/druid/common/config/ConfigManager.java | 7 +-
.../druid/common/config/ConfigManagerConfig.java | 8 ++
.../druid/common/config/JacksonConfigManager.java | 20 ++-
.../druid/common/config/ConfigManagerTest.java | 22 +++-
.../common/config/JacksonConfigManagerTest.java | 44 +++++++
integration-tests/docker/druid.sh | 2 +-
.../docker/test-data/upgrade-sample-data.sql | 16 +++
.../java/org/apache/druid/tests/TestNGGroup.java | 2 +
.../duty/ITAutoCompactionUpgradeTest.java | 121 ++++++++++++++++++
.../coordinator/CoordinatorCompactionConfig.java | 17 +++
.../coordinator/duty/KillCompactionConfig.java | 16 ++-
.../http/CoordinatorCompactionConfigsResource.java | 54 ++++++--
.../coordinator/duty/KillCompactionConfigTest.java | 142 ++++++++++++++++-----
.../CoordinatorCompactionConfigsResourceTest.java | 88 +++++++++----
15 files changed, 480 insertions(+), 85 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 5fd56d8..2fbc7d6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -592,13 +592,13 @@ jobs:
stage: Tests - phase 2
jdk: openjdk8
services: *integration_test_services
- env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
+ env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
script: *run_integration_test
after_failure: *integration_test_diags
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
- env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
+ env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
@@ -671,7 +671,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
- env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
+ env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests"
diff --git a/core/src/main/java/org/apache/druid/common/config/ConfigManager.java b/core/src/main/java/org/apache/druid/common/config/ConfigManager.java
index 7bad650..56cbcd5 100644
--- a/core/src/main/java/org/apache/druid/common/config/ConfigManager.java
+++ b/core/src/main/java/org/apache/druid/common/config/ConfigManager.java
@@ -176,7 +176,7 @@ public class ConfigManager
return set(key, serde, null, obj);
}
- public <T> SetResult set(final String key, final ConfigSerde<T> serde, @Nullable final T oldObject, final T newObject)
+ public <T> SetResult set(final String key, final ConfigSerde<T> serde, @Nullable final byte[] oldValue, final T newObject)
{
if (newObject == null || !started) {
if (newObject == null) {
@@ -191,11 +191,10 @@ public class ConfigManager
try {
return exec.submit(
() -> {
- if (oldObject == null) {
+ if (oldValue == null || !config.get().isEnableCompareAndSwap()) {
dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
} else {
- final byte[] oldBytes = serde.serialize(oldObject);
- MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldBytes, newBytes);
+ MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldValue, newBytes);
boolean success = dbConnector.compareAndSwap(ImmutableList.of(metadataCASUpdate));
if (!success) {
return SetResult.fail(new IllegalStateException("Config value has changed"), true);
diff --git a/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java b/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java
index 5ef5816..472bacd 100644
--- a/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java
+++ b/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java
@@ -32,8 +32,16 @@ public class ConfigManagerConfig
@NotNull
private Period pollDuration = new Period("PT1M");
+ @JsonProperty
+ private boolean enableCompareAndSwap = true;
+
public Period getPollDuration()
{
return pollDuration;
}
+
+ public boolean isEnableCompareAndSwap()
+ {
+ return enableCompareAndSwap;
+ }
}
diff --git a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
index 7e3eeeb..7cf78a5 100644
--- a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
+++ b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
@@ -73,6 +73,16 @@ public class JacksonConfigManager
return configManager.watchConfig(key, create(clazz, defaultVal));
}
+ public <T> T convertByteToConfig(byte[] configInByte, Class<? extends T> clazz, T defaultVal)
+ {
+ if (configInByte == null) {
+ return defaultVal;
+ } else {
+ final ConfigSerde<T> serde = create(clazz, defaultVal);
+ return serde.deserialize(configInByte);
+ }
+ }
+
/**
* Set the config and add audit entry
*
@@ -90,14 +100,18 @@ public class JacksonConfigManager
*
* @param key of the config to set
* @param oldValue old config value. If not null, then the update will only succeed if the insert
- * happens when current database entry is the same as this value. If null, then the insert
- * will not consider the current database entry.
+ * happens when current database entry is the same as this value. Note that the current database
+ * entry (in array of bytes) has to be exactly the same as the array of bytes of this value for
+ * update to succeed. If null, then the insert will not consider the current database entry. Note
+ * that this field intentionally uses a byte array to be resilient across serde of existing data
+ * retrieved from the database (instead of Java object which may have additional fields added
+ * as a result of serde)
* @param newValue new config value to insert
* @param auditInfo metadata regarding the change to config, for audit purposes
*/
public <T> SetResult set(
String key,
- @Nullable T oldValue,
+ @Nullable byte[] oldValue,
T newValue,
AuditInfo auditInfo
)
diff --git a/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java b/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java
index 038aa38..9705939 100644
--- a/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java
+++ b/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java
@@ -49,7 +49,7 @@ public class ConfigManagerTest
{
private static final String CONFIG_KEY = "configX";
private static final String TABLE_NAME = "config_table";
- private static final TestConfig OLD_CONFIG = new TestConfig("1", "x", 1);
+ private static final byte[] OLD_CONFIG = {1, 2, 3};
private static final TestConfig NEW_CONFIG = new TestConfig("2", "y", 2);
@Mock
@@ -120,6 +120,7 @@ public class ConfigManagerTest
@Test
public void testSetOldObjectNotNullShouldSwap()
{
+ when(mockConfigManagerConfig.isEnableCompareAndSwap()).thenReturn(true);
when(mockDbConnector.compareAndSwap(any(List.class))).thenReturn(true);
final ArgumentCaptor<List<MetadataCASUpdate>> updateCaptor = ArgumentCaptor.forClass(List.class);
configManager.start();
@@ -134,10 +135,27 @@ public class ConfigManagerTest
Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_KEY_COLUMN, updateCaptor.getValue().get(0).getKeyColumn());
Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN, updateCaptor.getValue().get(0).getValueColumn());
Assert.assertEquals(CONFIG_KEY, updateCaptor.getValue().get(0).getKey());
- Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(OLD_CONFIG), updateCaptor.getValue().get(0).getOldValue());
+ Assert.assertArrayEquals(OLD_CONFIG, updateCaptor.getValue().get(0).getOldValue());
Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(NEW_CONFIG), updateCaptor.getValue().get(0).getNewValue());
}
+ @Test
+ public void testSetOldObjectNotNullButCompareAndSwapDisabledShouldInsertWithoutSwap()
+ {
+ when(mockConfigManagerConfig.isEnableCompareAndSwap()).thenReturn(false);
+ configManager.start();
+ ConfigManager.SetResult setResult = configManager.set(CONFIG_KEY, configConfigSerdeFromClass, OLD_CONFIG, NEW_CONFIG);
+ Assert.assertTrue(setResult.isOk());
+ Mockito.verify(mockDbConnector).insertOrUpdate(
+ ArgumentMatchers.eq(TABLE_NAME),
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq(CONFIG_KEY),
+ ArgumentMatchers.any(byte[].class)
+ );
+ Mockito.verifyNoMoreInteractions(mockDbConnector);
+ }
+
static class TestConfig
{
private final String version;
diff --git a/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
index 25220eb..4a4aaf7 100644
--- a/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
+++ b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
@@ -38,6 +38,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.Objects;
+
@RunWith(MockitoJUnitRunner.class)
public class JacksonConfigManagerTest
{
@@ -136,6 +138,27 @@ public class JacksonConfigManagerTest
Assert.assertNotNull(configSerdeCapture.getValue());
}
+ @Test
+ public void testConvertByteToConfigWithNullConfigInByte()
+ {
+ TestConfig defaultExpected = new TestConfig("version", null, 3);
+ TestConfig actual = jacksonConfigManager.convertByteToConfig(null, TestConfig.class, defaultExpected);
+ Assert.assertEquals(defaultExpected, actual);
+ }
+
+ @Test
+ public void testConvertByteToConfigWithNonNullConfigInByte()
+ {
+ ConfigSerde<TestConfig> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<TestConfig>()
+ {
+ }, null);
+ TestConfig defaultConfig = new TestConfig("version", null, 3);
+ TestConfig expectedConfig = new TestConfig("version2", null, 5);
+ byte[] expectedConfigInByte = configConfigSerdeFromTypeReference.serialize(expectedConfig);
+
+ TestConfig actual = jacksonConfigManager.convertByteToConfig(expectedConfigInByte, TestConfig.class, defaultConfig);
+ Assert.assertEquals(expectedConfig, actual);
+ }
static class TestConfig
{
@@ -169,6 +192,27 @@ public class JacksonConfigManagerTest
{
return settingInt;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestConfig config = (TestConfig) o;
+ return settingInt == config.settingInt &&
+ Objects.equals(version, config.version) &&
+ Objects.equals(settingString, config.settingString);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(version, settingString, settingInt);
+ }
}
static class ClassThatJacksonCannotSerialize
diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh
index 0a4f00b..baa0e97 100755
--- a/integration-tests/docker/druid.sh
+++ b/integration-tests/docker/druid.sh
@@ -85,7 +85,7 @@ setupData()
# The "query" and "security" test groups require data to be setup before running the tests.
# In particular, they require segments to be downloaded from a pre-existing s3 bucket.
# This is done by using the loadSpec put into metadatastore and s3 credentials set below.
- if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ]; then
+ if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ]; then
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop
diff --git a/integration-tests/docker/test-data/upgrade-sample-data.sql b/integration-tests/docker/test-data/upgrade-sample-data.sql
new file mode 100644
index 0000000..a58fdab
--- /dev/null
+++ b/integration-tests/docker/test-data/upgrade-sample-data.sql
@@ -0,0 +1,16 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+INSERT INTO druid_config (name, payload) VALUES ('coordinator.compaction.config', '{"compactionConfigs":[{"dataSource":"upgradeTest","taskPriority":25,"inputSegmentSizeBytes":419430400,"maxRowsPerSegment":null,"skipOffsetFromLatest":"P1D","tuningConfig":{"maxRowsInMemory":null,"maxBytesInMemory":null,"maxTotalRows":null,"splitHintSpec":null,"partitionsSpec":{"type":"hashed","numShards":null,"partitionDimensions":[],"partitionFunction":"murmur3_32_abs","maxRowsPerSegment":5000000},"indexS [...]
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 8b962d2..bd346ff 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -43,6 +43,8 @@ public class TestNGGroup
public static final String COMPACTION = "compaction";
+ public static final String UPGRADE = "upgrade";
+
public static final String APPEND_INGESTION = "append-ingestion";
public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
new file mode 100644
index 0000000..b960707
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.coordinator.duty;
+
+import com.google.inject.Inject;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CompactionResourceTestClient;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractIndexerTest;
+import org.joda.time.Period;
+import org.testng.Assert;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = {TestNGGroup.UPGRADE})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
+{
+ private static final Logger LOG = new Logger(ITAutoCompactionUpgradeTest.class);
+ private static final String UPGRADE_DATASOURCE_NAME = "upgradeTest";
+
+ @Inject
+ protected CompactionResourceTestClient compactionResource;
+
+ @Inject
+ private IntegrationTestingConfig config;
+
+ @Test
+ public void testUpgradeAutoCompactionConfigurationWhenConfigurationFromOlderVersionAlreadyExist() throws Exception
+ {
+ // Verify that compaction config already exists. This config was inserted manually into the database using a SQL script.
+ // This auto compaction configuration payload is from Druid 0.21.0
+ CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+ DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
+ for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
+ if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) {
+ foundDataSourceCompactionConfig = dataSourceCompactionConfig;
+ }
+ }
+ Assert.assertNotNull(foundDataSourceCompactionConfig);
+
+ // Now submit a new auto compaction configuration
+ PartitionsSpec newPartitionsSpec = new DynamicPartitionsSpec(4000, null);
+ Period newSkipOffset = Period.seconds(0);
+
+ DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
+ UPGRADE_DATASOURCE_NAME,
+ null,
+ null,
+ null,
+ newSkipOffset,
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ new MaxSizeSplitHintSpec(null, 1),
+ newPartitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 1,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 1
+ ),
+ new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+ new UserCompactionTaskIOConfig(true),
+ null
+ );
+ compactionResource.submitCompactionConfig(compactionConfig);
+
+ // Wait for compaction config to persist
+ Thread.sleep(2000);
+
+ // Verify that compaction was successfully updated
+ coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+ foundDataSourceCompactionConfig = null;
+ for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
+ if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) {
+ foundDataSourceCompactionConfig = dataSourceCompactionConfig;
+ }
+ }
+ Assert.assertNotNull(foundDataSourceCompactionConfig);
+ Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
+ Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), newPartitionsSpec);
+ Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), newSkipOffset);
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
index cc1fdf6..409a813 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -86,6 +88,21 @@ public class CoordinatorCompactionConfig
);
}
+ public static byte[] getConfigInByteFromDb(final MetadataStorageConnector connector, MetadataStorageTablesConfig config)
+ {
+ return connector.lookup(
+ config.getConfigTable(),
+ "name",
+ "payload",
+ CoordinatorCompactionConfig.CONFIG_KEY
+ );
+ }
+
+ public static CoordinatorCompactionConfig convertByteToConfig(final JacksonConfigManager configManager, byte[] configInByte)
+ {
+ return configManager.convertByteToConfig(configInByte, CoordinatorCompactionConfig.class, CoordinatorCompactionConfig.empty());
+ }
+
@Nonnull
public static CoordinatorCompactionConfig current(final JacksonConfigManager configManager)
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
index cd54cf3..74645b5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
@@ -30,6 +30,8 @@ import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -58,17 +60,23 @@ public class KillCompactionConfig implements CoordinatorDuty
private final JacksonConfigManager jacksonConfigManager;
private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+ private final MetadataStorageConnector connector;
+ private final MetadataStorageTablesConfig connectorConfig;
@Inject
public KillCompactionConfig(
DruidCoordinatorConfig config,
SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
- JacksonConfigManager jacksonConfigManager
+ JacksonConfigManager jacksonConfigManager,
+ MetadataStorageConnector connector,
+ MetadataStorageTablesConfig connectorConfig
)
{
this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
this.jacksonConfigManager = jacksonConfigManager;
this.period = config.getCoordinatorCompactionKillPeriod().getMillis();
+ this.connector = connector;
+ this.connectorConfig = connectorConfig;
Preconditions.checkArgument(
this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
"Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
@@ -88,7 +96,8 @@ public class KillCompactionConfig implements CoordinatorDuty
try {
RetryUtils.retry(
() -> {
- CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(jacksonConfigManager);
+ final byte[] currentBytes = CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
+ final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(jacksonConfigManager, currentBytes);
// If current compaction config is empty then there is nothing to do
if (CoordinatorCompactionConfig.empty().equals(current)) {
log.info(
@@ -112,8 +121,7 @@ public class KillCompactionConfig implements CoordinatorDuty
ConfigManager.SetResult result = jacksonConfigManager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
- // Do database insert without swap if the current config is empty as this means the config may be null in the database
- current,
+ currentBytes,
CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values())),
new AuditInfo(
"KillCompactionConfig",
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
index a5cd611..cb17c45 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
@@ -28,6 +28,9 @@ import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.http.security.ConfigResourceFilter;
@@ -57,15 +60,24 @@ import java.util.stream.Collectors;
@ResourceFilters(ConfigResourceFilter.class)
public class CoordinatorCompactionConfigsResource
{
+ private static final Logger LOG = new Logger(CoordinatorCompactionConfigsResource.class);
private static final long UPDATE_RETRY_DELAY = 1000;
static final int UPDATE_NUM_RETRY = 5;
private final JacksonConfigManager manager;
+ private final MetadataStorageConnector connector;
+ private final MetadataStorageTablesConfig connectorConfig;
@Inject
- public CoordinatorCompactionConfigsResource(JacksonConfigManager manager)
+ public CoordinatorCompactionConfigsResource(
+ JacksonConfigManager manager,
+ MetadataStorageConnector connector,
+ MetadataStorageTablesConfig connectorConfig
+ )
{
this.manager = manager;
+ this.connector = connector;
+ this.connectorConfig = connectorConfig;
}
@GET
@@ -87,8 +99,8 @@ public class CoordinatorCompactionConfigsResource
)
{
Callable<SetResult> callable = () -> {
- final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
-
+ final byte[] currentBytes = getCurrentConfigInByteFromDb();
+ final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from(
current,
compactionTaskSlotRatio,
@@ -97,8 +109,7 @@ public class CoordinatorCompactionConfigsResource
return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
- // Do database insert without swap if the current config is empty as this means the config may be null in the database
- CoordinatorCompactionConfig.empty().equals(current) ? null : current,
+ currentBytes,
newCompactionConfig,
new AuditInfo(author, comment, req.getRemoteAddr())
);
@@ -116,7 +127,8 @@ public class CoordinatorCompactionConfigsResource
)
{
Callable<SetResult> callable = () -> {
- final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
+ final byte[] currentBytes = getCurrentConfigInByteFromDb();
+ final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final CoordinatorCompactionConfig newCompactionConfig;
final Map<String, DataSourceCompactionConfig> newConfigs = current
.getCompactionConfigs()
@@ -127,8 +139,7 @@ public class CoordinatorCompactionConfigsResource
return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
- // Do database insert without swap if the current config is empty as this means the config may be null in the database
- CoordinatorCompactionConfig.empty().equals(current) ? null : current,
+ currentBytes,
newCompactionConfig,
new AuditInfo(author, comment, req.getRemoteAddr())
);
@@ -166,7 +177,8 @@ public class CoordinatorCompactionConfigsResource
)
{
Callable<SetResult> callable = () -> {
- final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
+ final byte[] currentBytes = getCurrentConfigInByteFromDb();
+ final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
final Map<String, DataSourceCompactionConfig> configs = current
.getCompactionConfigs()
.stream()
@@ -179,8 +191,7 @@ public class CoordinatorCompactionConfigsResource
return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
- // Do database insert without swap if the current config is empty as this means the config may be null in the database
- CoordinatorCompactionConfig.empty().equals(current) ? null : current,
+ currentBytes,
CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(configs.values())),
new AuditInfo(author, comment, req.getRemoteAddr())
);
@@ -204,18 +215,21 @@ public class CoordinatorCompactionConfigsResource
}
}
catch (Exception e) {
+ LOG.warn(e, "Update compaction config failed");
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(ImmutableMap.of("error", e))
+ .entity(ImmutableMap.of("error", createErrorMessage(e)))
.build();
}
if (setResult.isOk()) {
return Response.ok().build();
} else if (setResult.getException() instanceof NoSuchElementException) {
+ LOG.warn(setResult.getException(), "Update compaction config failed");
return Response.status(Response.Status.NOT_FOUND).build();
} else {
+ LOG.warn(setResult.getException(), "Update compaction config failed");
return Response.status(Response.Status.BAD_REQUEST)
- .entity(ImmutableMap.of("error", setResult.getException()))
+ .entity(ImmutableMap.of("error", createErrorMessage(setResult.getException())))
.build();
}
}
@@ -229,4 +243,18 @@ public class CoordinatorCompactionConfigsResource
throw new RuntimeException(ie);
}
}
+
+ private byte[] getCurrentConfigInByteFromDb()
+ {
+ return CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
+ }
+
+ private String createErrorMessage(Exception e)
+ {
+ if (e.getMessage() == null) {
+ return "Unknown Error";
+ } else {
+ return e.getMessage();
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index 555c998..4aad5df 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -27,6 +27,8 @@ import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -36,6 +38,7 @@ import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -48,8 +51,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import java.util.concurrent.atomic.AtomicReference;
-
@RunWith(MockitoJUnitRunner.class)
public class KillCompactionConfigTest
{
@@ -65,11 +66,23 @@ public class KillCompactionConfigTest
@Mock
private JacksonConfigManager mockJacksonConfigManager;
+ @Mock
+ private MetadataStorageConnector mockConnector;
+
+ @Mock
+ private MetadataStorageTablesConfig mockConnectorConfig;
+
@Rule
public ExpectedException exception = ExpectedException.none();
private KillCompactionConfig killCompactionConfig;
+ @Before
+ public void setup()
+ {
+ Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
+ }
+
@Test
public void testRunSkipIfLastRunLessThanPeriod()
{
@@ -93,7 +106,13 @@ public class KillCompactionConfigTest
10,
null
);
- killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+ killCompactionConfig = new KillCompactionConfig(
+ druidCoordinatorConfig,
+ mockSqlSegmentsMetadataManager,
+ mockJacksonConfigManager,
+ mockConnector,
+ mockConnectorConfig
+ );
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyZeroInteractions(mockSqlSegmentsMetadataManager);
Mockito.verifyZeroInteractions(mockJacksonConfigManager);
@@ -125,7 +144,13 @@ public class KillCompactionConfigTest
);
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
- killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+ killCompactionConfig = new KillCompactionConfig(
+ druidCoordinatorConfig,
+ mockSqlSegmentsMetadataManager,
+ mockJacksonConfigManager,
+ mockConnector,
+ mockConnectorConfig
+ );
}
@@ -134,11 +159,17 @@ public class KillCompactionConfigTest
{
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
// Set current compaction config to an empty compaction config
- Mockito.when(mockJacksonConfigManager.watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+ ).thenReturn(null);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
- ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
+ ).thenReturn(CoordinatorCompactionConfig.empty());
TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
null,
@@ -160,18 +191,30 @@ public class KillCompactionConfigTest
10,
null
);
- killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+ killCompactionConfig = new KillCompactionConfig(
+ druidCoordinatorConfig,
+ mockSqlSegmentsMetadataManager,
+ mockJacksonConfigManager,
+ mockConnector,
+ mockConnectorConfig
+ );
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyZeroInteractions(mockSqlSegmentsMetadataManager);
final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
Assert.assertEquals(0, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
- Mockito.verify(mockJacksonConfigManager).watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
+ ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
);
+ Mockito.verify(mockConnector).lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+ );
Mockito.verifyNoMoreInteractions(mockJacksonConfigManager);
}
@@ -204,14 +247,21 @@ public class KillCompactionConfigTest
ImmutableMap.of("key", "val")
);
CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig));
- Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
- Mockito.when(mockJacksonConfigManager.watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ byte[] originalCurrentConfigBytes = {1, 2, 3};
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+ ).thenReturn(originalCurrentConfigBytes);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
- ).thenReturn(new AtomicReference<>(originalCurrentConfig));
+ ).thenReturn(originalCurrentConfig);
+ Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName));
- final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+ final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -240,12 +290,18 @@ public class KillCompactionConfigTest
10,
null
);
- killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+ killCompactionConfig = new KillCompactionConfig(
+ druidCoordinatorConfig,
+ mockSqlSegmentsMetadataManager,
+ mockJacksonConfigManager,
+ mockConnector,
+ mockConnectorConfig
+ );
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
// Verify and Assert
Assert.assertNotNull(oldConfigCaptor.getValue());
- Assert.assertEquals(oldConfigCaptor.getValue(), originalCurrentConfig);
+ Assert.assertEquals(oldConfigCaptor.getValue(), originalCurrentConfigBytes);
Assert.assertNotNull(newConfigCaptor.getValue());
// The updated config should only contains one compaction config for the active datasource
Assert.assertEquals(1, newConfigCaptor.getValue().getCompactionConfigs().size());
@@ -257,14 +313,20 @@ public class KillCompactionConfigTest
// Should delete 1 config
Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
- Mockito.verify(mockJacksonConfigManager).watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
+ ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
);
+ Mockito.verify(mockConnector).lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+ );
Mockito.verify(mockJacksonConfigManager).set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- ArgumentMatchers.any(CoordinatorCompactionConfig.class),
+ ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any()
);
@@ -290,16 +352,23 @@ public class KillCompactionConfigTest
);
CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig));
- Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
- Mockito.when(mockJacksonConfigManager.watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ byte[] originalCurrentConfigBytes = {1, 2, 3};
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+ ).thenReturn(originalCurrentConfigBytes);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
- ).thenReturn(new AtomicReference<>(originalCurrentConfig));
+ ).thenReturn(originalCurrentConfig);
+ Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of());
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- ArgumentMatchers.any(CoordinatorCompactionConfig.class),
+ ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any())
).thenAnswer(new Answer() {
@@ -337,7 +406,13 @@ public class KillCompactionConfigTest
10,
null
);
- killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+ killCompactionConfig = new KillCompactionConfig(
+ druidCoordinatorConfig,
+ mockSqlSegmentsMetadataManager,
+ mockJacksonConfigManager,
+ mockConnector,
+ mockConnectorConfig
+ );
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
// Verify and Assert
@@ -347,16 +422,23 @@ public class KillCompactionConfigTest
// Should delete 1 config
Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
- // Should call watch (to refresh current compaction config) four times due to RetryableException when failed
- Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ // Should call convertByteToConfig and lookup (to refresh current compaction config) four times due to RetryableException when failed
+ Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).convertByteToConfig(
+ ArgumentMatchers.eq(originalCurrentConfigBytes),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
);
+ Mockito.verify(mockConnector, Mockito.times(4)).lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+ );
+
// Should call set (to try set new updated compaction config) four times due to RetryableException when failed
Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
- ArgumentMatchers.any(CoordinatorCompactionConfig.class),
+ ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any()
);
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 988fd2b..c130dc3 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
@@ -42,7 +44,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
@RunWith(MockitoJUnitRunner.class)
public class CoordinatorCompactionConfigsResourceTest
@@ -58,6 +59,8 @@ public class CoordinatorCompactionConfigsResourceTest
null,
ImmutableMap.of("key", "val")
);
+ private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
+
private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG));
@Mock
@@ -66,24 +69,41 @@ public class CoordinatorCompactionConfigsResourceTest
@Mock
private HttpServletRequest mockHttpServletRequest;
+ @Mock
+ private MetadataStorageConnector mockConnector;
+
+ @Mock
+ private MetadataStorageTablesConfig mockConnectorConfig;
+
private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource;
@Before
public void setup()
{
- Mockito.when(mockJacksonConfigManager.watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+ ).thenReturn(OLD_CONFIG_IN_BYTES);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
- ).thenReturn(new AtomicReference<>(ORIGINAL_CONFIG));
- coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(mockJacksonConfigManager);
+ ).thenReturn(ORIGINAL_CONFIG);
+ Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
+ coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(
+ mockJacksonConfigManager,
+ mockConnector,
+ mockConnectorConfig
+ );
Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123");
}
@Test
public void testSetCompactionTaskLimitWithExistingConfig()
{
- final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+ final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -105,7 +125,7 @@ public class CoordinatorCompactionConfigsResourceTest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
- Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG);
+ Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
@@ -114,7 +134,7 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testAddOrUpdateCompactionConfigWithExistingConfig()
{
- final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+ final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -144,7 +164,7 @@ public class CoordinatorCompactionConfigsResourceTest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
- Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG);
+ Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(2, newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(OLD_CONFIG, newConfigCaptor.getValue().getCompactionConfigs().get(0));
@@ -154,7 +174,7 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testDeleteCompactionConfigWithExistingConfig()
{
- final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+ final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -175,11 +195,11 @@ public class CoordinatorCompactionConfigsResourceTest
ImmutableMap.of("key", "val")
);
final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
- Mockito.when(mockJacksonConfigManager.watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
- ).thenReturn(new AtomicReference<>(originalConfig));
+ ).thenReturn(originalConfig);
String author = "maytas";
String comment = "hello";
@@ -191,7 +211,7 @@ public class CoordinatorCompactionConfigsResourceTest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
- Assert.assertEquals(oldConfigCaptor.getValue(), originalConfig);
+ Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(0, newConfigCaptor.getValue().getCompactionConfigs().size());
}
@@ -223,12 +243,18 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testSetCompactionTaskLimitWithoutExistingConfig()
{
- Mockito.when(mockJacksonConfigManager.watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+ ).thenReturn(null);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
- ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
- final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+ ).thenReturn(CoordinatorCompactionConfig.empty());
+ final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -258,12 +284,18 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testAddOrUpdateCompactionConfigWithoutExistingConfig()
{
- Mockito.when(mockJacksonConfigManager.watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+ ).thenReturn(null);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
- ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
- final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+ ).thenReturn(CoordinatorCompactionConfig.empty());
+ final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -301,11 +333,17 @@ public class CoordinatorCompactionConfigsResourceTest
@Test
public void testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist()
{
- Mockito.when(mockJacksonConfigManager.watch(
- ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+ Mockito.when(mockConnector.lookup(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.eq("name"),
+ ArgumentMatchers.eq("payload"),
+ ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+ ).thenReturn(null);
+ Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+ ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
- ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
+ ).thenReturn(CoordinatorCompactionConfig.empty());
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org