You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/07 02:51:01 UTC

kafka git commit: KAFKA-2765: Add versions to Copycat Connector and Task interfaces and log versions when instantiating connectors and tasks.

Repository: kafka
Updated Branches:
  refs/heads/trunk 130a561ad -> d297b3af2


KAFKA-2765: Add versions to Copycat Connector and Task interfaces and log versions when instantiating connectors and tasks.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira

Closes #446 from ewencp/connector-versions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d297b3af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d297b3af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d297b3af

Branch: refs/heads/trunk
Commit: d297b3af265a5be5a83ee11a990060231ea4cc25
Parents: 130a561
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Nov 6 17:50:46 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Nov 6 17:50:46 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/copycat/connector/Connector.java   |  7 +++++++
 .../org/apache/kafka/copycat/connector/Task.java    |  7 +++++++
 .../connector/ConnectorReconfigurationTest.java     |  5 +++++
 .../kafka/copycat/file/FileStreamSinkConnector.java |  6 ++++++
 .../kafka/copycat/file/FileStreamSinkTask.java      |  5 +++++
 .../copycat/file/FileStreamSourceConnector.java     |  6 ++++++
 .../kafka/copycat/file/FileStreamSourceTask.java    |  5 +++++
 .../org/apache/kafka/copycat/runtime/Worker.java    |  5 ++++-
 .../apache/kafka/copycat/runtime/WorkerTest.java    | 16 +++++++++++++++-
 9 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
index 6972d3d..4d0e1bd 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
@@ -45,6 +45,13 @@ public abstract class Connector {
     protected ConnectorContext context;
 
     /**
+     * Get the version of this connector.
+     *
+     * @return the version, formatted as a String
+     */
+    public abstract String version();
+
+    /**
      * Initialize this connector, using the provided ConnectorContext to notify the runtime of
      * input configuration changes.
      * @param ctx context object used to interact with the Copycat runtime

http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
index 2a8c98c..cb8b719 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
@@ -37,6 +37,13 @@ import java.util.Map;
 @InterfaceStability.Unstable
 public interface Task {
     /**
+     * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
+     *
+     * @return the version, formatted as a String
+     */
+    String version();
+
+    /**
      * Start the Task
      * @param props initial configuration
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
index 79ddfd7..7b1e9eb 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
@@ -53,6 +53,11 @@ public class ConnectorReconfigurationTest {
         }
 
         @Override
+        public String version() {
+            return "1.0";
+        }
+
+        @Override
         public void start(Map<String, String> props) {
             configureOrder = order++;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
index 763f638..d0d59a8 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.file;
 
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.sink.SinkConnector;
 
@@ -35,6 +36,11 @@ public class FileStreamSinkConnector extends SinkConnector {
     private String filename;
 
     @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
     public void start(Map<String, String> props) {
         filename = props.get(FILE_CONFIG);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 5286d2b..f95ef8e 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -50,6 +50,11 @@ public class FileStreamSinkTask extends SinkTask {
     }
 
     @Override
+    public String version() {
+        return new FileStreamSinkConnector().version();
+    }
+
+    @Override
     public void start(Map<String, String> props) {
         filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
         if (filename == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
index 9784bb1..9021775 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.file;
 
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.source.SourceConnector;
@@ -38,6 +39,11 @@ public class FileStreamSourceConnector extends SourceConnector {
     private String topic;
 
     @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
     public void start(Map<String, String> props) {
         filename = props.get(FILE_CONFIG);
         topic = props.get(TOPIC_CONFIG);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index 70eef5c..2a2cfbc 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -46,6 +46,11 @@ public class FileStreamSourceTask extends SourceTask {
     private Long streamOffset;
 
     @Override
+    public String version() {
+        return new FileStreamSourceConnector().version();
+    }
+
+    @Override
     public void start(Map<String, String> props) {
         filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
         if (filename == null || filename.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 08eab86..91fa175 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -171,6 +171,7 @@ public class Worker {
             throw new CopycatException("Connector with name " + connName + " already exists");
 
         final Connector connector = instantiateConnector(connClass);
+        log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
         connector.initialize(ctx);
         try {
             connector.start(connConfig.originalsStrings());
@@ -252,7 +253,9 @@ public class Worker {
             throw new CopycatException(msg);
         }
 
-        final Task task = instantiateTask(taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class));
+        Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
+        final Task task = instantiateTask(taskClass);
+        log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
 
         // Decide which type of worker task we need based on the type of task.
         final WorkerTask workerTask;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d297b3af/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index f99c711..00cef2b 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -92,6 +92,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.mockStatic(Worker.class);
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+        EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -162,6 +163,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.mockStatic(Worker.class);
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+        EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -234,6 +236,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.mockStatic(Worker.class);
         PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
+        EasyMock.expect(task.version()).andReturn("1.0");
 
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -303,7 +306,8 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.mockStatic(Worker.class);
         PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
-
+        EasyMock.expect(task.version()).andReturn("1.0");
+        
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
                 EasyMock.anyObject(Converter.class),
@@ -343,6 +347,11 @@ public class WorkerTest extends ThreadedTest {
 
     private static class TestConnector extends Connector {
         @Override
+        public String version() {
+            return "1.0";
+        }
+
+        @Override
         public void start(Map<String, String> props) {
 
         }
@@ -368,6 +377,11 @@ public class WorkerTest extends ThreadedTest {
         }
 
         @Override
+        public String version() {
+            return "1.0";
+        }
+
+        @Override
         public void start(Map<String, String> props) {
         }