You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/13 19:23:44 UTC
[3/3] kafka git commit: KAFKA-2372: Add Kafka-backed storage of
Copycat configs.
KAFKA-2372: Add Kafka-backed storage of Copycat configs.
This also adds some other needed infrastructure for distributed Copycat, most
importantly the DistributedHerder, and refactors some code for handling
Kafka-backed logs into KafkaBasedLog since this is shared between offset and
config storage.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira, James Cheng
Closes #241 from ewencp/kafka-2372-copycat-distributed-config
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36d44693
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36d44693
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36d44693
Branch: refs/heads/trunk
Commit: 36d4469326fe20c3f0657315321e6ad515530a3e
Parents: e2ec02e
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Tue Oct 13 10:23:21 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Oct 13 10:23:21 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/common/utils/Utils.java | 13 +
config/copycat-distributed.properties | 13 +-
config/copycat-standalone.properties | 12 +-
.../kafka/copycat/cli/CopycatDistributed.java | 8 +-
.../kafka/copycat/cli/CopycatStandalone.java | 2 +-
.../apache/kafka/copycat/cli/WorkerConfig.java | 20 +-
.../kafka/copycat/runtime/ConnectorConfig.java | 7 +-
.../apache/kafka/copycat/runtime/Herder.java | 19 +-
.../copycat/runtime/HerderConnectorContext.java | 42 ++
.../apache/kafka/copycat/runtime/Worker.java | 23 +-
.../runtime/distributed/ClusterConfigState.java | 122 +++++
.../runtime/distributed/DistributedHerder.java | 320 +++++++++++
.../standalone/StandaloneConnectorContext.java | 42 --
.../runtime/standalone/StandaloneHerder.java | 34 +-
.../copycat/storage/KafkaConfigStorage.java | 546 +++++++++++++++++++
.../storage/KafkaOffsetBackingStore.java | 258 ++-------
.../storage/OffsetStorageReaderImpl.java | 2 +-
.../kafka/copycat/storage/OffsetUtils.java | 3 +
.../kafka/copycat/util/KafkaBasedLog.java | 331 +++++++++++
.../copycat/runtime/WorkerSinkTaskTest.java | 8 +-
.../copycat/runtime/WorkerSourceTaskTest.java | 8 +-
.../kafka/copycat/runtime/WorkerTest.java | 8 +-
.../distributed/DistributedHerderTest.java | 289 ++++++++++
.../standalone/StandaloneHerderTest.java | 62 ++-
.../copycat/storage/KafkaConfigStorageTest.java | 508 +++++++++++++++++
.../storage/KafkaOffsetBackingStoreTest.java | 399 +++++---------
.../kafka/copycat/util/KafkaBasedLogTest.java | 463 ++++++++++++++++
.../apache/kafka/copycat/util/TestFuture.java | 80 ++-
tests/kafkatest/services/copycat.py | 3 +-
.../kafkatest/tests/copycat_distributed_test.py | 14 +-
.../templates/copycat-distributed.properties | 9 +-
.../templates/copycat-standalone.properties | 8 +-
32 files changed, 3043 insertions(+), 633 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index fa7c92f..aee379a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -26,9 +26,11 @@ import java.nio.MappedByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Properties;
@@ -420,6 +422,17 @@ public class Utils {
}
/**
+     * Converts a {@link Properties} object into a {@code Map<String, String>}. Every key and every value is turned
+     * into a String with {@link Object#toString()}, so entries that are not Strings are stringified rather than
+     * dropped. Only the entries returned by {@link Properties#entrySet()} are copied; values that are reachable
+     * solely through the defaults of {@code props} are not included.
+     *
+     * @param props the properties to convert
+     * @return a new, mutable map holding the String form of each entry of {@code props}
+     */
+    public static Map<String, String> propsToStringMap(Properties props) {
+        Map<String, String> converted = new HashMap<>();
+        for (Map.Entry<Object, Object> property : props.entrySet()) {
+            String key = property.getKey().toString();
+            String value = property.getValue().toString();
+            converted.put(key, value);
+        }
+        return converted;
+    }
+
+ /**
* Get the stack trace from an exception as a string
*/
public static String stackTrace(Throwable e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/config/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties
index 654ed24..b122413 100644
--- a/config/copycat-distributed.properties
+++ b/config/copycat-distributed.properties
@@ -27,13 +27,14 @@ value.converter=org.apache.kafka.copycat.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
-# The offset converter is configurable and must be specified, but most users will always want to use the built-in default.
-# Offset data is never visible outside of Copcyat.
-offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.key.converter.schemas.enable=false
-offset.value.converter.schemas.enable=false
+# The internal converter used for offsets and config data is configurable and must be specified, but most users will
+# always want to use the built-in default. Offset and config data is never visible outside of Copycat in this format.
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
offset.storage.topic=copycat-offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
+config.storage.topic=copycat-configs
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/config/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties
index fd264b5..ebf689f 100644
--- a/config/copycat-standalone.properties
+++ b/config/copycat-standalone.properties
@@ -25,12 +25,12 @@ value.converter=org.apache.kafka.copycat.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
-# The offset converter is configurable and must be specified, but most users will always want to use the built-in default.
-# Offset data is never visible outside of Copcyat.
-offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.key.converter.schemas.enable=false
-offset.value.converter.schemas.enable=false
+# The internal converter used for offsets and config data is configurable and must be specified, but most users will
+# always want to use the built-in default. Offset and config data is never visible outside of Copycat in this format.
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
index b5e8896..b0230b2 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -20,9 +20,8 @@ package org.apache.kafka.copycat.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.runtime.Copycat;
-import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.copycat.runtime.distributed.DistributedHerder;
import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.FutureCallback;
@@ -59,7 +58,8 @@ public class CopycatDistributed {
WorkerConfig workerConfig = new WorkerConfig(workerProps);
Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
- Herder herder = new StandaloneHerder(worker);
+ DistributedHerder herder = new DistributedHerder(worker);
+ herder.configure(workerConfig.originals());
final Copycat copycat = new Copycat(worker, herder);
copycat.start();
@@ -73,7 +73,7 @@ public class CopycatDistributed {
log.error("Failed to create job for {}", connectorPropsFile);
}
});
- herder.addConnector(connectorProps, cb);
+ herder.addConnector(Utils.propsToStringMap(connectorProps), cb);
cb.get();
}
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
index 12ec154..65a15e4 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
@@ -75,7 +75,7 @@ public class CopycatStandalone {
log.error("Failed to create job for {}", connectorPropsFile);
}
});
- herder.addConnector(connectorProps, cb);
+ herder.addConnector(Utils.propsToStringMap(connectorProps), cb);
cb.get();
}
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
index a976d90..2a3f539 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
@@ -57,13 +57,13 @@ public class WorkerConfig extends AbstractConfig {
public static final String VALUE_CONVERTER_CLASS_DOC =
"Converter class for value Copycat data that implements the <code>Converter</code> interface.";
- public static final String OFFSET_KEY_CONVERTER_CLASS_CONFIG = "offset.key.converter";
- public static final String OFFSET_KEY_CONVERTER_CLASS_DOC =
- "Converter class for offset key Copycat data that implements the <code>Converter</code> interface.";
+ public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
+ public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
+ "Converter class for internal key Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
- public static final String OFFSET_VALUE_CONVERTER_CLASS_CONFIG = "offset.value.converter";
- public static final String OFFSET_VALUE_CONVERTER_CLASS_DOC =
- "Converter class for offset value Copycat data that implements the <code>Converter</code> interface.";
+    // Converter for the values of Copycat's internal data, e.g. offsets and configs (see the key counterpart above).
+    public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter";
+    public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for internal value Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
@@ -95,10 +95,10 @@ public class WorkerConfig extends AbstractConfig {
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
- .define(OFFSET_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
- Importance.HIGH, OFFSET_KEY_CONVERTER_CLASS_DOC)
- .define(OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
- Importance.HIGH, OFFSET_VALUE_CONVERTER_CLASS_DOC)
+ .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+ Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC)
+ .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+ Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
index 336597e..767c88b 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
@@ -22,7 +22,8 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import java.util.Properties;
+import java.util.HashMap;
+import java.util.Map;
/**
* <p>
@@ -64,10 +65,10 @@ public class ConnectorConfig extends AbstractConfig {
}
+ /**
+ * Creates a connector config from an empty set of properties.
+ */
public ConnectorConfig() {
- this(new Properties());
+ this(new HashMap<String, String>());
}
- public ConnectorConfig(Properties props) {
+ /**
+ * Creates a connector config from String-keyed, String-valued properties, parsed against this class's
+ * {@code config} definition.
+ *
+ * @param props the connector's configuration properties
+ */
+ public ConnectorConfig(Map<String, String> props) {
super(config, props);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
index 7f8b7c2..31e68ef 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
@@ -19,7 +19,7 @@ package org.apache.kafka.copycat.runtime;
import org.apache.kafka.copycat.util.Callback;
-import java.util.Properties;
+import java.util.Map;
/**
* <p>
@@ -53,15 +53,24 @@ public interface Herder {
* the leader herder if necessary.
*
* @param connectorProps user-specified properties for this job
- * @param callback callback to invoke when the request completes
+ * @param callback callback to invoke when the request completes
*/
- void addConnector(Properties connectorProps, Callback<String> callback);
+ void addConnector(Map<String, String> connectorProps, Callback<String> callback);
/**
* Delete a connector job by name.
*
- * @param name name of the connector job to shutdown and delete
+ * @param name name of the connector job to shutdown and delete
* @param callback callback to invoke when the request completes
*/
void deleteConnector(String name, Callback<Void> callback);
-}
+
+ /**
+ * Requests that the tasks of the named connector be reconfigured. This should only be triggered by
+ * {@link HerderConnectorContext}.
+ *
+ * @param connName name of the connector whose tasks should be reconfigured
+ */
+ void requestTaskReconfiguration(String connName);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
new file mode 100644
index 0000000..7a64bd5
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.connector.ConnectorContext;
+
+/**
+ * {@link ConnectorContext} implementation that forwards a connector's requests to the {@link Herder} managing it.
+ * It is not tied to a particular Herder implementation: any herder can hand an instance of this class to each
+ * connector it creates.
+ */
+public class HerderConnectorContext implements ConnectorContext {
+
+    private final Herder herder;
+    private final String connectorName;
+
+    /**
+     * @param herder the herder that owns the connector; requests are forwarded to it
+     * @param connectorName name of the connector this context belongs to
+     */
+    public HerderConnectorContext(Herder herder, String connectorName) {
+        this.herder = herder;
+        this.connectorName = connectorName;
+    }
+
+    @Override
+    public void requestTaskReconfiguration() {
+        // Pure delegation, tagged with the requesting connector's name; how the request is acted upon is up to the
+        // herder implementation.
+        herder.requestTaskReconfiguration(connectorName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index a34a014..0fdab4c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -52,8 +52,8 @@ public class Worker {
private WorkerConfig config;
private Converter keyConverter;
private Converter valueConverter;
- private Converter offsetKeyConverter;
- private Converter offsetValueConverter;
+ private Converter internalKeyConverter;
+ private Converter internalValueConverter;
private OffsetBackingStore offsetBackingStore;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer<byte[], byte[]> producer;
@@ -71,10 +71,10 @@ public class Worker {
this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
- this.offsetKeyConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
- this.offsetKeyConverter.configure(config.originalsWithPrefix("offset.key.converter."), true);
- this.offsetValueConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
- this.offsetValueConverter.configure(config.originalsWithPrefix("offset.value.converter."), false);
+ this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
+ this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
this.offsetBackingStore = offsetBackingStore;
this.offsetBackingStore.configure(config.originals());
@@ -157,9 +157,9 @@ public class Worker {
if (task instanceof SourceTask) {
SourceTask sourceTask = (SourceTask) task;
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
- offsetKeyConverter, offsetValueConverter);
+ internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
- offsetKeyConverter, offsetValueConverter);
+ internalKeyConverter, internalValueConverter);
workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, time);
} else if (task instanceof SinkTask) {
@@ -201,4 +201,11 @@ public class Worker {
return task;
}
+    /**
+     * @return the converter this worker uses for the keys of internal data such as source offsets
+     */
+    public Converter getInternalKeyConverter() {
+        return this.internalKeyConverter;
+    }
+
+    /**
+     * @return the converter this worker uses for the values of internal data such as source offsets
+     */
+    public Converter getInternalValueConverter() {
+        return this.internalValueConverter;
+    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
new file mode 100644
index 0000000..719dd09
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An immutable snapshot of the configuration state of connectors and tasks in a Copycat cluster.
+ * <p>
+ * The maps and set handed to the constructor are stored by reference rather than copied, so whoever builds a
+ * snapshot must not modify them afterwards. The views returned by {@link #connectors()} and
+ * {@link #inconsistentConnectors()} are unmodifiable.
+ */
+public class ClusterConfigState {
+    private final long offset;
+    private final Map<String, Integer> connectorTaskCounts;
+    private final Map<String, Map<String, String>> connectorConfigs;
+    private final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
+    private final Set<String> inconsistentConnectors;
+
+    /**
+     * @param offset the last config offset read to generate this snapshot
+     * @param connectorTaskCounts number of tasks of each connector, keyed by connector name
+     * @param connectorConfigs configuration of each connector, keyed by connector name
+     * @param taskConfigs configuration of each task, keyed by task ID
+     * @param inconsistentConnectors connectors whose task configs are incomplete in this snapshot
+     */
+    public ClusterConfigState(long offset,
+                              Map<String, Integer> connectorTaskCounts,
+                              Map<String, Map<String, String>> connectorConfigs,
+                              Map<ConnectorTaskId, Map<String, String>> taskConfigs,
+                              Set<String> inconsistentConnectors) {
+        this.offset = offset;
+        this.connectorTaskCounts = connectorTaskCounts;
+        this.connectorConfigs = connectorConfigs;
+        this.taskConfigs = taskConfigs;
+        this.inconsistentConnectors = inconsistentConnectors;
+    }
+
+    /**
+     * Get the last offset read to generate this config state. This offset is not guaranteed to be perfectly consistent
+     * with the recorded state because some partial updates to task configs may have been read.
+     * @return the latest config offset
+     */
+    public long offset() {
+        return offset;
+    }
+
+    /**
+     * Get the names of the connectors in this configuration.
+     * @return an unmodifiable view of the connector names
+     */
+    public Collection<String> connectors() {
+        return Collections.unmodifiableSet(connectorTaskCounts.keySet());
+    }
+
+    /**
+     * Get the configuration for a connector.
+     * @param connector name of the connector
+     * @return a map containing configuration parameters, or {@code null} if no config is recorded for the connector
+     */
+    public Map<String, String> connectorConfig(String connector) {
+        return connectorConfigs.get(connector);
+    }
+
+    /**
+     * Get the configuration for a task.
+     * @param task id of the task
+     * @return a map containing configuration parameters, or {@code null} if no config is recorded for the task
+     */
+    public Map<String, String> taskConfig(ConnectorTaskId task) {
+        return taskConfigs.get(task);
+    }
+
+    /**
+     * Get the current set of task IDs for the specified connector.
+     * <p>
+     * Connectors listed in {@link #inconsistentConnectors()} have incomplete task configs, so an empty collection is
+     * returned for them.
+     *
+     * @param connectorName the name of the connector to look up task configs for
+     * @return the current set of connector task IDs, with task indices 0 through (task count - 1)
+     * @throws IllegalArgumentException if the connector is not part of this configuration
+     */
+    public Collection<ConnectorTaskId> tasks(String connectorName) {
+        if (inconsistentConnectors.contains(connectorName))
+            return Collections.emptyList();
+
+        Integer numTasks = connectorTaskCounts.get(connectorName);
+        if (numTasks == null)
+            throw new IllegalArgumentException("Connector does not exist in current configuration.");
+
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        for (int taskIndex = 0; taskIndex < numTasks; taskIndex++)
+            taskIds.add(new ConnectorTaskId(connectorName, taskIndex));
+        return taskIds;
+    }
+
+    /**
+     * Get the set of connectors which have inconsistent data in this snapshot. These inconsistencies can occur due to
+     * partially completed writes combined with log compaction.
+     *
+     * Connectors in this set will appear in the output of {@link #connectors()} since their connector configuration is
+     * available, but {@link #tasks(String)} returns no task IDs for them since the task configs are incomplete.
+     *
+     * When a worker detects a connector in this state, it should request that the connector regenerate its task
+     * configurations.
+     *
+     * @return an unmodifiable view of the set of inconsistent connectors
+     */
+    public Set<String> inconsistentConnectors() {
+        return Collections.unmodifiableSet(inconsistentConnectors);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
new file mode 100644
index 0000000..5273658
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Distributed "herder" that coordinates with other workers to spread work across multiple processes.
+ */
+public class DistributedHerder implements Herder {
+ private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
+
+ private Worker worker;
+ private KafkaConfigStorage configStorage;
+ private ClusterConfigState configState;
+ private HashMap<String, ConnectorState> connectors = new HashMap<>();
+
+    /**
+     * Creates a herder whose Kafka-backed config storage serializes data with the worker's internal value converter
+     * and reports config updates through this herder's connector and task config callbacks.
+     */
+    public DistributedHerder(Worker worker) {
+        this.worker = worker;
+        this.configStorage = new KafkaConfigStorage(
+                this.worker.getInternalValueConverter(),
+                new ConnectorConfigCallback(),
+                new TaskConfigCallback());
+    }
+
+    /**
+     * Creates a herder around an already-constructed config storage. Public so tests can supply a mock
+     * {@link KafkaConfigStorage}.
+     */
+    public DistributedHerder(Worker worker, KafkaConfigStorage configStorage) {
+        this.configStorage = configStorage;
+        this.worker = worker;
+    }
+
+    /**
+     * Forwards the given configuration to the underlying config storage.
+     *
+     * @param configs worker configuration settings
+     */
+    public synchronized void configure(Map<String, ?> configs) {
+        this.configStorage.configure(configs);
+    }
+
+    /**
+     * Starts the config storage and then recreates the connectors recorded in it.
+     */
+    public synchronized void start() {
+        log.info("Herder starting");
+        this.configStorage.start();
+
+        log.info("Restoring connectors from stored configs");
+        this.restoreConnectors();
+
+        log.info("Herder started");
+    }
+
+    /**
+     * Stops this herder. Every connector it is running is stopped and forgotten, then the config storage is
+     * stopped and released. Because the config storage reference is cleared, {@link #start()} cannot be called
+     * again afterwards.
+     */
+    public synchronized void stop() {
+        log.info("Herder stopping");
+
+        // Only local cleanup happens here: each connector this worker runs is stopped via stopConnector. No work is
+        // coordinated with or handed off to other workers.
+        for (ConnectorState state : connectors.values())
+            stopConnector(state);
+        connectors.clear();
+
+        if (configStorage != null) {
+            configStorage.stop();
+            configStorage = null;
+        }
+
+        log.info("Herder stopped");
+    }
+
+    /**
+     * Adds a new connector. Its config is written to config storage, then the connector is created and started and
+     * its initial tasks are created.
+     * <p>
+     * The callback (if non-null) is invoked exactly once. On success it receives the connector name after the
+     * connector and its tasks have been created; on failure it receives the {@link CopycatException} that made the
+     * request fail.
+     */
+    @Override
+    public synchronized void addConnector(Map<String, String> connectorProps,
+                                          Callback<String> callback) {
+        String connName;
+        try {
+            ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+            connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+            // Reject a conflicting name before touching storage. Otherwise the request would overwrite the stored
+            // config of the connector that is already running, and only fail afterwards.
+            if (connectors.containsKey(connName)) {
+                log.error("Ignoring request to create connector due to conflicting connector name");
+                throw new CopycatException("Connector with name " + connName + " already exists");
+            }
+            // Ensure the config is written to storage first
+            configStorage.putConnectorConfig(connName, connectorProps);
+
+            ConnectorState connState = createConnector(connConfig);
+            // This should always be a new job, create jobs from scratch
+            createConnectorTasks(connState);
+        } catch (CopycatException e) {
+            if (callback != null)
+                callback.onCompletion(e, null);
+            return;
+        }
+        // Report success only after the tasks exist. Reporting it earlier meant that a failure while creating tasks
+        // invoked the callback a second time, with the error.
+        if (callback != null)
+            callback.onCompletion(null, connName);
+    }
+
+    /**
+     * Destroys the named connector, reporting the outcome to {@code callback} when one is supplied.
+     */
+    @Override
+    public synchronized void deleteConnector(String name, Callback<Void> callback) {
+        try {
+            destroyConnector(name);
+            if (callback != null) {
+                callback.onCompletion(null, null);
+            }
+        } catch (CopycatException failure) {
+            if (callback != null) {
+                callback.onCompletion(failure, null);
+            }
+        }
+    }
+
+    /**
+     * Triggers an update of the named connector's tasks via updateConnectorTasks. A request for a connector this
+     * herder is not running is logged and otherwise ignored.
+     */
+    @Override
+    public synchronized void requestTaskReconfiguration(String connName) {
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            // The request names a connector, not a task; say so in the log so the message is not misleading.
+            log.error("Connector {} that requested task reconfiguration does not exist", connName);
+            return;
+        }
+        updateConnectorTasks(state);
+    }
+
+    /**
+     * Creates, initializes and starts a connector from the given config and registers it in {@code connectors}.
+     * Does not set up any tasks.
+     *
+     * @return the state tracking the newly started connector
+     * @throws CopycatException if a connector with the same name already exists, or if the connector cannot be
+     *         instantiated or fails to start
+     */
+    private ConnectorState createConnector(ConnectorConfig connConfig) {
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        log.info("Creating connector {} of type {}", connName, className);
+        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
+        Properties configs = connConfig.unusedProperties();
+
+        if (connectors.containsKey(connName)) {
+            log.error("Ignoring request to create connector due to conflicting connector name");
+            throw new CopycatException("Connector with name " + connName + " already exists");
+        }
+
+        final Connector connector;
+        try {
+            connector = instantiateConnector(className);
+        } catch (Throwable t) {
+            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+            // may be caused by user code
+            throw new CopycatException("Failed to create connector instance", t);
+        }
+        connector.initialize(new HerderConnectorContext(this, connName));
+        try {
+            connector.start(configs);
+        } catch (Throwable t) {
+            // start() runs user code, which can fail with any exception, not only CopycatException. Wrapping
+            // everything keeps the failure within the CopycatException contract that callers such as addConnector
+            // handle, just like the instantiation step above.
+            throw new CopycatException("Connector threw an exception while starting", t);
+        }
+        ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
+        connectors.put(connName, state);
+
+        log.info("Finished creating connector {}", connName);
+
+        return state;
+    }
+
+ // Reflectively constructs the named Connector implementation. A missing class is translated into a
+ // CopycatException; any other failure from Utils.newInstance propagates unchanged.
+ private static Connector instantiateConnector(String className) {
+ Connector instance;
+ try {
+ instance = Utils.newInstance(className, Connector.class);
+ } catch (ClassNotFoundException cnfe) {
+ throw new CopycatException("Couldn't instantiate connector class", cnfe);
+ }
+ return instance;
+ }
+
+ // Stops the named connector (tasks first), clears its stored configuration and drops it from the local map.
+ // NOTE(review): null is written as the "deleted" marker; confirm the storage layer accepts a null value for the
+ // "properties" field, whose map schema is not declared optional.
+ private void destroyConnector(String connName) {
+ log.info("Destroying connector {}", connName);
+ ConnectorState existing = connectors.get(connName);
+ if (existing == null) {
+ log.error("Failed to destroy connector {} because it does not exist", connName);
+ throw new CopycatException("Connector does not exist");
+ }
+
+ stopConnector(existing);
+ configStorage.putConnectorConfig(existing.name, null);
+ connectors.remove(existing.name);
+
+ log.info("Finished destroying connector {}", connName);
+ }
+
+ // Stops a connector's tasks, then the connector itself. Failures from the connector's stop() are logged rather
+ // than propagated so callers that stop several connectors (e.g. stop()) still finish cleaning up the rest.
+ private void stopConnector(ConnectorState state) {
+ removeConnectorTasks(state);
+ try {
+ state.connector.stop();
+ } catch (RuntimeException e) {
+ // stop() is user code and may throw any unchecked exception, not just CopycatException
+ log.error("Error shutting down connector {}: ", state.name, e);
+ }
+ }
+
+ // Asks the connector for up to maxTasks task configs, adds framework-provided settings (the topic subscription for
+ // sink connectors) and starts each task on the worker. Tasks that fail to start are logged and skipped.
+ private void createConnectorTasks(ConnectorState state) {
+ String taskClassName = state.connector.taskClass().getName();
+
+ log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
+
+ List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks);
+
+ // Generate the final configs, including framework provided settings
+ Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
+ for (int i = 0; i < taskConfigs.size(); i++) {
+ ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+ Properties config = taskConfigs.get(i);
+ // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
+ // is automatically provided to sink tasks since it is required by the framework, but this injection may
+ // belong elsewhere.
+ if (state.connector instanceof SinkConnector) {
+ // Only sink tasks use the subscription, so only build the topic list for them.
+ String subscriptionTopics = Utils.join(state.inputTopics, ",");
+ // Make sure we don't modify the original since the connector may reuse it internally
+ Properties configForSink = new Properties();
+ configForSink.putAll(config);
+ configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
+ config = configForSink;
+ }
+ taskProps.put(taskId, config);
+ }
+
+ // And initiate the tasks
+ for (int i = 0; i < taskConfigs.size(); i++) {
+ ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+ Properties config = taskProps.get(taskId);
+ try {
+ worker.addTask(taskId, taskClassName, config);
+ // We only need to store the task IDs so we can clean up.
+ state.tasks.add(taskId);
+ } catch (Throwable e) {
+ log.error("Failed to add task {}: ", taskId, e);
+ // Swallow this so we can continue updating the rest of the tasks
+ // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
+ // that died after starting successfully.
+ }
+ }
+ }
+
+ // Stops every task recorded for the connector and forgets each one that stopped cleanly. Tasks whose stop fails
+ // with a CopycatException are logged and stay in state.tasks.
+ private void removeConnectorTasks(ConnectorState state) {
+ for (Iterator<ConnectorTaskId> it = state.tasks.iterator(); it.hasNext(); ) {
+ ConnectorTaskId id = it.next();
+ try {
+ worker.stopTask(id);
+ } catch (CopycatException e) {
+ log.error("Failed to stop task {}: ", id, e);
+ // Keep going so the remaining tasks still get stopped
+ // FIXME: Forcibly kill the task?
+ continue;
+ }
+ it.remove();
+ }
+ }
+
+ // Replaces the connector's running tasks with a freshly generated set: stop the old ones, then create new ones.
+ private void updateConnectorTasks(ConnectorState connState) {
+ removeConnectorTasks(connState);
+ createConnectorTasks(connState);
+ }
+
+ // Rebuilds the in-memory connector state from the latest config snapshot. A CopycatException raised while
+ // creating any connector propagates to the caller.
+ private void restoreConnectors() {
+ configState = configStorage.snapshot();
+ for (String connName : configState.connectors()) {
+ log.info("Restoring connector {}", connName);
+ ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(connName));
+ // Connectors are only restored when this process starts, so no tasks can exist yet and they can be
+ // created directly rather than reconciled against existing ones.
+ createConnectorTasks(createConnector(connConfig));
+ }
+ }
+
+
+
+ // Book-keeping for one running connector: its name, instance, task limit, input topics (used for sinks) and the
+ // IDs of the tasks started for it, which are kept so they can be stopped later.
+ private static class ConnectorState {
+ public String name;
+ public Connector connector;
+ public int maxTasks;
+ public List<String> inputTopics;
+ Set<ConnectorTaskId> tasks = new HashSet<>();
+
+ public ConnectorState(String name, Connector connector, int maxTasks,
+ List<String> inputTopics) {
+ this.inputTopics = inputTopics;
+ this.maxTasks = maxTasks;
+ this.connector = connector;
+ this.name = name;
+ }
+ }
+
+ // Callback for connector config updates. Currently it only refreshes the cached configState snapshot; the error
+ // argument is ignored.
+ // NOTE(review): presumably invoked from the config storage's consumer callback (possibly a background thread),
+ // in which case configState is written without holding this herder's monitor -- confirm before relying on it.
+ private class ConnectorConfigCallback implements Callback<String> {
+ @Override
+ public void onCompletion(Throwable error, String result) {
+ configState = configStorage.snapshot();
+ // FIXME
+ }
+ }
+
+ // Callback for task config updates. Currently it only refreshes the cached configState snapshot; the error and the
+ // list of affected task IDs are ignored.
+ // NOTE(review): like ConnectorConfigCallback, this likely runs outside this herder's monitor -- confirm.
+ private class TaskConfigCallback implements Callback<List<ConnectorTaskId>> {
+ @Override
+ public void onCompletion(Throwable error, List<ConnectorTaskId> result) {
+ configState = configStorage.snapshot();
+ // FIXME
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
deleted file mode 100644
index 0e14015..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.standalone;
-
-import org.apache.kafka.copycat.connector.ConnectorContext;
-
-/**
- * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
- * in a single process.
- */
-class StandaloneConnectorContext implements ConnectorContext {
-
- private StandaloneHerder herder;
- private String connectorName;
-
- public StandaloneConnectorContext(StandaloneHerder herder, String connectorName) {
- this.herder = herder;
- this.connectorName = connectorName;
- }
-
- @Override
- public void requestTaskReconfiguration() {
- // This is trivial to forward since there is only one herder and it's in memory in this
- // process
- herder.requestTaskReconfiguration(connectorName);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
index 45d428d..d5670fd 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.sink.SinkTask;
@@ -65,7 +66,7 @@ public class StandaloneHerder implements Herder {
}
@Override
- public synchronized void addConnector(Properties connectorProps,
+ public synchronized void addConnector(Map<String, String> connectorProps,
Callback<String> callback) {
try {
ConnectorState connState = createConnector(connectorProps);
@@ -91,8 +92,18 @@ public class StandaloneHerder implements Herder {
}
}
- // Creates the and configures the connector. Does not setup any tasks
- private ConnectorState createConnector(Properties connectorProps) {
+ /**
+ * Regenerates the named connector's tasks by stopping the current ones and creating a new set. Unknown connector
+ * names are logged and ignored.
+ */
+ @Override
+ public synchronized void requestTaskReconfiguration(String connName) {
+ ConnectorState state = connectors.get(connName);
+ if (state == null) {
+ // The lookup is by connector name, so report the missing *connector*, not a task.
+ log.error("Connector that requested task reconfiguration does not exist: {}", connName);
+ return;
+ }
+ updateConnectorTasks(state);
+ }
+
+ // Creates and configures the connector. Does not setup any tasks
+ private ConnectorState createConnector(Map<String, String> connectorProps) {
ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
@@ -114,7 +125,7 @@ public class StandaloneHerder implements Herder {
// may be caused by user code
throw new CopycatException("Failed to create connector instance", t);
}
- connector.initialize(new StandaloneConnectorContext(this, connName));
+ connector.initialize(new HerderConnectorContext(this, connName));
try {
connector.start(configs);
} catch (CopycatException e) {
@@ -222,21 +233,6 @@ public class StandaloneHerder implements Herder {
createConnectorTasks(state);
}
- /**
- * Requests reconfiguration of the task. This should only be triggered by
- * {@link StandaloneConnectorContext}.
- *
- * @param connName name of the connector that should be reconfigured
- */
- public synchronized void requestTaskReconfiguration(String connName) {
- ConnectorState state = connectors.get(connName);
- if (state == null) {
- log.error("Task that requested reconfiguration does not exist: {}", connName);
- return;
- }
- updateConnectorTasks(state);
- }
-
private static class ConnectorState {
public String name;
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
new file mode 100644
index 0000000..366bf13
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.data.SchemaBuilder;
+import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.DataException;
+import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * Provides persistent storage of Copycat connector configurations in a Kafka topic.
+ * </p>
+ * <p>
+ * This class manages both connector and task configurations. It tracks three types of configuration entries:
+ * <p/>
+ * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
+ * expanding this format if necessary. (Kafka key: connector-[connector-id]).
+ * These configs are *not* ephemeral. They represent the source of truth. If the entire Copycat
+ * cluster goes down, this is all that is really needed to recover.
+ * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
+ * this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
+ * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
+ * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
+ * of recovery (restoring a connector) will simply result in the same configuration as before
+ * the failure.
+ * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
+ * configs for a connector can be applied. (Kafka key: commit-[connector-id]).
+ * This config has two effects. First, it records the number of tasks the connector is currently
+ * running (and can therefore increase/decrease parallelism). Second, because each task config
+ * is stored separately but they need to be applied together to ensure each partition is assigned
+ * to a single task, this record also indicates that task configs for the specified connector
+ * can be "applied" or "committed".
+ * </p>
+ * <p>
+ * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
+ * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
+ * us to clean up outdated configurations over time. However, this combination has some important implications for
+ * the implementation of this class and the configuration state that it may expose.
+ * </p>
+ * <p>
+ * Connector configurations are independent of all other configs, so they are handled easily. Writing a single record
+ * is already atomic, so these can be applied as soon as they are read. One connector's config does not affect any
+ * others, and they do not need to coordinate with the connector's task configuration at all.
+ * </p>
+ * <p>
+ * The most obvious implication for task configs is the need for the commit messages. Because Kafka does not
+ * currently have multi-record transactions or support atomic batch record writes, task commit messages are required
+ * to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs
+ * for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs
+ * were applied immediately you could be using half the old configs and half the new configs. In that condition, some
+ * partitions may be double-assigned because the old config and new config may use completely different assignments.
+ * Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply them atomically
+ * once a commit message has been read.
+ * </p>
+ * <p>
+ * However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was
+ * always available, but we would like to be able to enable compaction so our configuration topic does not grow
+ * indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading
+ * from the beginning of the log in order to build up the full current configuration will see task commits, but some
+ * records required for those commits will have been removed because the same keys have subsequently been rewritten.
+ * For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config,
+ * commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing
+ * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read
+ * back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up.
+ * </p>
+ * <p>
+ * Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario
+ * as the previous one, but in this case both the first and second update will write 2 task configs. However, the
+ * second write fails half of the way through:
+ * [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction
+ * occurs and we're left with
+ * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't
+ * have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent
+ * state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never
+ * recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second
+ * task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task
+ * configs for connector "foo".
+ * </p>
+ * <p>
+ * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
+ * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
+ * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
+ * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
+ * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
+ * of updating task configurations).
+ * </p>
+ * <p>
+ * Note that the expectation is that this config storage system has only a single writer at a time.
+ * The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change
+ * requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker).
+ * </p>
+ * <p>
+ * Since processing of the config log occurs in a background thread, callers must take care when using accessors.
+ * To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster.
+ * Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are
+ * using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require
+ * synchronization across workers to commit offsets and update the configuration, callbacks and updates during the
+ * rebalance must be deferred.
+ * </p>
+ */
+public class KafkaConfigStorage {
+ private static final Logger log = LoggerFactory.getLogger(KafkaConfigStorage.class);
+
+ /** Configuration property, read by configure(), naming the Kafka topic that backs this store. */
+ public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
+
+ /** Key prefix for connector configuration records. */
+ public static final String CONNECTOR_PREFIX = "connector-";
+
+ /**
+ * Builds the record key for a connector's configuration: "connector-[connector name]". The key format is part of
+ * the persisted data and must not change.
+ */
+ public static String CONNECTOR_KEY(String connectorName) {
+ return CONNECTOR_PREFIX + connectorName;
+ }
+
+ /** Key prefix for individual task configuration records. */
+ public static final String TASK_PREFIX = "task-";
+
+ /** Builds the record key for a single task's configuration: "task-[connector name]-[task number]". */
+ public static String TASK_KEY(ConnectorTaskId taskId) {
+ return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
+ }
+
+ /** Key prefix for task commit records. */
+ public static final String COMMIT_TASKS_PREFIX = "commit-";
+
+ /** Builds the record key for a connector's task commit record: "commit-[connector name]". */
+ public static String COMMIT_TASKS_KEY(String connectorName) {
+ return COMMIT_TASKS_PREFIX + connectorName;
+ }
+
+ // Using real serialization for values but ad hoc string serialization for keys isn't ideal, but we use this
+ // approach because it avoids any potential problems with schema evolution or converter/serializer changes causing
+ // keys to change. We need to absolutely ensure that the keys remain precisely the same.
+ // Value schema for connector configs: a struct with a single "properties" map of string -> optional string.
+ public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct()
+ .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA))
+ .build();
+ // Task configs use the same value format as connector configs.
+ public static final Schema TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0;
+ // Value schema for task commit records: the number of tasks whose configs are being committed.
+ public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct()
+ .field("tasks", Schema.INT32_SCHEMA)
+ .build();
+
+ // Maximum time the write methods wait for the log to be read back to its end.
+ private static final long READ_TO_END_TIMEOUT_MS = 30000;
+
+ private final Object lock;
+ // True while start() performs the initial read of the log; suppresses callbacks for records replayed during
+ // startup. Volatile because it is written by the thread calling start() but read in the consumer callback, which
+ // may run on the log's background thread.
+ private volatile boolean starting;
+ private final Converter converter;
+ private final Callback<String> connectorConfigCallback;
+ private final Callback<List<ConnectorTaskId>> tasksConfigCallback;
+ private String topic;
+ // Data is passed to the log already serialized. We use a converter to handle translating to/from generic Copycat
+ // format to serialized form
+ private KafkaBasedLog<String, byte[]> configLog;
+ // Connector -> # of tasks
+ private Map<String, Integer> connectorTaskCounts = new HashMap<>();
+ // Connector and task configs: name or id -> config map
+ private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+ private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+ // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
+ // is in an inconsistent state and we cannot safely use them until they have been refreshed.
+ private Set<String> inconsistent = new HashSet<>();
+ // The most recently read offset. This does not take into account deferred task updates/commits, so we may have
+ // outstanding data to be applied. The consumer callback writes it outside of `lock` while snapshot() reads it, so
+ // it is volatile for visibility (which also makes the 64-bit write atomic).
+ private volatile long offset;
+
+ // Connector -> Map[ConnectorTaskId -> Configs]
+ private Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
+
+
+ /**
+ * @param converter converts config values between Copycat data and the serialized bytes stored in the topic
+ * @param connectorConfigCallback invoked with the connector name when a connector config record is read after
+ * startup
+ * @param tasksConfigCallback callback for task configuration updates
+ */
+ public KafkaConfigStorage(Converter converter, Callback<String> connectorConfigCallback, Callback<List<ConnectorTaskId>> tasksConfigCallback) {
+ this.converter = converter;
+ this.connectorConfigCallback = connectorConfigCallback;
+ this.tasksConfigCallback = tasksConfigCallback;
+ this.lock = new Object();
+ this.starting = false;
+ // Nothing has been read yet
+ this.offset = -1;
+ }
+
+ /**
+ * Reads the topic name from {@link #CONFIG_TOPIC_CONFIG} and builds the Kafka-backed log. All supplied configs are
+ * forwarded to both the producer and the consumer, with String keys, byte[] values and acks=all forced.
+ *
+ * @throws CopycatException if no topic is configured
+ */
+ public void configure(Map<String, ?> configs) {
+ Object configuredTopic = configs.get(CONFIG_TOPIC_CONFIG);
+ if (configuredTopic == null)
+ throw new CopycatException("Must specify topic for Copycat connector configuration.");
+ topic = (String) configuredTopic;
+
+ Map<String, Object> producerProps = new HashMap<String, Object>(configs);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
+ Map<String, Object> consumerProps = new HashMap<String, Object>(configs);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+ configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
+ }
+
+ /**
+ * Starts the underlying log. Callbacks are *not* invoked for records read during startup; take a snapshot after this
+ * returns instead, keeping in mind that updates can continue to be applied in the background.
+ */
+ public void start() {
+ log.info("Starting KafkaConfigStorage");
+ starting = true;
+ try {
+ configLog.start();
+ } finally {
+ // Always clear the flag: if start() fails and the flag stays set, every later callback would be suppressed.
+ starting = false;
+ }
+ log.info("Started KafkaConfigStorage");
+ }
+
+ public void stop() {
+ log.info("Closing KafkaConfigStorage");
+ configLog.stop();
+ log.info("Closed KafkaConfigStorage");
+ }
+
+ /**
+ * Get a snapshot of the current state of the cluster.
+ */
+ public ClusterConfigState snapshot() {
+ synchronized (lock) {
+ // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be
+ // immutable configs
+ return new ClusterConfigState(
+ offset,
+ new HashMap<>(connectorTaskCounts),
+ new HashMap<>(connectorConfigs),
+ new HashMap<>(taskConfigs),
+ new HashSet<>(inconsistent)
+ );
+ }
+ }
+
+ /**
+ * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by
+ * tailing the Kafka log with a consumer.
+ *
+ * @param connector name of the connector to write data for
+ * @param properties the configuration to write
+ * @throws CopycatException if the write fails, cannot be read back within READ_TO_END_TIMEOUT_MS, or the calling
+ * thread is interrupted (its interrupt status is restored in that case)
+ */
+ public void putConnectorConfig(String connector, Map<String, String> properties) {
+ Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+ copycatConfig.put("properties", properties);
+ byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
+
+ try {
+ configLog.send(CONNECTOR_KEY(connector), serializedConfig);
+ configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ if (e instanceof InterruptedException)
+ Thread.currentThread().interrupt(); // don't swallow the interrupt; let callers observe it
+ log.error("Failed to write connector configuration to Kafka: ", e);
+ throw new CopycatException("Error writing connector configuration to Kafka", e);
+ }
+ }
+
+ /**
+ * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
+ * that we would be leaving one of the referenced connectors with an inconsistent state.
+ *
+ * @param configs map containing task configurations
+ * @throws CopycatException if the task configurations do not resolve inconsistencies found in the existing root
+ * and task configurations.
+ */
+ public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
+ // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
+ // any outstanding lagging data to consume.
+ try {
+ configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Failed to write root configuration to Kafka: ", e);
+ throw new CopycatException("Error writing root configuration to Kafka", e);
+ }
+
+ // In theory, there is only a single writer and we shouldn't need this lock since the background thread should
+ // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
+ // the root config being updated.
+ Map<String, Integer> newTaskCounts = new HashMap<>();
+ synchronized (lock) {
+ // Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
+ // in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here
+ Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
+ for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
+ if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
+ log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
+ throw new CopycatException("Error writing task configurations: found some connectors with invalid connectors");
+ }
+ newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
+ }
+ }
+
+ // Start sending all the individual updates
+ for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+ Struct copycatConfig = new Struct(TASK_CONFIGURATION_V0);
+ copycatConfig.put("properties", taskConfigEntry.getValue());
+ byte[] serializedConfig = converter.fromCopycatData(topic, TASK_CONFIGURATION_V0, copycatConfig);
+ configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
+ }
+
+ // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
+ // the end of the log
+ try {
+ // Read to end to ensure all the task configs have been written
+ configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ // Write all the commit messages
+ for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
+ Struct copycatConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
+ copycatConfig.put("tasks", taskCountEntry.getValue());
+ byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_TASKS_COMMIT_V0, copycatConfig);
+ configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
+ }
+
+ // Read to end to ensure all the commit messages have been written
+ configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Failed to write root configuration to Kafka: ", e);
+ throw new CopycatException("Error writing root configuration to Kafka", e);
+ }
+ }
+
+ // Builds the Kafka-backed log that stores the configs, driven by the system clock.
+ private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+ Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
+ SystemTime clock = new SystemTime();
+ return new KafkaBasedLog<String, byte[]>(topic, producerProps, consumerProps, consumedCallback, clock);
+ }
+
+ private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+ @Override
+ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+ if (error != null) {
+ log.error("Unexpected in consumer callback for KafkaConfigStorage: ", error);
+ return;
+ }
+
+ final SchemaAndValue value;
+ try {
+ value = converter.toCopycatData(topic, record.value());
+ } catch (DataException e) {
+ log.error("Failed to convert config data to Copycat format: ", e);
+ return;
+ }
+ offset = record.offset();
+
+ if (record.key().startsWith(CONNECTOR_PREFIX)) {
+ String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
+ synchronized (lock) {
+ // Connector configs can be applied and callbacks invoked immediately
+ if (!(value.value() instanceof Map)) {
+ log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
+ return;
+ }
+ Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
+ if (!(newConnectorConfig instanceof Map)) {
+ log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
+ return;
+ }
+ connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
+ }
+ if (!starting)
+ connectorConfigCallback.onCompletion(null, connectorName);
+ } else if (record.key().startsWith(TASK_PREFIX)) {
+ synchronized (lock) {
+ ConnectorTaskId taskId = parseTaskId(record.key());
+ if (taskId == null) {
+ log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key");
+ return;
+ }
+ if (!(value.value() instanceof Map)) {
+ log.error("Ignoring task configuration because it is in the wrong format: " + value.value());
+ return;
+ }
+
+ Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
+ if (!(newTaskConfig instanceof Map)) {
+ log.error("Invalid data for task config: properties filed should be a Map but is " + newTaskConfig.getClass());
+ return;
+ }
+
+ Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
+ if (deferred == null) {
+ deferred = new HashMap<>();
+ deferredTaskUpdates.put(taskId.connector(), deferred);
+ }
+ deferred.put(taskId, (Map<String, String>) newTaskConfig);
+ }
+ } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
+ String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
+ List<ConnectorTaskId> updatedTasks = new ArrayList<>();
+ synchronized (lock) {
+ // Apply any outstanding deferred task updates for the given connector. Note that just because we
+ // encounter a commit message does not mean it will result in consistent output. In particular due to
+ // compaction, there may be cases where . For example if we have the following sequence of writes:
+ //
+ // 1. Write connector "foo"'s config
+ // 2. Write connector "foo", task 1's config <-- compacted
+ // 3. Write connector "foo", task 2's config
+ // 4. Write connector "foo" task commit message
+ // 5. Write connector "foo", task 1's config
+ // 6. Write connector "foo", task 2's config
+ // 7. Write connector "foo" task commit message
+ //
+ // then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
+ // "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
+ // configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
+ // only 5 was written, then there may be nothing that will finish writing the configs and get the
+ // log back into a consistent state.
+ //
+ // It is expected that the user of this class (i.e., the Herder) will take the necessary action to
+ // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
+ // exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
+ if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
+ log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + value.value());
+ return;
+ }
+
+ Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
+
+ Object newTaskCountObj = ((Map<String, Object>) value.value()).get("tasks");
+ Integer newTaskCount = (Integer) newTaskCountObj;
+
+ // Validate the configs we're supposed to update to ensure we're getting a complete configuration
+ // update of all tasks that are expected based on the number of tasks in the commit message.
+ Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
+ Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
+ if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
+ // Given the logic for writing commit messages, we should only hit this condition due to compacted
+ // historical data, in which case we would not have applied any updates yet and there will be no
+ // task config data already committed for the connector, so we shouldn't have to clear any data
+ // out. All we need to do is add the flag marking it inconsistent.
+ inconsistent.add(connectorName);
+ } else {
+ if (deferred != null) {
+ taskConfigs.putAll(deferred);
+ updatedTasks.addAll(taskConfigs.keySet());
+ }
+ inconsistent.remove(connectorName);
+ }
+ // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
+ // update, then we need to see a completely fresh set of configs after this commit message, so we don't
+ // want any of these outdated configs
+ if (deferred != null)
+ deferred.clear();
+
+ connectorTaskCounts.put(connectorName, newTaskCount);
+ }
+
+ if (!starting)
+ tasksConfigCallback.onCompletion(null, updatedTasks);
+ } else {
+ log.error("Discarding config update record with invalid key: " + record.key());
+ }
+ }
+ };
+
+ /**
+  * Parses a task config key made of '-'-separated components: a leading prefix component (skipped), the
+  * connector name (which may itself contain '-'), and a trailing integer task number.
+  *
+  * @param key the record key to parse
+  * @return the parsed task ID, or null if the key has fewer than three components or its last component is
+  *         not an integer
+  */
+ private ConnectorTaskId parseTaskId(String key) {
+     // A negative limit keeps trailing empty strings, so a malformed key such as "task-foo-1-" is rejected
+     // (its last component is empty) instead of being silently parsed as task 1 of connector "foo".
+     String[] parts = key.split("-", -1);
+     if (parts.length < 3) return null;
+
+     try {
+         int taskNum = Integer.parseInt(parts[parts.length - 1]);
+         // Re-join the middle components so connector names containing '-' survive the split
+         String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-");
+         return new ConnectorTaskId(connectorName, taskNum);
+     } catch (NumberFormatException e) {
+         return null;
+     }
+ }
+
+ /**
+  * Groups the task IDs of the given task configurations by connector name.
+  *
+  * @param configs task configurations keyed by task ID; may be null
+  * @return a new map from connector name to the sorted set of that connector's task numbers; empty when
+  *         {@code configs} is null
+  */
+ private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
+     Map<String, Set<Integer>> byConnector = new HashMap<>();
+     if (configs != null) {
+         for (ConnectorTaskId id : configs.keySet()) {
+             Set<Integer> taskNums = byConnector.get(id.connector());
+             if (taskNums == null) {
+                 taskNums = new TreeSet<Integer>();
+                 byConnector.put(id.connector(), taskNums);
+             }
+             taskNums.add(id.task());
+         }
+     }
+     return byConnector;
+ }
+
+ /**
+  * Checks whether task configs are available for every task ID in {@code [0, expectedSize)}.
+  *
+  * @param idSet        task IDs for which configs are available; null is treated as an empty set
+  * @param expectedSize number of tasks that should be present
+  * @return true if every ID from 0 to expectedSize - 1 is in idSet; false if any is missing or if
+  *         expectedSize is negative
+  */
+ private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
+     // Note that we do *not* check for the exact set. This is an important implication of compaction. If we start out
+     // with 2 tasks, then reduce to 1, we'll end up with log entries like:
+     //
+     // 1. Connector "foo" config
+     // 2. Connector "foo", task 0 config
+     // 3. Connector "foo", task 1 config
+     // 4. Connector "foo", commit 2 tasks
+     // 5. Connector "foo", task 0 config
+     // 6. Connector "foo", commit 1 tasks
+     //
+     // However, due to compaction we could end up with a log that looks like this:
+     //
+     // 1. Connector "foo" config
+     // 3. Connector "foo", task 1 config
+     // 5. Connector "foo", task 0 config
+     // 6. Connector "foo", commit 1 tasks
+     //
+     // which isn't incorrect, but would appear in this code to have an extra task configuration. Instead, we just
+     // validate that all the configs specified by the commit message are present. This should be fine because the
+     // logic for writing configs ensures all the task configs are written (and reads them back) before writing the
+     // commit message.
+
+     // A negative task count can only come from corrupt data; never report it as complete
+     if (expectedSize < 0)
+         return false;
+     // No task configs at all is complete only when no tasks are expected
+     if (idSet == null)
+         return expectedSize == 0;
+     // Cheap early exit: too few IDs cannot cover [0, expectedSize)
+     if (idSet.size() < expectedSize)
+         return false;
+
+     for (int taskId = 0; taskId < expectedSize; taskId++) {
+         if (!idSet.contains(taskId))
+             return false;
+     }
+     return true;
+ }
+}
+