You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/07 02:51:01 UTC
kafka git commit: KAFKA-2765: Add versions to Copycat Connector and Task interfaces and log versions when instantiating connectors and tasks.
Repository: kafka
Updated Branches:
refs/heads/trunk 130a561ad -> d297b3af2
KAFKA-2765: Add versions to Copycat Connector and Task interfaces and log versions when instantiating connectors and tasks.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #446 from ewencp/connector-versions
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d297b3af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d297b3af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d297b3af
Branch: refs/heads/trunk
Commit: d297b3af265a5be5a83ee11a990060231ea4cc25
Parents: 130a561
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Nov 6 17:50:46 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Nov 6 17:50:46 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/copycat/connector/Connector.java | 7 +++++++
.../org/apache/kafka/copycat/connector/Task.java | 7 +++++++
.../connector/ConnectorReconfigurationTest.java | 5 +++++
.../kafka/copycat/file/FileStreamSinkConnector.java | 6 ++++++
.../kafka/copycat/file/FileStreamSinkTask.java | 5 +++++
.../copycat/file/FileStreamSourceConnector.java | 6 ++++++
.../kafka/copycat/file/FileStreamSourceTask.java | 5 +++++
.../org/apache/kafka/copycat/runtime/Worker.java | 5 ++++-
.../apache/kafka/copycat/runtime/WorkerTest.java | 16 +++++++++++++++-
9 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
index 6972d3d..4d0e1bd 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
@@ -45,6 +45,13 @@ public abstract class Connector {
protected ConnectorContext context;
/**
+ * Get the version of this connector.
+ *
+ * @return the version, formatted as a String
+ */
+ public abstract String version();
+
+ /**
* Initialize this connector, using the provided ConnectorContext to notify the runtime of
* input configuration changes.
* @param ctx context object used to interact with the Copycat runtime
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
index 2a8c98c..cb8b719 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
@@ -37,6 +37,13 @@ import java.util.Map;
@InterfaceStability.Unstable
public interface Task {
/**
+ * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
+ *
+ * @return the version, formatted as a String
+ */
+ String version();
+
+ /**
* Start the Task
* @param props initial configuration
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
index 79ddfd7..7b1e9eb 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
@@ -53,6 +53,11 @@ public class ConnectorReconfigurationTest {
}
@Override
+ public String version() {
+ return "1.0";
+ }
+
+ @Override
public void start(Map<String, String> props) {
configureOrder = order++;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
index 763f638..d0d59a8 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
@@ -17,6 +17,7 @@
package org.apache.kafka.copycat.file;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.sink.SinkConnector;
@@ -35,6 +36,11 @@ public class FileStreamSinkConnector extends SinkConnector {
private String filename;
@Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ @Override
public void start(Map<String, String> props) {
filename = props.get(FILE_CONFIG);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 5286d2b..f95ef8e 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -50,6 +50,11 @@ public class FileStreamSinkTask extends SinkTask {
}
@Override
+ public String version() {
+ return new FileStreamSinkConnector().version();
+ }
+
+ @Override
public void start(Map<String, String> props) {
filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
if (filename == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
index 9784bb1..9021775 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
@@ -17,6 +17,7 @@
package org.apache.kafka.copycat.file;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.source.SourceConnector;
@@ -38,6 +39,11 @@ public class FileStreamSourceConnector extends SourceConnector {
private String topic;
@Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ @Override
public void start(Map<String, String> props) {
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index 70eef5c..2a2cfbc 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -46,6 +46,11 @@ public class FileStreamSourceTask extends SourceTask {
private Long streamOffset;
@Override
+ public String version() {
+ return new FileStreamSourceConnector().version();
+ }
+
+ @Override
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
if (filename == null || filename.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 08eab86..91fa175 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -171,6 +171,7 @@ public class Worker {
throw new CopycatException("Connector with name " + connName + " already exists");
final Connector connector = instantiateConnector(connClass);
+ log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
connector.initialize(ctx);
try {
connector.start(connConfig.originalsStrings());
@@ -252,7 +253,9 @@ public class Worker {
throw new CopycatException(msg);
}
- final Task task = instantiateTask(taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class));
+ Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
+ final Task task = instantiateTask(taskClass);
+ log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
// Decide which type of worker task we need based on the type of task.
final WorkerTask workerTask;
http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index f99c711..00cef2b 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -92,6 +92,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.mockStatic(Worker.class);
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+ EasyMock.expect(connector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -162,6 +163,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.mockStatic(Worker.class);
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+ EasyMock.expect(connector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -234,6 +236,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.mockStatic(Worker.class);
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
+ EasyMock.expect(task.version()).andReturn("1.0");
PowerMock.expectNew(
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -303,7 +306,8 @@ public class WorkerTest extends ThreadedTest {
PowerMock.mockStatic(Worker.class);
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
-
+ EasyMock.expect(task.version()).andReturn("1.0");
+
PowerMock.expectNew(
WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
EasyMock.anyObject(Converter.class),
@@ -343,6 +347,11 @@ public class WorkerTest extends ThreadedTest {
private static class TestConnector extends Connector {
@Override
+ public String version() {
+ return "1.0";
+ }
+
+ @Override
public void start(Map<String, String> props) {
}
@@ -368,6 +377,11 @@ public class WorkerTest extends ThreadedTest {
}
@Override
+ public String version() {
+ return "1.0";
+ }
+
+ @Override
public void start(Map<String, String> props) {
}