You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2021/05/27 23:35:05 UTC

[druid] branch master updated: Fix bug: 502 bad gateway thrown when we edit/delete any auto compaction config created 0.21.0 or before (#11311)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e5633d7  Fix bug: 502 bad gateway thrown when we edit/delete any auto compaction config created 0.21.0 or before (#11311)
e5633d7 is described below

commit e5633d7842f44464ed463a8718051c7542a81ccb
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Thu May 27 16:34:32 2021 -0700

    Fix bug: 502 bad gateway thrown when we edit/delete any auto compaction config created 0.21.0 or before (#11311)
    
    * fix bug
    
    * add test
    
    * fix IT
    
    * fix checkstyle
    
    * address comments
---
 .travis.yml                                        |   6 +-
 .../apache/druid/common/config/ConfigManager.java  |   7 +-
 .../druid/common/config/ConfigManagerConfig.java   |   8 ++
 .../druid/common/config/JacksonConfigManager.java  |  20 ++-
 .../druid/common/config/ConfigManagerTest.java     |  22 +++-
 .../common/config/JacksonConfigManagerTest.java    |  44 +++++++
 integration-tests/docker/druid.sh                  |   2 +-
 .../docker/test-data/upgrade-sample-data.sql       |  16 +++
 .../java/org/apache/druid/tests/TestNGGroup.java   |   2 +
 .../duty/ITAutoCompactionUpgradeTest.java          | 121 ++++++++++++++++++
 .../coordinator/CoordinatorCompactionConfig.java   |  17 +++
 .../coordinator/duty/KillCompactionConfig.java     |  16 ++-
 .../http/CoordinatorCompactionConfigsResource.java |  54 ++++++--
 .../coordinator/duty/KillCompactionConfigTest.java | 142 ++++++++++++++++-----
 .../CoordinatorCompactionConfigsResourceTest.java  |  88 +++++++++----
 15 files changed, 480 insertions(+), 85 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 5fd56d8..2fbc7d6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -592,13 +592,13 @@ jobs:
       stage: Tests - phase 2
       jdk: openjdk8
       services: *integration_test_services
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
       script: *run_integration_test
       after_failure: *integration_test_diags
 
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
 
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
@@ -671,7 +671,7 @@ jobs:
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk11) other integration test"
       jdk: openjdk8
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
 
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests"
diff --git a/core/src/main/java/org/apache/druid/common/config/ConfigManager.java b/core/src/main/java/org/apache/druid/common/config/ConfigManager.java
index 7bad650..56cbcd5 100644
--- a/core/src/main/java/org/apache/druid/common/config/ConfigManager.java
+++ b/core/src/main/java/org/apache/druid/common/config/ConfigManager.java
@@ -176,7 +176,7 @@ public class ConfigManager
     return set(key, serde, null, obj);
   }
 
-  public <T> SetResult set(final String key, final ConfigSerde<T> serde, @Nullable final T oldObject, final T newObject)
+  public <T> SetResult set(final String key, final ConfigSerde<T> serde, @Nullable final byte[] oldValue, final T newObject)
   {
     if (newObject == null || !started) {
       if (newObject == null) {
@@ -191,11 +191,10 @@ public class ConfigManager
     try {
       return exec.submit(
           () -> {
-            if (oldObject == null) {
+            if (oldValue == null || !config.get().isEnableCompareAndSwap()) {
               dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
             } else {
-              final byte[] oldBytes = serde.serialize(oldObject);
-              MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldBytes, newBytes);
+              MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldValue, newBytes);
               boolean success = dbConnector.compareAndSwap(ImmutableList.of(metadataCASUpdate));
               if (!success) {
                 return SetResult.fail(new IllegalStateException("Config value has changed"), true);
diff --git a/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java b/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java
index 5ef5816..472bacd 100644
--- a/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java
+++ b/core/src/main/java/org/apache/druid/common/config/ConfigManagerConfig.java
@@ -32,8 +32,16 @@ public class ConfigManagerConfig
   @NotNull
   private Period pollDuration = new Period("PT1M");
 
+  @JsonProperty
+  private boolean enableCompareAndSwap = true;
+
   public Period getPollDuration()
   {
     return pollDuration;
   }
+
+  public boolean isEnableCompareAndSwap()
+  {
+    return enableCompareAndSwap;
+  }
 }
diff --git a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
index 7e3eeeb..7cf78a5 100644
--- a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
+++ b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
@@ -73,6 +73,16 @@ public class JacksonConfigManager
     return configManager.watchConfig(key, create(clazz, defaultVal));
   }
 
+  public <T> T convertByteToConfig(byte[] configInByte, Class<? extends T> clazz, T defaultVal)
+  {
+    if (configInByte == null) {
+      return defaultVal;
+    } else {
+      final ConfigSerde<T> serde = create(clazz, defaultVal);
+      return serde.deserialize(configInByte);
+    }
+  }
+
   /**
    * Set the config and add audit entry
    *
@@ -90,14 +100,18 @@ public class JacksonConfigManager
    *
    * @param key of the config to set
    * @param oldValue old config value. If not null, then the update will only succeed if the insert
-   *                 happens when current database entry is the same as this value. If null, then the insert
-   *                 will not consider the current database entry.
+   *                 happens when current database entry is the same as this value. Note that the current database
+   *                 entry (as an array of bytes) has to be exactly the same as the array of bytes of this value for
+   *                 the update to succeed. If null, then the insert will not consider the current database entry. Note
+   *                 that this field intentionally uses a byte array to be resilient across serde of existing data
+   *                 retrieved from the database (instead of a Java object, which may have additional fields added
+   *                 as a result of serde).
    * @param newValue new config value to insert
    * @param auditInfo metadata regarding the change to config, for audit purposes
    */
   public <T> SetResult set(
       String key,
-      @Nullable T oldValue,
+      @Nullable byte[] oldValue,
       T newValue,
       AuditInfo auditInfo
   )
diff --git a/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java b/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java
index 038aa38..9705939 100644
--- a/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java
+++ b/core/src/test/java/org/apache/druid/common/config/ConfigManagerTest.java
@@ -49,7 +49,7 @@ public class ConfigManagerTest
 {
   private static final String CONFIG_KEY = "configX";
   private static final String TABLE_NAME = "config_table";
-  private static final TestConfig OLD_CONFIG = new TestConfig("1", "x", 1);
+  private static final byte[] OLD_CONFIG = {1, 2, 3};
   private static final TestConfig NEW_CONFIG = new TestConfig("2", "y", 2);
 
   @Mock
@@ -120,6 +120,7 @@ public class ConfigManagerTest
   @Test
   public void testSetOldObjectNotNullShouldSwap()
   {
+    when(mockConfigManagerConfig.isEnableCompareAndSwap()).thenReturn(true);
     when(mockDbConnector.compareAndSwap(any(List.class))).thenReturn(true);
     final ArgumentCaptor<List<MetadataCASUpdate>> updateCaptor = ArgumentCaptor.forClass(List.class);
     configManager.start();
@@ -134,10 +135,27 @@ public class ConfigManagerTest
     Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_KEY_COLUMN, updateCaptor.getValue().get(0).getKeyColumn());
     Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN, updateCaptor.getValue().get(0).getValueColumn());
     Assert.assertEquals(CONFIG_KEY, updateCaptor.getValue().get(0).getKey());
-    Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(OLD_CONFIG), updateCaptor.getValue().get(0).getOldValue());
+    Assert.assertArrayEquals(OLD_CONFIG, updateCaptor.getValue().get(0).getOldValue());
     Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(NEW_CONFIG), updateCaptor.getValue().get(0).getNewValue());
   }
 
+  @Test
+  public void testSetOldObjectNotNullButCompareAndSwapDisabledShouldInsertWithoutSwap()
+  {
+    when(mockConfigManagerConfig.isEnableCompareAndSwap()).thenReturn(false);
+    configManager.start();
+    ConfigManager.SetResult setResult = configManager.set(CONFIG_KEY, configConfigSerdeFromClass, OLD_CONFIG, NEW_CONFIG);
+    Assert.assertTrue(setResult.isOk());
+    Mockito.verify(mockDbConnector).insertOrUpdate(
+        ArgumentMatchers.eq(TABLE_NAME),
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq(CONFIG_KEY),
+        ArgumentMatchers.any(byte[].class)
+    );
+    Mockito.verifyNoMoreInteractions(mockDbConnector);
+  }
+
   static class TestConfig
   {
     private final String version;
diff --git a/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
index 25220eb..4a4aaf7 100644
--- a/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
+++ b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
@@ -38,6 +38,8 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.Objects;
+
 @RunWith(MockitoJUnitRunner.class)
 public class JacksonConfigManagerTest
 {
@@ -136,6 +138,27 @@ public class JacksonConfigManagerTest
     Assert.assertNotNull(configSerdeCapture.getValue());
   }
 
+  @Test
+  public void testConvertByteToConfigWithNullConfigInByte()
+  {
+    TestConfig defaultExpected = new TestConfig("version", null, 3);
+    TestConfig actual = jacksonConfigManager.convertByteToConfig(null, TestConfig.class, defaultExpected);
+    Assert.assertEquals(defaultExpected, actual);
+  }
+
+  @Test
+  public void testConvertByteToConfigWithNonNullConfigInByte()
+  {
+    ConfigSerde<TestConfig> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<TestConfig>()
+    {
+    }, null);
+    TestConfig defaultConfig = new TestConfig("version", null, 3);
+    TestConfig expectedConfig = new TestConfig("version2", null, 5);
+    byte[] expectedConfigInByte = configConfigSerdeFromTypeReference.serialize(expectedConfig);
+
+    TestConfig actual = jacksonConfigManager.convertByteToConfig(expectedConfigInByte, TestConfig.class, defaultConfig);
+    Assert.assertEquals(expectedConfig, actual);
+  }
 
   static class TestConfig
   {
@@ -169,6 +192,27 @@ public class JacksonConfigManagerTest
     {
       return settingInt;
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestConfig config = (TestConfig) o;
+      return settingInt == config.settingInt &&
+             Objects.equals(version, config.version) &&
+             Objects.equals(settingString, config.settingString);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(version, settingString, settingInt);
+    }
   }
 
   static class ClassThatJacksonCannotSerialize
diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh
index 0a4f00b..baa0e97 100755
--- a/integration-tests/docker/druid.sh
+++ b/integration-tests/docker/druid.sh
@@ -85,7 +85,7 @@ setupData()
   # The "query" and "security" test groups require data to be setup before running the tests.
   # In particular, they requires segments to be download from a pre-existing s3 bucket.
   # This is done by using the loadSpec put into metadatastore and s3 credientials set below.
-  if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ]; then
+  if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ]; then
     # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
     find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
       && cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop
diff --git a/integration-tests/docker/test-data/upgrade-sample-data.sql b/integration-tests/docker/test-data/upgrade-sample-data.sql
new file mode 100644
index 0000000..a58fdab
--- /dev/null
+++ b/integration-tests/docker/test-data/upgrade-sample-data.sql
@@ -0,0 +1,16 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+INSERT INTO druid_config (name, payload) VALUES ('coordinator.compaction.config', '{"compactionConfigs":[{"dataSource":"upgradeTest","taskPriority":25,"inputSegmentSizeBytes":419430400,"maxRowsPerSegment":null,"skipOffsetFromLatest":"P1D","tuningConfig":{"maxRowsInMemory":null,"maxBytesInMemory":null,"maxTotalRows":null,"splitHintSpec":null,"partitionsSpec":{"type":"hashed","numShards":null,"partitionDimensions":[],"partitionFunction":"murmur3_32_abs","maxRowsPerSegment":5000000},"indexS [...]
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 8b962d2..bd346ff 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -43,6 +43,8 @@ public class TestNGGroup
 
   public static final String COMPACTION = "compaction";
 
+  public static final String UPGRADE = "upgrade";
+
   public static final String APPEND_INGESTION = "append-ingestion";
 
   public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
new file mode 100644
index 0000000..b960707
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.coordinator.duty;
+
+import com.google.inject.Inject;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CompactionResourceTestClient;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractIndexerTest;
+import org.joda.time.Period;
+import org.testng.Assert;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = {TestNGGroup.UPGRADE})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
+{
+  private static final Logger LOG = new Logger(ITAutoCompactionUpgradeTest.class);
+  private static final String UPGRADE_DATASOURCE_NAME = "upgradeTest";
+
+  @Inject
+  protected CompactionResourceTestClient compactionResource;
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  @Test
+  public void testUpgradeAutoCompactionConfigurationWhenConfigurationFromOlderVersionAlreadyExist() throws Exception
+  {
+    // Verify that the compaction config already exists. It was inserted manually into the database using a SQL
+    // script, and its payload is an auto compaction configuration from Druid 0.21.0.
+    CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+    DataSourceCompactionConfig foundDataSourceCompactionConfig = null;
+    for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
+      if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) {
+        foundDataSourceCompactionConfig = dataSourceCompactionConfig;
+      }
+    }
+    Assert.assertNotNull(foundDataSourceCompactionConfig);
+
+    // Now submit a new auto compaction configuration
+    PartitionsSpec newPartitionsSpec = new DynamicPartitionsSpec(4000, null);
+    Period newSkipOffset = Period.seconds(0);
+
+    DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
+        UPGRADE_DATASOURCE_NAME,
+        null,
+        null,
+        null,
+        newSkipOffset,
+        new UserCompactionTaskQueryTuningConfig(
+            null,
+            null,
+            null,
+            new MaxSizeSplitHintSpec(null, 1),
+            newPartitionsSpec,
+            null,
+            null,
+            null,
+            null,
+            null,
+            1,
+            null,
+            null,
+            null,
+            null,
+            null,
+            1
+        ),
+        new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+        new UserCompactionTaskIOConfig(true),
+        null
+    );
+    compactionResource.submitCompactionConfig(compactionConfig);
+
+    // Wait for compaction config to persist
+    Thread.sleep(2000);
+
+    // Verify that the compaction config was successfully updated
+    coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+    foundDataSourceCompactionConfig = null;
+    for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
+      if (dataSourceCompactionConfig.getDataSource().equals(UPGRADE_DATASOURCE_NAME)) {
+        foundDataSourceCompactionConfig = dataSourceCompactionConfig;
+      }
+    }
+    Assert.assertNotNull(foundDataSourceCompactionConfig);
+    Assert.assertNotNull(foundDataSourceCompactionConfig.getTuningConfig());
+    Assert.assertEquals(foundDataSourceCompactionConfig.getTuningConfig().getPartitionsSpec(), newPartitionsSpec);
+    Assert.assertEquals(foundDataSourceCompactionConfig.getSkipOffsetFromLatest(), newSkipOffset);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
index cc1fdf6..409a813 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -86,6 +88,21 @@ public class CoordinatorCompactionConfig
     );
   }
 
+  public static byte[] getConfigInByteFromDb(final MetadataStorageConnector connector, MetadataStorageTablesConfig config)
+  {
+    return connector.lookup(
+        config.getConfigTable(),
+        "name",
+        "payload",
+        CoordinatorCompactionConfig.CONFIG_KEY
+    );
+  }
+
+  public static CoordinatorCompactionConfig convertByteToConfig(final JacksonConfigManager configManager, byte[] configInByte)
+  {
+    return configManager.convertByteToConfig(configInByte, CoordinatorCompactionConfig.class, CoordinatorCompactionConfig.empty());
+  }
+
   @Nonnull
   public static CoordinatorCompactionConfig current(final JacksonConfigManager configManager)
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
index cd54cf3..74645b5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
@@ -30,6 +30,8 @@ import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -58,17 +60,23 @@ public class KillCompactionConfig implements CoordinatorDuty
 
   private final JacksonConfigManager jacksonConfigManager;
   private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private final MetadataStorageConnector connector;
+  private final MetadataStorageTablesConfig connectorConfig;
 
   @Inject
   public KillCompactionConfig(
       DruidCoordinatorConfig config,
       SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
-      JacksonConfigManager jacksonConfigManager
+      JacksonConfigManager jacksonConfigManager,
+      MetadataStorageConnector connector,
+      MetadataStorageTablesConfig connectorConfig
   )
   {
     this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
     this.jacksonConfigManager = jacksonConfigManager;
     this.period = config.getCoordinatorCompactionKillPeriod().getMillis();
+    this.connector = connector;
+    this.connectorConfig = connectorConfig;
     Preconditions.checkArgument(
         this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
         "Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
@@ -88,7 +96,8 @@ public class KillCompactionConfig implements CoordinatorDuty
       try {
         RetryUtils.retry(
             () -> {
-              CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(jacksonConfigManager);
+              final byte[] currentBytes = CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
+              final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(jacksonConfigManager, currentBytes);
               // If current compaction config is empty then there is nothing to do
               if (CoordinatorCompactionConfig.empty().equals(current)) {
                 log.info(
@@ -112,8 +121,7 @@ public class KillCompactionConfig implements CoordinatorDuty
 
               ConfigManager.SetResult result = jacksonConfigManager.set(
                   CoordinatorCompactionConfig.CONFIG_KEY,
-                  // Do database insert without swap if the current config is empty as this means the config may be null in the database
-                  current,
+                  currentBytes,
                   CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values())),
                   new AuditInfo(
                       "KillCompactionConfig",
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
index a5cd611..cb17c45 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
@@ -28,6 +28,9 @@ import org.apache.druid.audit.AuditInfo;
 import org.apache.druid.audit.AuditManager;
 import org.apache.druid.common.config.ConfigManager.SetResult;
 import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.http.security.ConfigResourceFilter;
@@ -57,15 +60,24 @@ import java.util.stream.Collectors;
 @ResourceFilters(ConfigResourceFilter.class)
 public class CoordinatorCompactionConfigsResource
 {
+  private static final Logger LOG = new Logger(CoordinatorCompactionConfigsResource.class);
   private static final long UPDATE_RETRY_DELAY = 1000;
   static final int UPDATE_NUM_RETRY = 5;
 
   private final JacksonConfigManager manager;
+  private final MetadataStorageConnector connector;
+  private final MetadataStorageTablesConfig connectorConfig;
 
   @Inject
-  public CoordinatorCompactionConfigsResource(JacksonConfigManager manager)
+  public CoordinatorCompactionConfigsResource(
+      JacksonConfigManager manager,
+      MetadataStorageConnector connector,
+      MetadataStorageTablesConfig connectorConfig
+  )
   {
     this.manager = manager;
+    this.connector = connector;
+    this.connectorConfig = connectorConfig;
   }
 
   @GET
@@ -87,8 +99,8 @@ public class CoordinatorCompactionConfigsResource
   )
   {
     Callable<SetResult> callable = () -> {
-      final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
-
+      final byte[] currentBytes = getCurrentConfigInByteFromDb();
+      final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
       final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from(
           current,
           compactionTaskSlotRatio,
@@ -97,8 +109,7 @@ public class CoordinatorCompactionConfigsResource
 
       return manager.set(
           CoordinatorCompactionConfig.CONFIG_KEY,
-          // Do database insert without swap if the current config is empty as this means the config may be null in the database
-          CoordinatorCompactionConfig.empty().equals(current) ? null : current,
+          currentBytes,
           newCompactionConfig,
           new AuditInfo(author, comment, req.getRemoteAddr())
       );
@@ -116,7 +127,8 @@ public class CoordinatorCompactionConfigsResource
   )
   {
     Callable<SetResult> callable = () -> {
-      final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
+      final byte[] currentBytes = getCurrentConfigInByteFromDb();
+      final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
       final CoordinatorCompactionConfig newCompactionConfig;
       final Map<String, DataSourceCompactionConfig> newConfigs = current
           .getCompactionConfigs()
@@ -127,8 +139,7 @@ public class CoordinatorCompactionConfigsResource
 
       return manager.set(
           CoordinatorCompactionConfig.CONFIG_KEY,
-          // Do database insert without swap if the current config is empty as this means the config may be null in the database
-          CoordinatorCompactionConfig.empty().equals(current) ? null : current,
+          currentBytes,
           newCompactionConfig,
           new AuditInfo(author, comment, req.getRemoteAddr())
       );
@@ -166,7 +177,8 @@ public class CoordinatorCompactionConfigsResource
   )
   {
     Callable<SetResult> callable = () -> {
-      final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
+      final byte[] currentBytes = getCurrentConfigInByteFromDb();
+      final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(manager, currentBytes);
       final Map<String, DataSourceCompactionConfig> configs = current
           .getCompactionConfigs()
           .stream()
@@ -179,8 +191,7 @@ public class CoordinatorCompactionConfigsResource
 
       return manager.set(
           CoordinatorCompactionConfig.CONFIG_KEY,
-          // Do database insert without swap if the current config is empty as this means the config may be null in the database
-          CoordinatorCompactionConfig.empty().equals(current) ? null : current,
+          currentBytes,
           CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(configs.values())),
           new AuditInfo(author, comment, req.getRemoteAddr())
       );
@@ -204,18 +215,21 @@ public class CoordinatorCompactionConfigsResource
       }
     }
     catch (Exception e) {
+      LOG.warn(e, "Update compaction config failed");
       return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
-                     .entity(ImmutableMap.of("error", e))
+                     .entity(ImmutableMap.of("error", createErrorMessage(e)))
                      .build();
     }
 
     if (setResult.isOk()) {
       return Response.ok().build();
     } else if (setResult.getException() instanceof NoSuchElementException) {
+      LOG.warn(setResult.getException(), "Update compaction config failed");
       return Response.status(Response.Status.NOT_FOUND).build();
     } else {
+      LOG.warn(setResult.getException(), "Update compaction config failed");
       return Response.status(Response.Status.BAD_REQUEST)
-                     .entity(ImmutableMap.of("error", setResult.getException()))
+                     .entity(ImmutableMap.of("error", createErrorMessage(setResult.getException())))
                      .build();
     }
   }
@@ -229,4 +243,18 @@ public class CoordinatorCompactionConfigsResource
       throw new RuntimeException(ie);
     }
   }
+
+  private byte[] getCurrentConfigInByteFromDb()
+  {
+    return CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
+  }
+
+  private String createErrorMessage(Exception e)
+  {
+    if (e.getMessage() == null) {
+      return "Unknown Error";
+    } else {
+      return e.getMessage();
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index 555c998..4aad5df 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -27,6 +27,8 @@ import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -36,6 +38,7 @@ import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -48,8 +51,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 @RunWith(MockitoJUnitRunner.class)
 public class KillCompactionConfigTest
 {
@@ -65,11 +66,23 @@ public class KillCompactionConfigTest
   @Mock
   private JacksonConfigManager mockJacksonConfigManager;
 
+  @Mock
+  private MetadataStorageConnector mockConnector;
+
+  @Mock
+  private MetadataStorageTablesConfig mockConnectorConfig;
+
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
   private KillCompactionConfig killCompactionConfig;
 
+  @Before
+  public void setup()
+  {
+    Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
+  }
+
   @Test
   public void testRunSkipIfLastRunLessThanPeriod()
   {
@@ -93,7 +106,13 @@ public class KillCompactionConfigTest
         10,
         null
     );
-    killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+    killCompactionConfig = new KillCompactionConfig(
+        druidCoordinatorConfig,
+        mockSqlSegmentsMetadataManager,
+        mockJacksonConfigManager,
+        mockConnector,
+        mockConnectorConfig
+    );
     killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
     Mockito.verifyZeroInteractions(mockSqlSegmentsMetadataManager);
     Mockito.verifyZeroInteractions(mockJacksonConfigManager);
@@ -125,7 +144,13 @@ public class KillCompactionConfigTest
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
-    killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+    killCompactionConfig = new KillCompactionConfig(
+        druidCoordinatorConfig,
+        mockSqlSegmentsMetadataManager,
+        mockJacksonConfigManager,
+        mockConnector,
+        mockConnectorConfig
+    );
   }
 
 
@@ -134,11 +159,17 @@ public class KillCompactionConfigTest
   {
     Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     // Set current compaction config to an empty compaction config
-    Mockito.when(mockJacksonConfigManager.watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    Mockito.when(mockConnector.lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+    ).thenReturn(null);
+    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+        ArgumentMatchers.eq(null),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
-    ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
+    ).thenReturn(CoordinatorCompactionConfig.empty());
 
     TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig(
         null,
@@ -160,18 +191,30 @@ public class KillCompactionConfigTest
         10,
         null
     );
-    killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+    killCompactionConfig = new KillCompactionConfig(
+        druidCoordinatorConfig,
+        mockSqlSegmentsMetadataManager,
+        mockJacksonConfigManager,
+        mockConnector,
+        mockConnectorConfig
+    );
     killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
     Mockito.verifyZeroInteractions(mockSqlSegmentsMetadataManager);
     final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
     Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
     Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
     Assert.assertEquals(0, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
-    Mockito.verify(mockJacksonConfigManager).watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
+        ArgumentMatchers.eq(null),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
     );
+    Mockito.verify(mockConnector).lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+    );
     Mockito.verifyNoMoreInteractions(mockJacksonConfigManager);
   }
 
@@ -204,14 +247,21 @@ public class KillCompactionConfigTest
         ImmutableMap.of("key", "val")
     );
     CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig));
-    Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
-    Mockito.when(mockJacksonConfigManager.watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    byte[] originalCurrentConfigBytes = {1, 2, 3};
+    Mockito.when(mockConnector.lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+    ).thenReturn(originalCurrentConfigBytes);
+    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+        ArgumentMatchers.eq(originalCurrentConfigBytes),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
-    ).thenReturn(new AtomicReference<>(originalCurrentConfig));
+    ).thenReturn(originalCurrentConfig);
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName));
-    final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+    final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
     final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
     Mockito.when(mockJacksonConfigManager.set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -240,12 +290,18 @@ public class KillCompactionConfigTest
         10,
         null
     );
-    killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+    killCompactionConfig = new KillCompactionConfig(
+        druidCoordinatorConfig,
+        mockSqlSegmentsMetadataManager,
+        mockJacksonConfigManager,
+        mockConnector,
+        mockConnectorConfig
+    );
     killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
 
     // Verify and Assert
     Assert.assertNotNull(oldConfigCaptor.getValue());
-    Assert.assertEquals(oldConfigCaptor.getValue(), originalCurrentConfig);
+    Assert.assertEquals(oldConfigCaptor.getValue(), originalCurrentConfigBytes);
     Assert.assertNotNull(newConfigCaptor.getValue());
     // The updated config should only contains one compaction config for the active datasource
     Assert.assertEquals(1, newConfigCaptor.getValue().getCompactionConfigs().size());
@@ -257,14 +313,20 @@ public class KillCompactionConfigTest
     // Should delete 1 config
     Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
 
-    Mockito.verify(mockJacksonConfigManager).watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
+        ArgumentMatchers.eq(originalCurrentConfigBytes),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
     );
+    Mockito.verify(mockConnector).lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+    );
     Mockito.verify(mockJacksonConfigManager).set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-        ArgumentMatchers.any(CoordinatorCompactionConfig.class),
+        ArgumentMatchers.any(byte[].class),
         ArgumentMatchers.any(CoordinatorCompactionConfig.class),
         ArgumentMatchers.any()
     );
@@ -290,16 +352,23 @@ public class KillCompactionConfigTest
     );
 
     CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig));
-    Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
-    Mockito.when(mockJacksonConfigManager.watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    byte[] originalCurrentConfigBytes = {1, 2, 3};
+    Mockito.when(mockConnector.lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+    ).thenReturn(originalCurrentConfigBytes);
+    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+        ArgumentMatchers.eq(originalCurrentConfigBytes),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
-    ).thenReturn(new AtomicReference<>(originalCurrentConfig));
+    ).thenReturn(originalCurrentConfig);
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of());
     Mockito.when(mockJacksonConfigManager.set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-        ArgumentMatchers.any(CoordinatorCompactionConfig.class),
+        ArgumentMatchers.any(byte[].class),
         ArgumentMatchers.any(CoordinatorCompactionConfig.class),
         ArgumentMatchers.any())
     ).thenAnswer(new Answer() {
@@ -337,7 +406,13 @@ public class KillCompactionConfigTest
         10,
         null
     );
-    killCompactionConfig = new KillCompactionConfig(druidCoordinatorConfig, mockSqlSegmentsMetadataManager, mockJacksonConfigManager);
+    killCompactionConfig = new KillCompactionConfig(
+        druidCoordinatorConfig,
+        mockSqlSegmentsMetadataManager,
+        mockJacksonConfigManager,
+        mockConnector,
+        mockConnectorConfig
+    );
     killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
 
     // Verify and Assert
@@ -347,16 +422,23 @@ public class KillCompactionConfigTest
     // Should delete 1 config
     Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
 
-    // Should call watch (to refresh current compaction config) four times due to RetryableException when failed
-    Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    // Should call convertByteToConfig and lookup (to refresh the current compaction config) four times, because the update is retried each time it fails with a RetryableException
+    Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).convertByteToConfig(
+        ArgumentMatchers.eq(originalCurrentConfigBytes),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())
     );
+    Mockito.verify(mockConnector, Mockito.times(4)).lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY)
+    );
+
     // Should call set (to try set new updated compaction config) four times due to RetryableException when failed
     Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
-        ArgumentMatchers.any(CoordinatorCompactionConfig.class),
+        ArgumentMatchers.any(byte[].class),
         ArgumentMatchers.any(CoordinatorCompactionConfig.class),
         ArgumentMatchers.any()
     );
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 988fd2b..c130dc3 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
@@ -42,7 +44,6 @@ import org.mockito.junit.MockitoJUnitRunner;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
 
 @RunWith(MockitoJUnitRunner.class)
 public class CoordinatorCompactionConfigsResourceTest
@@ -58,6 +59,8 @@ public class CoordinatorCompactionConfigsResourceTest
       null,
       ImmutableMap.of("key", "val")
   );
+  private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
+
   private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG));
 
   @Mock
@@ -66,24 +69,41 @@ public class CoordinatorCompactionConfigsResourceTest
   @Mock
   private HttpServletRequest mockHttpServletRequest;
 
+  @Mock
+  private MetadataStorageConnector mockConnector;
+
+  @Mock
+  private MetadataStorageTablesConfig mockConnectorConfig;
+
   private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource;
 
   @Before
   public void setup()
   {
-    Mockito.when(mockJacksonConfigManager.watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    Mockito.when(mockConnector.lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+    ).thenReturn(OLD_CONFIG_IN_BYTES);
+    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+        ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
-    ).thenReturn(new AtomicReference<>(ORIGINAL_CONFIG));
-    coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(mockJacksonConfigManager);
+    ).thenReturn(ORIGINAL_CONFIG);
+    Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
+    coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(
+        mockJacksonConfigManager,
+        mockConnector,
+        mockConnectorConfig
+    );
     Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123");
   }
 
   @Test
   public void testSetCompactionTaskLimitWithExistingConfig()
   {
-    final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+    final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
     final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
     Mockito.when(mockJacksonConfigManager.set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -105,7 +125,7 @@ public class CoordinatorCompactionConfigsResourceTest
     );
     Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
     Assert.assertNotNull(oldConfigCaptor.getValue());
-    Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG);
+    Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
     Assert.assertNotNull(newConfigCaptor.getValue());
     Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
     Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
@@ -114,7 +134,7 @@ public class CoordinatorCompactionConfigsResourceTest
   @Test
   public void testAddOrUpdateCompactionConfigWithExistingConfig()
   {
-    final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+    final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
     final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
     Mockito.when(mockJacksonConfigManager.set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -144,7 +164,7 @@ public class CoordinatorCompactionConfigsResourceTest
     );
     Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
     Assert.assertNotNull(oldConfigCaptor.getValue());
-    Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG);
+    Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
     Assert.assertNotNull(newConfigCaptor.getValue());
     Assert.assertEquals(2, newConfigCaptor.getValue().getCompactionConfigs().size());
     Assert.assertEquals(OLD_CONFIG, newConfigCaptor.getValue().getCompactionConfigs().get(0));
@@ -154,7 +174,7 @@ public class CoordinatorCompactionConfigsResourceTest
   @Test
   public void testDeleteCompactionConfigWithExistingConfig()
   {
-    final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+    final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
     final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
     Mockito.when(mockJacksonConfigManager.set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -175,11 +195,11 @@ public class CoordinatorCompactionConfigsResourceTest
         ImmutableMap.of("key", "val")
     );
     final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
-    Mockito.when(mockJacksonConfigManager.watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+        ArgumentMatchers.eq(OLD_CONFIG_IN_BYTES),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
-    ).thenReturn(new AtomicReference<>(originalConfig));
+    ).thenReturn(originalConfig);
 
     String author = "maytas";
     String comment = "hello";
@@ -191,7 +211,7 @@ public class CoordinatorCompactionConfigsResourceTest
     );
     Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
     Assert.assertNotNull(oldConfigCaptor.getValue());
-    Assert.assertEquals(oldConfigCaptor.getValue(), originalConfig);
+    Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
     Assert.assertNotNull(newConfigCaptor.getValue());
     Assert.assertEquals(0, newConfigCaptor.getValue().getCompactionConfigs().size());
   }
@@ -223,12 +243,18 @@ public class CoordinatorCompactionConfigsResourceTest
   @Test
   public void testSetCompactionTaskLimitWithoutExistingConfig()
   {
-    Mockito.when(mockJacksonConfigManager.watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    Mockito.when(mockConnector.lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+    ).thenReturn(null);
+    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+        ArgumentMatchers.eq(null),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
-    ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
-    final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+    ).thenReturn(CoordinatorCompactionConfig.empty());
+    final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
     final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
     Mockito.when(mockJacksonConfigManager.set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -258,12 +284,18 @@ public class CoordinatorCompactionConfigsResourceTest
   @Test
   public void testAddOrUpdateCompactionConfigWithoutExistingConfig()
   {
-    Mockito.when(mockJacksonConfigManager.watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    Mockito.when(mockConnector.lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+    ).thenReturn(null);
+    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+        ArgumentMatchers.eq(null),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
-    ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
-    final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
+    ).thenReturn(CoordinatorCompactionConfig.empty());
+    final ArgumentCaptor<byte[]> oldConfigCaptor = ArgumentCaptor.forClass(byte[].class);
     final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
     Mockito.when(mockJacksonConfigManager.set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
@@ -301,11 +333,17 @@ public class CoordinatorCompactionConfigsResourceTest
   @Test
   public void testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist()
   {
-    Mockito.when(mockJacksonConfigManager.watch(
-        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+    Mockito.when(mockConnector.lookup(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("name"),
+        ArgumentMatchers.eq("payload"),
+        ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY))
+    ).thenReturn(null);
+    Mockito.when(mockJacksonConfigManager.convertByteToConfig(
+        ArgumentMatchers.eq(null),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
-    ).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
+    ).thenReturn(CoordinatorCompactionConfig.empty());
     String author = "maytas";
     String comment = "hello";
     Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org