You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/03 17:55:18 UTC
kafka git commit: KAFKA-2934;
Offset storage file configuration in Connect standalone mode is not
included in StandaloneConfig
Repository: kafka
Updated Branches:
refs/heads/trunk 10394aa80 -> 079c88178
KAFKA-2934; Offset storage file configuration in Connect standalone mode is not included in StandaloneConfig
Added offsetBackingStore configuration to StandaloneConfig and DistributedConfig;
added configs for offset.storage.topic and config.storage.topic to DistributedConfig;
Author: jinxing <ji...@fenbi.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #734 from ZoneMayor/trunk-KAFKA-2934
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/079c8817
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/079c8817
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/079c8817
Branch: refs/heads/trunk
Commit: 079c88178dff4b3a4c9de55629e7d15b60e5f562
Parents: 10394aa
Author: jinxing <ji...@fenbi.com>
Authored: Thu Mar 3 08:54:37 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Mar 3 08:54:37 2016 -0800
----------------------------------------------------------------------
.../kafka/connect/cli/ConnectDistributed.java | 4 +--
.../apache/kafka/connect/runtime/Worker.java | 2 +-
.../runtime/distributed/DistributedConfig.java | 32 ++++++++++++++++++-
.../runtime/distributed/DistributedHerder.java | 2 +-
.../runtime/standalone/StandaloneConfig.java | 12 ++++++-
.../connect/storage/FileOffsetBackingStore.java | 10 +++---
.../connect/storage/KafkaConfigStorage.java | 16 +++++-----
.../storage/KafkaOffsetBackingStore.java | 18 +++++------
.../storage/KafkaStatusBackingStore.java | 18 +++++------
.../storage/MemoryOffsetBackingStore.java | 3 +-
.../storage/MemoryStatusBackingStore.java | 3 +-
.../connect/storage/OffsetBackingStore.java | 10 ++++--
.../connect/storage/StatusBackingStore.java | 10 ++++--
.../connect/runtime/WorkerSinkTaskTest.java | 1 +
.../runtime/WorkerSinkTaskThreadedTest.java | 1 +
.../connect/runtime/WorkerSourceTaskTest.java | 1 +
.../kafka/connect/runtime/WorkerTest.java | 3 +-
.../distributed/DistributedHerderTest.java | 8 +++--
.../storage/FileOffsetBackingStoreTest.java | 19 +++++++----
.../connect/storage/KafkaConfigStorageTest.java | 22 +++++++++----
.../storage/KafkaOffsetBackingStoreTest.java | 33 ++++++++++----------
21 files changed, 153 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index bc5b75a..849fa2f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -67,12 +67,12 @@ public class ConnectDistributed {
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
- offsetBackingStore.configure(config.originals());
+ offsetBackingStore.configure(config);
Worker worker = new Worker(workerId, time, config, offsetBackingStore);
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
- statusBackingStore.configure(config.originals());
+ statusBackingStore.configure(config);
DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, advertisedUrl.toString());
final Connect connect = new Connect(worker, herder, rest);
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 39b69a3..4c0d016 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -96,7 +96,7 @@ public class Worker {
this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
this.offsetBackingStore = offsetBackingStore;
- this.offsetBackingStore.configure(config.originals());
+ this.offsetBackingStore.configure(config);
}
public void start() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 0c5c92f..f5aa8ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -67,6 +67,24 @@ public class DistributedConfig extends WorkerConfig {
" fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining.";
public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000;
+ /**
+ * <code>offset.storage.topic</code>
+ */
+ public static final String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
+ private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "kafka topic to store connector offsets in";
+
+ /**
+ * <code>config.storage.topic</code>
+ */
+ public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
+ private static final String CONFIG_TOPIC_CONFIG_DOC = "kafka topic to store configs";
+
+ /**
+ * <code>status.storage.topic</code>
+ */
+ public static final String STATUS_STORAGE_TOPIC_CONFIG = "status.storage.topic";
+ public static final String STATUS_STORAGE_TOPIC_CONFIG_DOC = "kafka topic to track connector and task status";
+
static {
CONFIG = baseConfigDef()
.define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
@@ -161,7 +179,19 @@ public class DistributedConfig extends WorkerConfig {
ConfigDef.Type.INT,
WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
ConfigDef.Importance.MEDIUM,
- WORKER_UNSYNC_BACKOFF_MS_DOC);
+ WORKER_UNSYNC_BACKOFF_MS_DOC)
+ .define(OFFSET_STORAGE_TOPIC_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ OFFSET_STORAGE_TOPIC_CONFIG_DOC)
+ .define(CONFIG_TOPIC_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ CONFIG_TOPIC_CONFIG_DOC)
+ .define(STATUS_STORAGE_TOPIC_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ STATUS_STORAGE_TOPIC_CONFIG_DOC);
}
public DistributedConfig(Map<String, String> props) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 83ed714..84ad6e0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -137,7 +137,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
this.configStorage = configStorage;
} else {
this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback());
- this.configStorage.configure(config.originals());
+ this.configStorage.configure(config);
}
configState = ClusterConfigState.EMPTY;
this.time = time;
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
index 7cefe22..8014b3a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
@@ -25,8 +25,18 @@ import java.util.Map;
public class StandaloneConfig extends WorkerConfig {
private static final ConfigDef CONFIG;
+ /**
+ * <code>offset.storage.file.filename</code>
+ */
+ public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
+ private static final String OFFSET_STORAGE_FILE_FILENAME_DOC = "file to store offset data in";
+
static {
- CONFIG = baseConfigDef();
+ CONFIG = baseConfigDef()
+ .define(OFFSET_STORAGE_FILE_FILENAME_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ OFFSET_STORAGE_FILE_FILENAME_DOC);
}
public StandaloneConfig(Map<String, String> props) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 1032bce..f377617 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -18,6 +18,8 @@
package org.apache.kafka.connect.storage;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +42,6 @@ import java.util.Map;
public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
private static final Logger log = LoggerFactory.getLogger(FileOffsetBackingStore.class);
- public final static String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
private File file;
public FileOffsetBackingStore() {
@@ -48,10 +49,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
}
@Override
- public void configure(Map<String, ?> props) {
- super.configure(props);
- String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
- file = new File(filename);
+ public void configure(WorkerConfig config) {
+ super.configure(config);
+ file = new File(config.getString(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG));
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
index 6a06fec..7f2fb83 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -20,6 +20,7 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -147,8 +149,6 @@ import java.util.concurrent.TimeoutException;
public class KafkaConfigStorage {
private static final Logger log = LoggerFactory.getLogger(KafkaConfigStorage.class);
- public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
-
public static final String CONNECTOR_PREFIX = "connector-";
public static String CONNECTOR_KEY(String connectorName) {
@@ -216,19 +216,19 @@ public class KafkaConfigStorage {
offset = -1;
}
- public void configure(Map<String, ?> configs) {
- if (configs.get(CONFIG_TOPIC_CONFIG) == null)
- throw new ConnectException("Must specify topic for connector configuration.");
- topic = (String) configs.get(CONFIG_TOPIC_CONFIG);
+ public void configure(DistributedConfig config) {
+ topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
+ if (topic.equals(""))
+ throw new ConfigException("Must specify topic for connector configuration.");
Map<String, Object> producerProps = new HashMap<>();
- producerProps.putAll(configs);
+ producerProps.putAll(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
- consumerProps.putAll(configs);
+ consumerProps.putAll(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index dfb8c51..e8984fb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -21,10 +21,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConvertingFutureCallback;
import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -53,27 +55,25 @@ import java.util.concurrent.TimeoutException;
public class KafkaOffsetBackingStore implements OffsetBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
- public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
-
private KafkaBasedLog<byte[], byte[]> offsetLog;
private HashMap<ByteBuffer, ByteBuffer> data;
@Override
- public void configure(Map<String, ?> configs) {
- String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
- if (topic == null)
- throw new ConnectException("Offset storage topic must be specified");
+ public void configure(WorkerConfig config) {
+ String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
+ if (topic.equals(""))
+ throw new ConfigException("Offset storage topic must be specified");
data = new HashMap<>();
Map<String, Object> producerProps = new HashMap<>();
- producerProps.putAll(configs);
+ producerProps.putAll(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
- consumerProps.putAll(configs);
+ consumerProps.putAll(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index db7ccc7..eb9a48c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -31,10 +32,11 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -76,8 +78,6 @@ import java.util.Set;
public class KafkaStatusBackingStore implements StatusBackingStore {
private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
- public static final String STATUS_TOPIC_CONFIG = "status.storage.topic";
-
private static final String TASK_STATUS_PREFIX = "status-task-";
private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
@@ -117,19 +117,19 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
}
@Override
- public void configure(Map<String, ?> configs) {
- if (configs.get(STATUS_TOPIC_CONFIG) == null)
- throw new ConnectException("Must specify topic for connector status.");
- this.topic = (String) configs.get(STATUS_TOPIC_CONFIG);
+ public void configure(WorkerConfig config) {
+ this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
+ if (topic.equals(""))
+ throw new ConfigException("Must specify topic for connector status.");
Map<String, Object> producerProps = new HashMap<>();
- producerProps.putAll(configs);
+ producerProps.putAll(config.originals());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
Map<String, Object> consumerProps = new HashMap<>();
- consumerProps.putAll(configs);
+ consumerProps.putAll(config.originals());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
index d62e38f..669e5f5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.storage;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
}
@Override
- public void configure(Map<String, ?> props) {
+ public void configure(WorkerConfig config) {
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
index 96b235b..f21c4ad 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.Table;
@@ -37,7 +38,7 @@ public class MemoryStatusBackingStore implements StatusBackingStore {
}
@Override
- public void configure(Map<String, ?> configs) {
+ public void configure(WorkerConfig config) {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
index 83fdb53..1b74a90 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
@@ -17,7 +17,7 @@
package org.apache.kafka.connect.storage;
-import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import java.nio.ByteBuffer;
@@ -38,7 +38,7 @@ import java.util.concurrent.Future;
* connector so that the shared namespace does not result in conflicting keys.
* </p>
*/
-public interface OffsetBackingStore extends Configurable {
+public interface OffsetBackingStore {
/**
* Start this offset store.
@@ -69,4 +69,10 @@ public interface OffsetBackingStore extends Configurable {
*/
public Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
Callback<Void> callback);
+
+ /**
+ * Configure class with the given key-value pairs
+ * @param config can be DistributedConfig or StandaloneConfig
+ */
+ public void configure(WorkerConfig config);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
index 6464f89..268c5df 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
@@ -16,15 +16,15 @@
**/
package org.apache.kafka.connect.storage;
-import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.Collection;
import java.util.Set;
-public interface StatusBackingStore extends Configurable {
+public interface StatusBackingStore {
/**
* Start dependent services (if needed)
@@ -97,4 +97,10 @@ public interface StatusBackingStore extends Configurable {
* Flush any pending writes
*/
void flush();
+
+ /**
+ * Configure class with the given key-value pairs
+ * @param config config for StatusBackingStore
+ */
+ public void configure(WorkerConfig config);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index aef3344..f419a7b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -115,6 +115,7 @@ public class WorkerSinkTaskTest {
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
+ workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index b37b34f..ac10d59 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -117,6 +117,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
+ workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 14c0c6e..8f57336 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -109,6 +109,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
+ workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
config = new StandaloneConfig(workerProps);
producerCallbacks = EasyMock.newCapture();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 0ca405e..67d3fdc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -79,6 +79,7 @@ public class WorkerTest extends ThreadedTest {
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
+ workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
config = new StandaloneConfig(workerProps);
}
@@ -452,7 +453,7 @@ public class WorkerTest extends ThreadedTest {
}
private void expectStartStorage() {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+ offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class));
EasyMock.expectLastCall();
offsetBackingStore.start();
EasyMock.expectLastCall();
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d439e96..8017ecf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
-import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -68,8 +67,8 @@ import static org.junit.Assert.assertTrue;
public class DistributedHerderTest {
private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
static {
- HERDER_CONFIG.put(KafkaStatusBackingStore.STATUS_TOPIC_CONFIG, "status-topic");
- HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+ HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+ HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");
// The WorkerConfig base class has some required settings without defaults
@@ -77,6 +76,9 @@ public class DistributedHerderTest {
HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
+ HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+ HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
}
private static final String MEMBER_URL = "memberUrl";
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
index 3833e88..6055d9a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p/>
+ * <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.storage;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.Callback;
import org.easymock.EasyMock;
import org.junit.After;
@@ -36,7 +37,8 @@ import static org.junit.Assert.assertEquals;
public class FileOffsetBackingStoreTest {
FileOffsetBackingStore store;
- Map<String, Object> props;
+ Map<String, String> props;
+ StandaloneConfig config;
File tempFile;
private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
@@ -50,9 +52,14 @@ public class FileOffsetBackingStoreTest {
public void setup() throws IOException {
store = new FileOffsetBackingStore();
tempFile = File.createTempFile("fileoffsetbackingstore", null);
- props = new HashMap<>();
- props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
- store.configure(props);
+ props = new HashMap<String, String>();
+ props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
+ props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ props.put(StandaloneConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ props.put(StandaloneConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ config = new StandaloneConfig(props);
+ store.configure(config);
store.start();
}
@@ -87,7 +94,7 @@ public class FileOffsetBackingStoreTest {
// Restore into a new store to ensure correct reload from scratch
FileOffsetBackingStore restore = new FileOffsetBackingStore();
- restore.configure(props);
+ restore.configure(config);
restore.start();
Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key")), getCallback).get();
assertEquals(buffer("value"), values.get(buffer("key")));
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
index cfc713f..f95704c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -64,10 +65,19 @@ import static org.junit.Assert.fail;
public class KafkaConfigStorageTest {
private static final String TOPIC = "connect-configs";
private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
+ private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
static {
- DEFAULT_CONFIG_STORAGE_PROPS.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, TOPIC);
+ DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, TOPIC);
+ DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+ DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
+ DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+ DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS);
}
private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
@@ -139,7 +149,7 @@ public class KafkaConfigStorageTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
assertEquals(TOPIC, capturedTopic.getValue());
assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
@@ -179,7 +189,7 @@ public class KafkaConfigStorageTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Null before writing
@@ -247,7 +257,7 @@ public class KafkaConfigStorageTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Bootstrap as if we had already added the connector, but no tasks had been added yet
@@ -314,7 +324,7 @@ public class KafkaConfigStorageTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// Should see a single connector and its config should be the last one seen anywhere in the log
@@ -383,7 +393,7 @@ public class KafkaConfigStorageTest {
PowerMock.replayAll();
- configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
configStorage.start();
// After reading the log, it should have been in an inconsistent state
ClusterConfigState configState = configStorage.snapshot();
http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 22bb376..aa92942 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.easymock.Capture;
@@ -62,9 +62,20 @@ import static org.junit.Assert.fail;
public class KafkaOffsetBackingStoreTest {
private static final String TOPIC = "connect-offsets";
private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
+ private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
static {
- DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+ DEFAULT_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
+ DEFAULT_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
+ DEFAULT_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+ DEFAULT_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
+ DEFAULT_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+ DEFAULT_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_PROPS);
+
}
private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();
static {
@@ -95,12 +106,6 @@ public class KafkaOffsetBackingStoreTest {
store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
}
- @Test(expected = ConnectException.class)
- public void testMissingTopic() {
- store = new KafkaOffsetBackingStore();
- store.configure(Collections.<String, Object>emptyMap());
- }
-
@Test
public void testStartStop() throws Exception {
expectConfigure();
@@ -109,7 +114,7 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.replayAll();
- store.configure(DEFAULT_PROPS);
+ store.configure(DEFAULT_DISTRIBUTED_CONFIG);
assertEquals(TOPIC, capturedTopic.getValue());
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
@@ -135,7 +140,7 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.replayAll();
- store.configure(DEFAULT_PROPS);
+ store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
@@ -199,9 +204,7 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.replayAll();
-
-
- store.configure(DEFAULT_PROPS);
+ store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
// Getting from empty store should return nulls
@@ -285,9 +288,7 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.replayAll();
-
-
- store.configure(DEFAULT_PROPS);
+ store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
// Set some offsets