You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:06:03 UTC

[49/50] [abbrv] kafka git commit: KAFKA-3459: Returning zero task configurations from a connector does not properly clean up existing tasks

KAFKA-3459: Returning zero task configurations from a connector does not properly clean up existing tasks

hachikuji ewencp Can you take a look when you have time?

Author: Liquan Pei <li...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1248 from Ishiihara/kafka-3459


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0dedc63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0dedc63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0dedc63

Branch: refs/heads/0.10.0
Commit: d0dedc6314bfd83d9b2b9a9557e3168e715981da
Parents: 096b8b8
Author: Liquan Pei <li...@gmail.com>
Authored: Fri Apr 29 14:49:22 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Apr 29 14:49:22 2016 -0700

----------------------------------------------------------------------
 .../runtime/distributed/ClusterConfigState.java |  13 +-
 .../runtime/distributed/DistributedHerder.java  |  15 +--
 .../runtime/standalone/StandaloneHerder.java    |  16 +--
 .../connect/storage/ConfigBackingStore.java     |   5 +-
 .../storage/KafkaConfigBackingStore.java        |  82 +++++-------
 .../storage/MemoryConfigBackingStore.java       |  18 ++-
 .../storage/KafkaConfigBackingStoreTest.java    | 131 ++++++++++++++++---
 7 files changed, 174 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index c5c217e..ea5ba82 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -22,10 +22,11 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 /**
  * An immutable snapshot of the configuration state of connectors and tasks in a Kafka Connect cluster.
@@ -116,15 +117,15 @@ public class ClusterConfigState {
     /**
      * Get all task configs for a connector.
      * @param connector name of the connector
-     * @return a map from the task id to its configuration
+     * @return a list of task configurations
      */
-    public Map<ConnectorTaskId, Map<String, String>> allTaskConfigs(String connector) {
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+    public List<Map<String, String>> allTaskConfigs(String connector) {
+        Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>();
         for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : this.taskConfigs.entrySet()) {
             if (taskConfigEntry.getKey().connector().equals(connector))
-                taskConfigs.put(taskConfigEntry.getKey(), taskConfigEntry.getValue());
+                taskConfigs.put(taskConfigEntry.getKey().task(), taskConfigEntry.getValue());
         }
-        return taskConfigs;
+        return new LinkedList<>(taskConfigs.values());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cbef186..037eba7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -543,7 +542,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                         else if (!configState.contains(connName))
                             callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                         else {
-                            configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(connName, configs));
+                            configBackingStore.putTaskConfigs(connName, configs);
                             callback.onCompletion(null, null);
                         }
                         return null;
@@ -853,7 +852,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             }
             if (changed) {
                 if (isLeader()) {
-                    configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(connName, taskProps));
+                    configBackingStore.putTaskConfigs(connName, taskProps);
                     cb.onCompletion(null, null);
                 } else {
                     // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
@@ -1064,14 +1063,4 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
-    private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connName, List<Map<String, String>> configs) {
-        int index = 0;
-        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
-        for (Map<String, String> taskConfigMap : configs) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-            result.put(taskId, taskConfigMap);
-            index++;
-        }
-        return result;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index ad02e99..2316bae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -250,20 +249,13 @@ public class StandaloneHerder extends AbstractHerder {
         return connName;
     }
 
-    private Map<ConnectorTaskId, Map<String, String>> recomputeTaskConfigs(String connName) {
+    private List<Map<String, String>> recomputeTaskConfigs(String connName) {
         Map<String, String> config = configState.connectorConfig(connName);
         ConnectorConfig connConfig = new ConnectorConfig(config);
 
-        List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(connName,
+        return worker.connectorTaskConfigs(connName,
                 connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
                 connConfig.getList(ConnectorConfig.TOPICS_CONFIG));
-
-        int i = 0;
-        Map<ConnectorTaskId, Map<String, String>> taskConfigMap = new HashMap<>();
-        for (Map<String, String> taskConfig : taskConfigs)
-            taskConfigMap.put(new ConnectorTaskId(connName, i++), taskConfig);
-
-        return taskConfigMap;
     }
 
     private void createConnectorTasks(String connName, TargetState initialState) {
@@ -296,8 +288,8 @@ public class StandaloneHerder extends AbstractHerder {
             return;
         }
 
-        Map<ConnectorTaskId, Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        Map<ConnectorTaskId, Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
+        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+        List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
 
         if (!newTaskConfigs.equals(oldTaskConfigs)) {
             removeConnectorTasks(connName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
index 5244842..77fc43b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -64,9 +65,9 @@ public interface ConfigBackingStore {
     /**
      * Update the task configurations for a connector.
      * @param connector name of the connector
-     * @param configs the new task configs
+     * @param configs the new task configs for the connector
      */
-    void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs);
+    void putTaskConfigs(String connector, List<Map<String, String>> configs);
 
     /**
      * Remove the task configs associated with a connector.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 8d20288..9412e42 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -205,6 +205,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     // Connector and task configs: name or id -> config map
     private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
     private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
     private Set<String> inconsistent = new HashSet<>();
@@ -339,12 +340,13 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
      * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
      * that we would be leaving one of the referenced connectors with an inconsistent state.
      *
-     * @param configs map containing task configurations
+     * @param connector the connector whose task configurations are being written
+     * @param configs list of task configurations for the connector
      * @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root
      *                          and task configurations.
      */
     @Override
-    public void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
+    public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
         // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
         // any outstanding lagging data to consume.
         try {
@@ -354,46 +356,33 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
             throw new ConnectException("Error writing root configuration to Kafka", e);
         }
 
-        // In theory, there is only a single writer and we shouldn't need this lock since the background thread should
-        // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
-        // the root config being updated.
-        Map<String, Integer> newTaskCounts = new HashMap<>();
-        synchronized (lock) {
-            // Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
-            // in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here
-            Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
-            for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
-                if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
-                    log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
-                    throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors");
-                }
-                newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
-            }
-        }
+        int taskCount = configs.size();
 
         // Start sending all the individual updates
-        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+        int index = 0;
+        for (Map<String, String> taskConfig: configs) {
             Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
-            connectConfig.put("properties", taskConfigEntry.getValue());
+            connectConfig.put("properties", taskConfig);
             byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
-            log.debug("Writing configuration for task " + taskConfigEntry.getKey() + " configuration: " + taskConfigEntry.getValue());
-            configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
+            log.debug("Writing configuration for task " + index + " configuration: " + taskConfig);
+            ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
+            configLog.send(TASK_KEY(connectorTaskId), serializedConfig);
+            index++;
         }
 
         // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
         // the end of the log
         try {
             // Read to end to ensure all the task configs have been written
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-
-            // Write all the commit messages
-            for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
-                Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
-                connectConfig.put("tasks", taskCountEntry.getValue());
-                byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
-                log.debug("Writing commit for connector " + taskCountEntry.getKey() + " with " + taskCountEntry.getValue() + " tasks.");
-                configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
+            if (taskCount > 0) {
+                configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
             }
+            // Write the commit message
+            Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
+            connectConfig.put("tasks", taskCount);
+            byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
+            log.debug("Writing commit for connector " + connector + " with " + taskCount + " tasks.");
+            configLog.send(COMMIT_TASKS_KEY(connector), serializedConfig);
 
             // Read to end to ensure all the commit messages have been written
             configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -426,6 +415,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
     }
 
+    @SuppressWarnings("unchecked")
     private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
         @Override
         public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
@@ -562,20 +552,13 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                         log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value());
                         return;
                     }
-
                     Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
 
                     int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
 
                     // Validate the configs we're supposed to update to ensure we're getting a complete configuration
                     // update of all tasks that are expected based on the number of tasks in the commit message.
-                    Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
-                    Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
-                    if (taskIdSet == null) {
-                        //TODO: Figure out why this happens (KAFKA-3321)
-                        log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen.");
-                        return;
-                    }
+                    Set<Integer> taskIdSet = taskIds(connectorName, deferred);
                     if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
                         // Given the logic for writing commit messages, we should only hit this condition due to compacted
                         // historical data, in which case we would not have applied any updates yet and there will be no
@@ -622,19 +605,18 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     }
 
     /**
-     * Given task configurations, get a set of integer task IDs organized by connector name.
+     * Given task configurations, get a set of integer task IDs for the connector.
      */
-    private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
-        Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
-        if (configs == null)
-            return connectorTaskIds;
-        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
-            ConnectorTaskId taskId = taskConfigEntry.getKey();
-            if (!connectorTaskIds.containsKey(taskId.connector()))
-                connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
-            connectorTaskIds.get(taskId.connector()).add(taskId.task());
+    private Set<Integer> taskIds(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
+        Set<Integer> tasks = new TreeSet<>();
+        if (configs == null) {
+            return tasks;
+        }
+        for (ConnectorTaskId taskId : configs.keySet()) {
+            assert taskId.connector().equals(connector);
+            tasks.add(taskId.task());
         }
-        return connectorTaskIds;
+        return tasks;
     }
 
     private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index ec5f2e6..212022d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -24,7 +24,9 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 public class MemoryConfigBackingStore implements ConfigBackingStore {
@@ -108,15 +110,16 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
     }
 
     @Override
-    public synchronized void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
+    public synchronized void putTaskConfigs(String connector, List<Map<String, String>> configs) {
         ConnectorState state = connectors.get(connector);
         if (state == null)
             throw new IllegalArgumentException("Cannot put tasks for non-existing connector");
 
-        state.taskConfigs = configs;
+        Map<ConnectorTaskId, Map<String, String>> taskConfigsMap = taskConfigListAsMap(connector, configs);
+        state.taskConfigs = taskConfigsMap;
 
         if (updateListener != null)
-            updateListener.onTaskConfigUpdate(configs.keySet());
+            updateListener.onTaskConfigUpdate(taskConfigsMap.keySet());
     }
 
     @Override
@@ -151,4 +154,13 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
             this.taskConfigs = new HashMap<>();
         }
     }
+
+    private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connector, List<Map<String, String>> configs) {
+        int index = 0;
+        Map<ConnectorTaskId, Map<String, String>> result = new TreeMap<>();
+        for (Map<String, String> taskConfigMap: configs) {
+            result.put(new ConnectorTaskId(connector, index++), taskConfigMap);
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index eaad34b..617177e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -57,7 +56,6 @@ import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KafkaConfigBackingStore.class)
@@ -113,6 +111,9 @@ public class KafkaConfigBackingStoreTest {
     private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
             = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
 
+    private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR
+            = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);
+
     // The exact format doesn't matter here since both conversions are mocked
     private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
             "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
@@ -275,9 +276,7 @@ public class KafkaConfigBackingStoreTest {
 
         // Writing task task configs should block until all the writes have been performed and the root record update
         // has completed
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
-        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
-        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
         configStorage.putTaskConfigs("connector1", taskConfigs);
 
         // Validate root config by listing all connectors and tasks
@@ -296,6 +295,57 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testPutTaskConfigsZeroTasks() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        // Task configs should read to end, write to the log, read to end, write root.
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+            COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+            "tasks", 0); // We have 0 tasks
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        configUpdateListener.onTaskConfigUpdate(Collections.<ConnectorTaskId>emptyList());
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+
+        // Writing task configs should block until all the writes have been performed and the root record update
+        // has completed
+        List<Map<String, String>> taskConfigs = Collections.emptyList();
+        configStorage.putTaskConfigs("connector1", taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(1, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
+        assertEquals(Collections.emptyList(), configState.tasks(connectorName));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testRestore() throws Exception {
         // Restoring data should notify only of the latest values after loading is complete. This also validates
         // that inconsistent state is ignored.
@@ -350,10 +400,63 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testRestoreZeroTasks() throws Exception {
+        // Restoring data should notify only of the latest values after loading is complete. This also validates
+        // that inconsistent state is ignored.
+        expectConfigure();
+        // Overwrite each type at least once to ensure we see the latest data after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+            new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+            new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+            new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+            // Connector after root update should make it through, task update shouldn't
+            new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+            new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)),
+            new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
+        logOffset = 8;
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 0 tasks for that connector.
+        assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
+        // No connector should be reported as inconsistent
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
         // Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
-        // We start out by loading an initial configuration where we started to write a task update and failed before
-        // writing an the commit, and then compaction cleaned up the earlier record.
+        // We start out by loading an initial configuration where we started to write a task update, and then
+        // compaction cleaned up the earlier record.
 
         expectConfigure();
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
@@ -371,9 +474,6 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 6;
         expectStart(existingRecords, deserialized);
 
-        // One failed attempt to write new task configs
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-
         // Successful attempt to write new task config
         expectReadToEnd(new LinkedHashMap<String, byte[]>());
         expectConvertWriteRead(
@@ -392,7 +492,6 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
         expectReadToEnd(serializedConfigs);
 
-
         expectStop();
 
         PowerMock.replayAll();
@@ -410,17 +509,9 @@ public class KafkaConfigBackingStoreTest {
         assertNull(configState.taskConfig(TASK_IDS.get(1)));
         assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
 
-        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
-        try {
-            configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
-            fail("Should have failed due to incomplete task set.");
-        } catch (KafkaException e) {
-            // expected
-        }
-
         // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
         // we are going to shrink the number of tasks to 1
-        configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0)));
         // Validate updated config
         configState = configStorage.snapshot();
         // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written