You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/06 22:21:58 UTC
kafka git commit: KAFKA-2764: Change use of Properties in Copycat to
Maps.
Repository: kafka
Updated Branches:
refs/heads/trunk a76660ac8 -> c006c5916
KAFKA-2764: Change use of Properties in Copycat to Maps.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira, Guozhang Wang
Closes #444 from ewencp/kafka-2764-maps-not-properties
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c006c591
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c006c591
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c006c591
Branch: refs/heads/trunk
Commit: c006c5916ef7a8048ed55db64db27c7b64a3af59
Parents: a76660a
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Nov 6 13:27:45 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Nov 6 13:27:45 2015 -0800
----------------------------------------------------------------------
.../kafka/common/config/AbstractConfig.java | 19 +++++++-
.../kafka/copycat/connector/Connector.java | 12 ++---
.../apache/kafka/copycat/connector/Task.java | 4 +-
.../org/apache/kafka/copycat/sink/SinkTask.java | 3 +-
.../apache/kafka/copycat/source/SourceTask.java | 4 +-
.../connector/ConnectorReconfigurationTest.java | 11 +++--
.../copycat/file/FileStreamSinkConnector.java | 15 +++---
.../kafka/copycat/file/FileStreamSinkTask.java | 5 +-
.../copycat/file/FileStreamSourceConnector.java | 19 ++++----
.../copycat/file/FileStreamSourceTask.java | 6 +--
.../file/FileStreamSinkConnectorTest.java | 17 ++++---
.../file/FileStreamSourceConnectorTest.java | 27 ++++++-----
.../copycat/file/FileStreamSourceTaskTest.java | 12 ++---
.../kafka/copycat/cli/CopycatDistributed.java | 8 +--
.../kafka/copycat/cli/CopycatStandalone.java | 14 +++---
.../apache/kafka/copycat/runtime/Worker.java | 21 +++-----
.../kafka/copycat/runtime/WorkerConfig.java | 4 +-
.../kafka/copycat/runtime/WorkerSinkTask.java | 21 ++++----
.../kafka/copycat/runtime/WorkerSourceTask.java | 8 +--
.../kafka/copycat/runtime/WorkerTask.java | 4 +-
.../runtime/distributed/DistributedConfig.java | 4 +-
.../runtime/standalone/StandaloneConfig.java | 4 +-
.../copycat/runtime/WorkerSinkTaskTest.java | 17 +++----
.../copycat/runtime/WorkerSourceTaskTest.java | 16 +++---
.../kafka/copycat/runtime/WorkerTest.java | 51 ++++++++++----------
.../distributed/DistributedHerderTest.java | 3 +-
26 files changed, 167 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 327a9ed..07b64c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -105,9 +105,9 @@ public class AbstractConfig {
return keys;
}
- public Properties unusedProperties() {
+ public Map<String, Object> unusedConfigs() {
Set<String> unusedKeys = this.unused();
- Properties unusedProps = new Properties();
+ Map<String, Object> unusedProps = new HashMap<>();
for (String key : unusedKeys)
unusedProps.put(key, this.originals.get(key));
return unusedProps;
@@ -120,6 +120,21 @@ public class AbstractConfig {
}
/**
+ * Get all the original settings, ensuring that all values are of type String.
+ * @return the original settings
+ * @throws ClassCastException if any of the values are not strings
+ */
+ public Map<String, String> originalsStrings() {
+ Map<String, String> copy = new RecordingMap<>();
+ for (Map.Entry<String, ?> entry : originals.entrySet()) {
+ if (!(entry.getValue() instanceof String))
+ throw new ClassCastException("Non-string value found in original settings");
+ copy.put(entry.getKey(), (String) entry.getValue());
+ }
+ return copy;
+ }
+
+ /**
* Gets all original settings with the given prefix, stripping the prefix before adding it to the output.
*
* @param prefix the prefix to use as a filter
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
index ae141c4..6972d3d 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
@@ -20,7 +20,7 @@ package org.apache.kafka.copycat.connector;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
/**
* <p>
@@ -69,7 +69,7 @@ public abstract class Connector {
* @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
* churn in partition to task assignments
*/
- public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
+ public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
context = ctx;
// Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
// are very different, but reduces the difficulty of implementing a Connector
@@ -81,17 +81,17 @@ public abstract class Connector {
*
* @param props configuration settings
*/
- public abstract void start(Properties props);
+ public abstract void start(Map<String, String> props);
/**
* Reconfigure this Connector. Most implementations will not override this, using the default
- * implementation that calls {@link #stop()} followed by {@link #start(Properties)}.
+ * implementation that calls {@link #stop()} followed by {@link #start(Map)}.
* Implementations only need to override this if they want to handle this process more
* efficiently, e.g. without shutting down network connections to the external system.
*
* @param props new configuration settings
*/
- public void reconfigure(Properties props) {
+ public void reconfigure(Map<String, String> props) {
stop();
start(props);
}
@@ -108,7 +108,7 @@ public abstract class Connector {
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
- public abstract List<Properties> taskConfigs(int maxTasks);
+ public abstract List<Map<String, String>> taskConfigs(int maxTasks);
/**
* Stop this connector.
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
index cdaba08..2a8c98c 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
@@ -19,7 +19,7 @@ package org.apache.kafka.copycat.connector;
import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Properties;
+import java.util.Map;
/**
* <p>
@@ -40,7 +40,7 @@ public interface Task {
* Start the Task
* @param props initial configuration
*/
- void start(Properties props);
+ void start(Map<String, String> props);
/**
* Stop this task.
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
index 7c03cda..b2d5ff6 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -23,7 +23,6 @@ import org.apache.kafka.copycat.connector.Task;
import java.util.Collection;
import java.util.Map;
-import java.util.Properties;
/**
* SinkTask is a Task that takes records loaded from Kafka and sends them to another system. In
@@ -52,7 +51,7 @@ public abstract class SinkTask implements Task {
* @param props initial configuration
*/
@Override
- public abstract void start(Properties props);
+ public abstract void start(Map<String, String> props);
/**
* Put the records in the sink. Usually this should send the records to the sink asynchronously
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
index 30cbf16..841943f 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.connector.Task;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
/**
* SourceTask is a Task that pulls records from another system for storage in Kafka.
@@ -43,7 +43,7 @@ public abstract class SourceTask implements Task {
* @param props initial configuration
*/
@Override
- public abstract void start(Properties props);
+ public abstract void start(Map<String, String> props);
/**
* Poll this SourceTask for new records. This method should block if no data is currently
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
index cbaf866..79ddfd7 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
@@ -20,8 +20,9 @@ package org.apache.kafka.copycat.connector;
import org.apache.kafka.copycat.errors.CopycatException;
import org.junit.Test;
+import java.util.Collections;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -30,7 +31,7 @@ public class ConnectorReconfigurationTest {
@Test
public void testDefaultReconfigure() throws Exception {
TestConnector conn = new TestConnector(false);
- conn.reconfigure(new Properties());
+ conn.reconfigure(Collections.<String, String>emptyMap());
assertEquals(conn.stopOrder, 0);
assertEquals(conn.configureOrder, 1);
}
@@ -38,7 +39,7 @@ public class ConnectorReconfigurationTest {
@Test(expected = CopycatException.class)
public void testReconfigureStopException() throws Exception {
TestConnector conn = new TestConnector(true);
- conn.reconfigure(new Properties());
+ conn.reconfigure(Collections.<String, String>emptyMap());
}
private static class TestConnector extends Connector {
@@ -52,7 +53,7 @@ public class ConnectorReconfigurationTest {
}
@Override
- public void start(Properties props) {
+ public void start(Map<String, String> props) {
configureOrder = order++;
}
@@ -62,7 +63,7 @@ public class ConnectorReconfigurationTest {
}
@Override
- public List<Properties> taskConfigs(int count) {
+ public List<Map<String, String>> taskConfigs(int count) {
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
index 6e2b04d..763f638 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
@@ -21,8 +21,9 @@ import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.sink.SinkConnector;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
/**
* Very simple connector that works with the console. This connector supports both source and
@@ -34,8 +35,8 @@ public class FileStreamSinkConnector extends SinkConnector {
private String filename;
@Override
- public void start(Properties props) {
- filename = props.getProperty(FILE_CONFIG);
+ public void start(Map<String, String> props) {
+ filename = props.get(FILE_CONFIG);
}
@Override
@@ -44,12 +45,12 @@ public class FileStreamSinkConnector extends SinkConnector {
}
@Override
- public List<Properties> taskConfigs(int maxTasks) {
- ArrayList<Properties> configs = new ArrayList<>();
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
- Properties config = new Properties();
+ Map<String, String> config = new HashMap<>();
if (filename != null)
- config.setProperty(FILE_CONFIG, filename);
+ config.put(FILE_CONFIG, filename);
configs.add(config);
}
return configs;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 6dfe4a7..5286d2b 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -30,7 +30,6 @@ import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Collection;
import java.util.Map;
-import java.util.Properties;
/**
* FileStreamSinkTask writes records to stdout or a file.
@@ -51,8 +50,8 @@ public class FileStreamSinkTask extends SinkTask {
}
@Override
- public void start(Properties props) {
- filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
+ public void start(Map<String, String> props) {
+ filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
if (filename == null) {
outputStream = System.out;
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
index 716322f..9784bb1 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
@@ -22,8 +22,9 @@ import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.source.SourceConnector;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
/**
* Very simple connector that works with the console. This connector supports both source and
@@ -37,9 +38,9 @@ public class FileStreamSourceConnector extends SourceConnector {
private String topic;
@Override
- public void start(Properties props) {
- filename = props.getProperty(FILE_CONFIG);
- topic = props.getProperty(TOPIC_CONFIG);
+ public void start(Map<String, String> props) {
+ filename = props.get(FILE_CONFIG);
+ topic = props.get(TOPIC_CONFIG);
if (topic == null || topic.isEmpty())
throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting");
if (topic.contains(","))
@@ -52,13 +53,13 @@ public class FileStreamSourceConnector extends SourceConnector {
}
@Override
- public List<Properties> taskConfigs(int maxTasks) {
- ArrayList<Properties> configs = new ArrayList<>();
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
- Properties config = new Properties();
+ Map<String, String> config = new HashMap<>();
if (filename != null)
- config.setProperty(FILE_CONFIG, filename);
- config.setProperty(TOPIC_CONFIG, topic);
+ config.put(FILE_CONFIG, filename);
+ config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index f2249d0..70eef5c 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -46,15 +46,15 @@ public class FileStreamSourceTask extends SourceTask {
private Long streamOffset;
@Override
- public void start(Properties props) {
- filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG);
+ public void start(Map<String, String> props) {
+ filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
if (filename == null || filename.isEmpty()) {
stream = System.in;
// Tracking offset for stdin doesn't make sense
streamOffset = null;
reader = new BufferedReader(new InputStreamReader(stream));
}
- topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
+ topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
if (topic == null)
throw new CopycatException("FileStreamSourceTask config missing topic setting");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
index ab5fd3b..b30856f 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
@@ -25,8 +25,9 @@ import org.junit.Test;
import org.powermock.api.easymock.PowerMock;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -42,7 +43,7 @@ public class FileStreamSinkConnectorTest {
private FileStreamSinkConnector connector;
private ConnectorContext ctx;
- private Properties sinkProperties;
+ private Map<String, String> sinkProperties;
@Before
public void setup() {
@@ -50,9 +51,9 @@ public class FileStreamSinkConnectorTest {
ctx = PowerMock.createMock(ConnectorContext.class);
connector.initialize(ctx);
- sinkProperties = new Properties();
- sinkProperties.setProperty(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
- sinkProperties.setProperty(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
+ sinkProperties = new HashMap<>();
+ sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
+ sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
}
@Test
@@ -60,14 +61,14 @@ public class FileStreamSinkConnectorTest {
PowerMock.replayAll();
connector.start(sinkProperties);
- List<Properties> taskConfigs = connector.taskConfigs(1);
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
- assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
+ assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
taskConfigs = connector.taskConfigs(2);
assertEquals(2, taskConfigs.size());
for (int i = 0; i < 2; i++) {
- assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
+ assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
}
PowerMock.verifyAll();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
index 41e15a0..28bfa62 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
@@ -23,8 +23,9 @@ import org.junit.Before;
import org.junit.Test;
import org.powermock.api.easymock.PowerMock;
+import java.util.HashMap;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -37,7 +38,7 @@ public class FileStreamSourceConnectorTest {
private FileStreamSourceConnector connector;
private ConnectorContext ctx;
- private Properties sourceProperties;
+ private Map<String, String> sourceProperties;
@Before
public void setup() {
@@ -45,9 +46,9 @@ public class FileStreamSourceConnectorTest {
ctx = PowerMock.createMock(ConnectorContext.class);
connector.initialize(ctx);
- sourceProperties = new Properties();
- sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
- sourceProperties.setProperty(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
+ sourceProperties = new HashMap<>();
+ sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
+ sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
}
@Test
@@ -55,20 +56,20 @@ public class FileStreamSourceConnectorTest {
PowerMock.replayAll();
connector.start(sourceProperties);
- List<Properties> taskConfigs = connector.taskConfigs(1);
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
assertEquals(FILENAME,
- taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+ taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
assertEquals(SINGLE_TOPIC,
- taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG));
+ taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
// Should be able to return fewer than requested #
taskConfigs = connector.taskConfigs(2);
assertEquals(1, taskConfigs.size());
assertEquals(FILENAME,
- taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+ taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
assertEquals(SINGLE_TOPIC,
- taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG));
+ taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
PowerMock.verifyAll();
}
@@ -79,16 +80,16 @@ public class FileStreamSourceConnectorTest {
sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
connector.start(sourceProperties);
- List<Properties> taskConfigs = connector.taskConfigs(1);
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertEquals(1, taskConfigs.size());
- assertNull(taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+ assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
PowerMock.verifyAll();
}
@Test(expected = CopycatException.class)
public void testMultipleSourcesInvalid() {
- sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
+ sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
connector.start(sourceProperties);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
index 4365def..ddf8e43 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -31,9 +31,9 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import static org.junit.Assert.assertEquals;
@@ -42,7 +42,7 @@ public class FileStreamSourceTaskTest {
private static final String TOPIC = "test";
private File tempFile;
- private Properties config;
+ private Map<String, String> config;
private OffsetStorageReader offsetStorageReader;
private SourceTaskContext context;
private FileStreamSourceTask task;
@@ -52,9 +52,9 @@ public class FileStreamSourceTaskTest {
@Before
public void setup() throws IOException {
tempFile = File.createTempFile("file-stream-source-task-test", null);
- config = new Properties();
- config.setProperty(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
- config.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
+ config = new HashMap<>();
+ config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
+ config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
task = new FileStreamSourceTask();
offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
context = PowerMock.createMock(SourceTaskContext.class);
@@ -135,7 +135,7 @@ public class FileStreamSourceTaskTest {
}
public void testInvalidFile() throws InterruptedException {
- config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
+ config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
task.start(config);
// Currently the task retries indefinitely if the file isn't found, but shouldn't return any data.
for (int i = 0; i < 100; i++)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
index ca3f76c..8dfefaa 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -28,7 +28,8 @@ import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Properties;
+import java.util.Collections;
+import java.util.Map;
/**
* <p>
@@ -44,15 +45,14 @@ public class CopycatDistributed {
private static final Logger log = LoggerFactory.getLogger(CopycatDistributed.class);
public static void main(String[] args) throws Exception {
- Properties workerProps;
-
if (args.length < 1) {
log.info("Usage: CopycatDistributed worker.properties");
System.exit(1);
}
String workerPropsFile = args[0];
- workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
+ Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+ Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
DistributedConfig config = new DistributedConfig(workerProps);
Worker worker = new Worker(config, new KafkaOffsetBackingStore());
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
index cd4fc96..3869552 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
@@ -34,7 +34,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
-import java.util.Properties;
+import java.util.Collections;
+import java.util.Map;
/**
* <p>
@@ -52,8 +53,6 @@ public class CopycatStandalone {
private static final Logger log = LoggerFactory.getLogger(CopycatStandalone.class);
public static void main(String[] args) throws Exception {
- Properties workerProps;
- Properties connectorProps;
if (args.length < 2) {
log.info("Usage: CopycatStandalone worker.properties connector1.properties [connector2.properties ...]");
@@ -61,7 +60,8 @@ public class CopycatStandalone {
}
String workerPropsFile = args[0];
- workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
+ Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+ Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
StandaloneConfig config = new StandaloneConfig(workerProps);
Worker worker = new Worker(config, new FileOffsetBackingStore());
@@ -72,7 +72,7 @@ public class CopycatStandalone {
try {
for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
- connectorProps = Utils.loadProps(connectorPropsFile);
+ Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
@Override
public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
@@ -83,8 +83,8 @@ public class CopycatStandalone {
}
});
herder.putConnectorConfig(
- connectorProps.getProperty(ConnectorConfig.NAME_CONFIG),
- Utils.propsToStringMap(connectorProps), false, cb);
+ connectorProps.get(ConnectorConfig.NAME_CONFIG),
+ connectorProps, false, cb);
cb.get();
}
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index de9f533..08eab86 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -38,7 +38,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
/**
@@ -89,15 +88,12 @@ public class Worker {
public void start() {
log.info("Worker starting");
- Properties unusedConfigs = config.unusedProperties();
-
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- for (String propName : unusedConfigs.stringPropertyNames()) {
- producerProps.put(propName, unusedConfigs.getProperty(propName));
- }
+ producerProps.putAll(config.unusedConfigs());
+
producer = new KafkaProducer<>(producerProps);
offsetBackingStore.start();
@@ -177,10 +173,7 @@ public class Worker {
final Connector connector = instantiateConnector(connClass);
connector.initialize(ctx);
try {
- Map<String, Object> originals = connConfig.originals();
- Properties props = new Properties();
- props.putAll(originals);
- connector.start(props);
+ connector.start(connConfig.originalsStrings());
} catch (CopycatException e) {
throw new CopycatException("Connector threw an exception while starting", e);
}
@@ -209,8 +202,8 @@ public class Worker {
List<Map<String, String>> result = new ArrayList<>();
String taskClassName = connector.taskClass().getName();
- for (Properties taskProps : connector.taskConfigs(maxTasks)) {
- Map<String, String> taskConfig = Utils.propsToStringMap(taskProps);
+ for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+ Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config
taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
if (sinkTopics != null)
taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
@@ -280,9 +273,7 @@ public class Worker {
// Start the task before adding or modifying any state, any exceptions are caught higher up the
// call chain and there's no cleanup to do here
- Properties props = new Properties();
- props.putAll(taskConfig.originals());
- workerTask.start(props);
+ workerTask.start(taskConfig.originalsStrings());
if (task instanceof SourceTask) {
WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
index 0c6a6f6..b962d54 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import java.util.Properties;
+import java.util.Map;
/**
* Common base class providing configuration for Copycat workers, whether standalone or distributed.
@@ -132,7 +132,7 @@ public class WorkerConfig extends AbstractConfig {
.define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
}
- public WorkerConfig(ConfigDef definition, Properties props) {
+ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index dc51730..e9193b8 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -44,7 +44,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
@@ -60,7 +59,7 @@ class WorkerSinkTask implements WorkerTask {
private final Converter keyConverter;
private final Converter valueConverter;
private WorkerSinkTaskThread workThread;
- private Properties taskProps;
+ private Map<String, String> taskProps;
private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskContext context;
private boolean started;
@@ -78,7 +77,7 @@ class WorkerSinkTask implements WorkerTask {
}
@Override
- public void start(Properties props) {
+ public void start(Map<String, String> props) {
taskProps = props;
consumer = createConsumer();
context = new WorkerSinkTaskContext(consumer);
@@ -126,7 +125,7 @@ class WorkerSinkTask implements WorkerTask {
* @return true if successful, false if joining the consumer group was interrupted
*/
public boolean joinConsumerGroupAndStart() {
- String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+ String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatException("Sink tasks require a list of topics.");
String[] topics = topicsStr.split(",");
@@ -222,14 +221,14 @@ class WorkerSinkTask implements WorkerTask {
private KafkaConsumer<byte[], byte[]> createConsumer() {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
- Properties props = workerConfig.unusedProperties();
- props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector());
- props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ Map<String, Object> props = workerConfig.unusedConfigs();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector());
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
- props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<byte[], byte[]> newConsumer;
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 1f96c78..cdb41b0 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
import java.util.IdentityHashMap;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -85,7 +85,7 @@ class WorkerSourceTask implements WorkerTask {
}
@Override
- public void start(Properties props) {
+ public void start(Map<String, String> props) {
workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props);
workThread.start();
}
@@ -273,11 +273,11 @@ class WorkerSourceTask implements WorkerTask {
private class WorkerSourceTaskThread extends ShutdownableThread {
- private Properties workerProps;
+ private Map<String, String> workerProps;
private boolean finishedStart;
private boolean startedShutdownBeforeStartCompleted;
- public WorkerSourceTaskThread(String name, Properties workerProps) {
+ public WorkerSourceTaskThread(String name, Map<String, String> workerProps) {
super(name);
this.workerProps = workerProps;
this.finishedStart = false;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
index af225bb..0759efe 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
@@ -17,7 +17,7 @@
package org.apache.kafka.copycat.runtime;
-import java.util.Properties;
+import java.util.Map;
/**
* Handles processing for an individual task. This interface only provides the basic methods
@@ -29,7 +29,7 @@ interface WorkerTask {
* Start the Task
* @param props initial configuration
*/
- void start(Properties props);
+ void start(Map<String, String> props);
/**
* Stop this task from processing messages. This method does not block, it only triggers
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
index 90d63cf..a2848b1 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.copycat.runtime.WorkerConfig;
-import java.util.Properties;
+import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -180,7 +180,7 @@ public class DistributedConfig extends WorkerConfig {
WORKER_UNSYNC_BACKOFF_MS_DOC);
}
- public DistributedConfig(Properties props) {
+ public DistributedConfig(Map<String, String> props) {
super(CONFIG, props);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
index 246d36d..6e547d3 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
@@ -20,7 +20,7 @@ package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.copycat.runtime.WorkerConfig;
-import java.util.Properties;
+import java.util.Map;
public class StandaloneConfig extends WorkerConfig {
private static final ConfigDef CONFIG;
@@ -29,7 +29,7 @@ public class StandaloneConfig extends WorkerConfig {
CONFIG = baseConfigDef();
}
- public StandaloneConfig(Properties props) {
+ public StandaloneConfig(Map<String, String> props) {
super(CONFIG, props);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 177f7a6..7905736 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -56,7 +56,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -86,7 +85,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
- private static final Properties TASK_PROPS = new Properties();
+ private static final Map<String, String> TASK_PROPS = new HashMap<>();
static {
TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
}
@@ -111,13 +110,13 @@ public class WorkerSinkTaskTest extends ThreadedTest {
public void setup() {
super.setup();
time = new MockTime();
- Properties workerProps = new Properties();
- workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.key.converter.schemas.enable", "false");
- workerProps.setProperty("internal.value.converter.schemas.enable", "false");
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.key.converter.schemas.enable", "false");
+ workerProps.put("internal.value.converter.schemas.enable", "false");
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 452c5cb..0fa14bd 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -82,7 +82,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
- private static final Properties EMPTY_TASK_PROPS = new Properties();
+ private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
private static final List<SourceRecord> RECORDS = Arrays.asList(
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
);
@@ -90,13 +90,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Override
public void setup() {
super.setup();
- Properties workerProps = new Properties();
- workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.key.converter.schemas.enable", "false");
- workerProps.setProperty("internal.value.converter.schemas.enable", "false");
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.key.converter.schemas.enable", "false");
+ workerProps.put("internal.value.converter.schemas.enable", "false");
config = new StandaloneConfig(workerProps);
producerCallbacks = EasyMock.newCapture();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 05015a4..f99c711 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.ConnectorContext;
import org.apache.kafka.copycat.connector.Task;
@@ -46,10 +45,10 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -70,13 +69,13 @@ public class WorkerTest extends ThreadedTest {
public void setup() {
super.setup();
- Properties workerProps = new Properties();
- workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("internal.key.converter.schemas.enable", "false");
- workerProps.setProperty("internal.value.converter.schemas.enable", "false");
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.key.converter.schemas.enable", "false");
+ workerProps.put("internal.value.converter.schemas.enable", "false");
config = new StandaloneConfig(workerProps);
}
@@ -94,7 +93,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.mockStatic(Worker.class);
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
- Properties props = new Properties();
+ Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
@@ -117,7 +116,7 @@ public class WorkerTest extends ThreadedTest {
worker = new Worker(new MockTime(), config, offsetBackingStore);
worker.start();
- ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+ ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
worker.addConnector(config, ctx);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
@@ -164,7 +163,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.mockStatic(Worker.class);
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
- Properties props = new Properties();
+ Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
@@ -177,8 +176,8 @@ public class WorkerTest extends ThreadedTest {
// Reconfigure
EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
- Properties taskProps = new Properties();
- taskProps.setProperty("foo", "bar");
+ Map<String, String> taskProps = new HashMap<>();
+ taskProps.put("foo", "bar");
EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
// Remove
@@ -193,7 +192,7 @@ public class WorkerTest extends ThreadedTest {
worker = new Worker(new MockTime(), config, offsetBackingStore);
worker.start();
- ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+ ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
worker.addConnector(config, ctx);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
@@ -204,10 +203,10 @@ public class WorkerTest extends ThreadedTest {
// expected
}
List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
- Properties expectedTaskProps = new Properties();
- expectedTaskProps.setProperty("foo", "bar");
- expectedTaskProps.setProperty(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
- expectedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, "foo,bar");
+ Map<String, String> expectedTaskProps = new HashMap<>();
+ expectedTaskProps.put("foo", "bar");
+ expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+ expectedTaskProps.put(SinkTask.TOPICS_CONFIG, "foo,bar");
assertEquals(2, taskConfigs.size());
assertEquals(expectedTaskProps, taskConfigs.get(0));
assertEquals(expectedTaskProps, taskConfigs.get(1));
@@ -246,7 +245,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.anyObject(WorkerConfig.class),
EasyMock.anyObject(Time.class))
.andReturn(workerTask);
- Properties origProps = new Properties();
+ Map<String, String> origProps = new HashMap<>();
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
workerTask.start(origProps);
EasyMock.expectLastCall();
@@ -266,7 +265,7 @@ public class WorkerTest extends ThreadedTest {
worker = new Worker(new MockTime(), config, offsetBackingStore);
worker.start();
assertEquals(Collections.emptySet(), worker.taskIds());
- worker.addTask(taskId, new TaskConfig(Utils.propsToStringMap(origProps)));
+ worker.addTask(taskId, new TaskConfig(origProps));
assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
worker.stopTask(taskId);
assertEquals(Collections.emptySet(), worker.taskIds());
@@ -315,7 +314,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.anyObject(WorkerConfig.class),
EasyMock.anyObject(Time.class))
.andReturn(workerTask);
- Properties origProps = new Properties();
+ Map<String, String> origProps = new HashMap<>();
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
workerTask.start(origProps);
EasyMock.expectLastCall();
@@ -335,7 +334,7 @@ public class WorkerTest extends ThreadedTest {
worker = new Worker(new MockTime(), config, offsetBackingStore);
worker.start();
- worker.addTask(TASK_ID, new TaskConfig(Utils.propsToStringMap(origProps)));
+ worker.addTask(TASK_ID, new TaskConfig(origProps));
worker.stop();
PowerMock.verifyAll();
@@ -344,7 +343,7 @@ public class WorkerTest extends ThreadedTest {
private static class TestConnector extends Connector {
@Override
- public void start(Properties props) {
+ public void start(Map<String, String> props) {
}
@@ -354,7 +353,7 @@ public class WorkerTest extends ThreadedTest {
}
@Override
- public List<Properties> taskConfigs(int maxTasks) {
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@@ -369,7 +368,7 @@ public class WorkerTest extends ThreadedTest {
}
@Override
- public void start(Properties props) {
+ public void start(Map<String, String> props) {
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index 7873447..512cb5c 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -55,7 +55,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
@@ -65,7 +64,7 @@ import static org.junit.Assert.assertTrue;
@PrepareForTest(DistributedHerder.class)
@PowerMockIgnore("javax.management.*")
public class DistributedHerderTest {
- private static final Properties HERDER_CONFIG = new Properties();
+ private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
static {
HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");