You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/05/25 01:10:25 UTC

[kafka] branch trunk updated: KAFKA-4794: Add access to OffsetStorageReader from SourceConnector (#2604)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3818770  KAFKA-4794: Add access to OffsetStorageReader from SourceConnector (#2604)
3818770 is described below

commit 3818770d7fcd6ccdd373cdcd3aa6952ea08c901c
Author: Florian Hussonnois <fl...@gmail.com>
AuthorDate: Mon May 25 03:09:54 2020 +0200

    KAFKA-4794: Add access to OffsetStorageReader from SourceConnector (#2604)
    
    Added access to OffsetStorageReader from SourceConnector per KIP-131.
    
    Added two interfaces SinkConnectorContext/SourceConnectorContext that extend ConnectorContext in order to expose an OffsetStorageReader instance.
    
    Added unit tests for Connector, SinkConnector and SourceConnector default methods
    
    Author: Florian Hussonnois <fl...@gmail.com>, Randall Hauch <rh...@gmail.com>
    Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../apache/kafka/connect/connector/Connector.java  |   9 ++
 .../apache/kafka/connect/sink/SinkConnector.java   |   5 +
 .../SinkConnectorContext.java}                     |  10 +-
 .../kafka/connect/source/SourceConnector.java      |   4 +
 ...eConnector.java => SourceConnectorContext.java} |  13 +-
 .../kafka/connect/connector/ConnectorTest.java     |  88 ++++++++++++
 .../kafka/connect/sink/SinkConnectorTest.java      | 146 ++++++++++++++++++++
 .../kafka/connect/source/SourceConnectorTest.java  | 152 +++++++++++++++++++++
 .../org/apache/kafka/connect/runtime/Worker.java   |  17 ++-
 .../kafka/connect/runtime/WorkerConnector.java     |  61 +++++++--
 .../kafka/connect/runtime/WorkerConnectorTest.java | 111 ++++++++++-----
 .../apache/kafka/connect/runtime/WorkerTest.java   |  68 ++++-----
 12 files changed, 594 insertions(+), 90 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 79caeeb..6d54aab 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -80,6 +80,15 @@ public abstract class Connector implements Versioned {
     }
 
     /**
+     * Returns the context object used to interact with the Kafka Connect runtime.
+     *
+     * @return the context for this Connector.
+     */
+    protected ConnectorContext context() {
+        return context;
+    }
+
+    /**
      * Start this Connector. This method will only be called on a clean Connector, i.e. it has
      * either just been instantiated and initialized or {@link #stop()} has been invoked.
      *
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
index 5c58861..9627571 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
@@ -34,4 +34,9 @@ public abstract class SinkConnector extends Connector {
      */
     public static final String TOPICS_CONFIG = "topics";
 
+    @Override
+    protected SinkConnectorContext context() {
+        return (SinkConnectorContext) context;
+    }
+
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java
similarity index 75%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
copy to connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java
index 0ca3b33..5e2b07a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java
@@ -14,14 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.source;
+package org.apache.kafka.connect.sink;
 
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
 
 /**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
+ * A context to allow a {@link SinkConnector} to interact with the Kafka Connect runtime.
  */
-public abstract class SourceConnector extends Connector {
-
+public interface SinkConnectorContext extends ConnectorContext {
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
index 0ca3b33..6e96940 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -24,4 +24,8 @@ import org.apache.kafka.connect.connector.Connector;
  */
 public abstract class SourceConnector extends Connector {
 
+    @Override
+    protected SourceConnectorContext context() {
+        return (SourceConnectorContext) context;
+    }
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java
similarity index 63%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
copy to connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java
index 0ca3b33..417fbdd 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java
@@ -16,12 +16,17 @@
  */
 package org.apache.kafka.connect.source;
 
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 
 /**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
+ * A context to allow a {@link SourceConnector} to interact with the Kafka Connect runtime.
  */
-public abstract class SourceConnector extends Connector {
+public interface SourceConnectorContext extends ConnectorContext {
 
+    /**
+     * Returns the {@link OffsetStorageReader} for this SourceConnectorContext.
+     * @return the OffsetStorageReader for this connector.
+     */
+    OffsetStorageReader offsetStorageReader();
 }
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java
new file mode 100644
index 0000000..7addf8f
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.connector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class ConnectorTest {
+
+    protected ConnectorContext context;
+    protected Connector connector;
+    protected AssertableConnector assertableConnector;
+
+    @Before
+    public void beforeEach() {
+        connector = createConnector();
+        context = createContext();
+        assertableConnector = (AssertableConnector) connector;
+    }
+
+    @Test
+    public void shouldInitializeContext() {
+        connector.initialize(context);
+        assertableConnector.assertInitialized();
+        assertableConnector.assertContext(context);
+        assertableConnector.assertTaskConfigs(null);
+    }
+
+    @Test
+    public void shouldInitializeContextWithTaskConfigs() {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        connector.initialize(context, taskConfigs);
+        assertableConnector.assertInitialized();
+        assertableConnector.assertContext(context);
+        assertableConnector.assertTaskConfigs(taskConfigs);
+    }
+
+    @Test
+    public void shouldStopAndStartWhenReconfigure() {
+        Map<String, String> props = new HashMap<>();
+        connector.initialize(context);
+        assertableConnector.assertContext(context);
+        assertableConnector.assertStarted(false);
+        assertableConnector.assertStopped(false);
+        connector.reconfigure(props);
+        assertableConnector.assertStarted(true);
+        assertableConnector.assertStopped(true);
+        assertableConnector.assertProperties(props);
+    }
+
+    protected abstract ConnectorContext createContext();
+
+    protected abstract Connector createConnector();
+
+    public interface AssertableConnector {
+
+        void assertContext(ConnectorContext expected);
+
+        void assertInitialized();
+
+        void assertTaskConfigs(List<Map<String, String>> expectedTaskConfigs);
+
+        void assertStarted(boolean expected);
+
+        void assertStopped(boolean expected);
+
+        void assertProperties(Map<String, String> expected);
+    }
+}
\ No newline at end of file
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java
new file mode 100644
index 0000000..a4924ae
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.ConnectorTest;
+import org.apache.kafka.connect.connector.Task;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class SinkConnectorTest extends ConnectorTest {
+
+    @Override
+    protected TestSinkConnectorContext createContext() {
+        return new TestSinkConnectorContext();
+    }
+
+    @Override
+    protected TestSinkConnector createConnector() {
+        return new TestSinkConnector();
+    }
+
+    private static class TestSinkConnectorContext implements SinkConnectorContext {
+
+        @Override
+        public void requestTaskReconfiguration() {
+            // Unexpected in these tests
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void raiseError(Exception e) {
+            // Unexpected in these tests
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    protected static class TestSinkConnector extends SinkConnector implements ConnectorTest.AssertableConnector {
+
+        public static final String VERSION = "an entirely different version";
+
+        private boolean initialized;
+        private List<Map<String, String>> taskConfigs;
+        private Map<String, String> props;
+        private boolean started;
+        private boolean stopped;
+
+        @Override
+        public String version() {
+            return VERSION;
+        }
+
+        @Override
+        public void initialize(ConnectorContext ctx) {
+            super.initialize(ctx);
+            initialized = true;
+            this.taskConfigs = null;
+        }
+
+        @Override
+        public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
+            super.initialize(ctx, taskConfigs);
+            initialized = true;
+            this.taskConfigs = taskConfigs;
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            this.props = props;
+            started = true;
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef()
+                    .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+                    .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+        }
+
+        @Override
+        public void assertContext(ConnectorContext expected) {
+            assertSame(expected, context);
+            assertSame(expected, context());
+        }
+
+        @Override
+        public void assertInitialized() {
+            assertTrue(initialized);
+        }
+
+        @Override
+        public void assertTaskConfigs(List<Map<String, String>> expectedTaskConfigs) {
+            assertSame(expectedTaskConfigs, taskConfigs);
+        }
+
+        @Override
+        public void assertStarted(boolean expected) {
+            assertEquals(expected, started);
+        }
+
+        @Override
+        public void assertStopped(boolean expected) {
+            assertEquals(expected, stopped);
+        }
+
+        @Override
+        public void assertProperties(Map<String, String> expected) {
+            assertSame(expected, props);
+        }
+    }
+}
\ No newline at end of file
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java
new file mode 100644
index 0000000..9b10540
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.ConnectorTest;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class SourceConnectorTest extends ConnectorTest {
+
+    @Override
+    protected ConnectorContext createContext() {
+        return new TestSourceConnectorContext();
+    }
+
+    @Override
+    protected TestSourceConnector createConnector() {
+        return new TestSourceConnector();
+    }
+
+    private static class TestSourceConnectorContext implements SourceConnectorContext {
+
+        @Override
+        public void requestTaskReconfiguration() {
+            // Unexpected in these tests
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void raiseError(Exception e) {
+            // Unexpected in these tests
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public OffsetStorageReader offsetStorageReader() {
+            return null;
+        }
+    }
+
+    private static class TestSourceConnector extends SourceConnector implements AssertableConnector {
+
+        public static final String VERSION = "an entirely different version";
+
+        private boolean initialized;
+        private List<Map<String, String>> taskConfigs;
+        private Map<String, String> props;
+        private boolean started;
+        private boolean stopped;
+
+        @Override
+        public String version() {
+            return VERSION;
+        }
+
+        @Override
+        public void initialize(ConnectorContext ctx) {
+            super.initialize(ctx);
+            initialized = true;
+            this.taskConfigs = null;
+        }
+
+        @Override
+        public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
+            super.initialize(ctx, taskConfigs);
+            initialized = true;
+            this.taskConfigs = taskConfigs;
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            this.props = props;
+            started = true;
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+            stopped = true;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef()
+                    .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+                    .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+        }
+
+        @Override
+        public void assertContext(ConnectorContext expected) {
+            assertSame(expected, context);
+            assertSame(expected, context());
+        }
+
+        @Override
+        public void assertInitialized() {
+            assertTrue(initialized);
+        }
+
+        @Override
+        public void assertTaskConfigs(List<Map<String, String>> expectedTaskConfigs) {
+            assertSame(expectedTaskConfigs, taskConfigs);
+        }
+
+        @Override
+        public void assertStarted(boolean expected) {
+            assertEquals(expected, started);
+        }
+
+        @Override
+        public void assertStopped(boolean expected) {
+            assertEquals(expected, stopped);
+        }
+
+        @Override
+        public void assertProperties(Map<String, String> expected) {
+            assertSame(expected, props);
+        }
+    }
+}
\ No newline at end of file
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 99e44d4..98e8e78 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -54,6 +54,7 @@ import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -252,7 +253,21 @@ public class Worker {
                 final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
                 log.info("Creating connector {} of type {}", connName, connClass);
                 final Connector connector = plugins.newConnector(connClass);
-                workerConnector = new WorkerConnector(connName, connector, ctx, metrics, statusListener);
+
+                final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(
+                    offsetBackingStore,
+                    connName,
+                    internalKeyConverter,
+                    internalValueConverter
+                );
+                workerConnector = new WorkerConnector(
+                    connName,
+                    connector,
+                    ctx,
+                    metrics,
+                    statusListener,
+                    offsetReader
+                );
                 log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
                 savedLoader = plugins.compareAndSwapLoaders(connector);
                 workerConnector.initialize(connConfig);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 7923943..ead9d71 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -18,9 +18,13 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.sink.SinkConnectorContext;
+import org.apache.kafka.connect.source.SourceConnectorContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,41 +62,36 @@ public class WorkerConnector {
 
     private Map<String, String> config;
     private State state;
+    private final OffsetStorageReader offsetStorageReader;
 
     public WorkerConnector(String connName,
                            Connector connector,
                            ConnectorContext ctx,
                            ConnectMetrics metrics,
-                           ConnectorStatus.Listener statusListener) {
+                           ConnectorStatus.Listener statusListener,
+                           OffsetStorageReader offsetStorageReader) {
         this.connName = connName;
         this.ctx = ctx;
         this.connector = connector;
         this.state = State.INIT;
         this.metrics = new ConnectorMetricsGroup(metrics, AbstractStatus.State.UNASSIGNED, statusListener);
         this.statusListener = this.metrics;
+        this.offsetStorageReader = offsetStorageReader;
     }
 
     public void initialize(ConnectorConfig connectorConfig) {
         try {
+            if (!isSourceConnector() && !isSinkConnector()) {
+                throw new ConnectException("Connector implementations must be a subclass of either SourceConnector or SinkConnector");
+            }
             this.config = connectorConfig.originalsStrings();
             log.debug("{} Initializing connector {}", this, connName);
             if (isSinkConnector()) {
                 SinkConnectorConfig.validate(config);
+                connector.initialize(new WorkerSinkConnectorContext());
+            } else {
+                connector.initialize(new WorkerSourceConnectorContext(offsetStorageReader));
             }
-
-            connector.initialize(new ConnectorContext() {
-                @Override
-                public void requestTaskReconfiguration() {
-                    ctx.requestTaskReconfiguration();
-                }
-
-                @Override
-                public void raiseError(Exception e) {
-                    log.error("{} Connector raised an error", WorkerConnector.this, e);
-                    onFailure(e);
-                    ctx.raiseError(e);
-                }
-            });
         } catch (Throwable t) {
             log.error("{} Error initializing connector", this, t);
             onFailure(t);
@@ -319,4 +318,36 @@ public class WorkerConnector {
             return metricGroup;
         }
     }
+
+    private abstract class WorkerConnectorContext implements ConnectorContext {
+
+        @Override
+        public void requestTaskReconfiguration() {
+            WorkerConnector.this.ctx.requestTaskReconfiguration();
+        }
+
+        @Override
+        public void raiseError(Exception e) {
+            log.error("{} Connector raised an error", WorkerConnector.this, e);
+            onFailure(e);
+            WorkerConnector.this.ctx.raiseError(e);
+        }
+    }
+
+    private class WorkerSinkConnectorContext extends WorkerConnectorContext implements SinkConnectorContext {
+    }
+
+    private class WorkerSourceConnectorContext extends WorkerConnectorContext implements SourceConnectorContext {
+
+        private final OffsetStorageReader offsetStorageReader;
+
+        WorkerSourceConnectorContext(OffsetStorageReader offsetStorageReader) {
+            this.offsetStorageReader = offsetStorageReader;
+        }
+
+        @Override
+        public OffsetStorageReader offsetStorageReader() {
+            return offsetStorageReader;
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 10c413d..088900b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -18,8 +18,15 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkConnectorContext;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceConnectorContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
@@ -47,14 +54,18 @@ public class WorkerConnectorTest extends EasyMockSupport {
     static {
         CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
         CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
+        CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "my-topic");
     }
     public ConnectorConfig connectorConfig;
     public MockConnectMetrics metrics;
 
     @Mock Plugins plugins;
+    @Mock SourceConnector sourceConnector;
+    @Mock SinkConnector sinkConnector;
     @Mock Connector connector;
     @Mock ConnectorContext ctx;
     @Mock ConnectorStatus.Listener listener;
+    @Mock OffsetStorageReader offsetStorageReader;
 
     @Before
     public void setup() {
@@ -70,11 +81,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
     @Test
     public void testInitializeFailure() {
         RuntimeException exception = new RuntimeException();
+        connector = sourceConnector;
 
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
         expectLastCall().andThrow(exception);
 
         listener.onFailure(CONNECTOR, exception);
@@ -85,7 +97,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
         assertFailedMetric(workerConnector);
@@ -98,11 +110,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
     @Test
     public void testFailureIsFinalState() {
         RuntimeException exception = new RuntimeException();
+        connector = sinkConnector;
 
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
         expectLastCall().andThrow(exception);
 
         listener.onFailure(CONNECTOR, exception);
@@ -115,7 +128,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
         assertFailedMetric(workerConnector);
@@ -129,10 +142,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testStartupAndShutdown() {
+        connector = sourceConnector;
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
         expectLastCall();
 
         connector.start(CONFIG);
@@ -149,10 +163,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
-        assertInitializedMetric(workerConnector);
+        assertInitializedSourceMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
         assertRunningMetric(workerConnector);
         workerConnector.shutdown();
@@ -163,10 +177,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testStartupAndPause() {
+        connector = sinkConnector;
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
         expectLastCall();
 
         connector.start(CONFIG);
@@ -186,10 +201,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
-        assertInitializedMetric(workerConnector);
+        assertInitializedSinkMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
         assertRunningMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
@@ -202,10 +217,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testOnResume() {
+        connector = sourceConnector;
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
         expectLastCall();
 
         listener.onPause(CONNECTOR);
@@ -225,10 +241,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
-        assertInitializedMetric(workerConnector);
+        assertInitializedSourceMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
         assertPausedMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
@@ -241,10 +257,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testStartupPaused() {
+        connector = sinkConnector;
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
         expectLastCall();
 
         // connector never gets started
@@ -257,10 +274,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
-        assertInitializedMetric(workerConnector);
+        assertInitializedSinkMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
         assertPausedMetric(workerConnector);
         workerConnector.shutdown();
@@ -273,10 +290,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
     public void testStartupFailure() {
         RuntimeException exception = new RuntimeException();
 
+        connector = sinkConnector;
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
         expectLastCall();
 
         connector.start(CONFIG);
@@ -290,10 +308,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
-        assertInitializedMetric(workerConnector);
+        assertInitializedSinkMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
         assertFailedMetric(workerConnector);
         workerConnector.shutdown();
@@ -305,11 +323,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
     @Test
     public void testShutdownFailure() {
         RuntimeException exception = new RuntimeException();
+        connector = sourceConnector;
 
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
         expectLastCall();
 
         connector.start(CONFIG);
@@ -326,10 +345,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
-        assertInitializedMetric(workerConnector);
+        assertInitializedSourceMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
         assertRunningMetric(workerConnector);
         workerConnector.shutdown();
@@ -340,10 +359,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testTransitionStartedToStarted() {
+        connector = sourceConnector;
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
         expectLastCall();
 
         connector.start(CONFIG);
@@ -361,10 +381,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
-        assertInitializedMetric(workerConnector);
+        assertInitializedSourceMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
         assertRunningMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
@@ -377,10 +397,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
     @Test
     public void testTransitionPausedToPaused() {
+        connector = sourceConnector;
         connector.version();
         expectLastCall().andReturn(VERSION);
 
-        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
         expectLastCall();
 
         connector.start(CONFIG);
@@ -400,10 +421,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
 
         replayAll();
 
-        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
 
         workerConnector.initialize(connectorConfig);
-        assertInitializedMetric(workerConnector);
+        assertInitializedSourceMetric(workerConnector);
         workerConnector.transitionTo(TargetState.STARTED);
         assertRunningMetric(workerConnector);
         workerConnector.transitionTo(TargetState.PAUSED);
@@ -416,6 +437,27 @@ public class WorkerConnectorTest extends EasyMockSupport {
         verifyAll();
     }
 
+    @Test // initialize() must route a ConnectException to the status listener for this connector
+    public void testFailConnectorThatIsNeitherSourceNorSink() {
+        connector.version(); // unlike sibling tests, connector is NOT reassigned to sourceConnector/sinkConnector here
+        expectLastCall().andReturn(VERSION);
+
+        Capture<Throwable> exceptionCapture = Capture.newInstance(); // receives the Throwable passed to onFailure
+        listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture));
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
+
+        workerConnector.initialize(connectorConfig); // expected not to throw; the failure surfaces via listener.onFailure
+        Throwable e = exceptionCapture.getValue();
+        assertTrue(e instanceof ConnectException);
+        assertTrue(e.getMessage().contains("must be a subclass of")); // matches on a fragment of the error message only
+
+        verifyAll();
+    }
+
     protected void assertFailedMetric(WorkerConnector workerConnector) {
         assertFalse(workerConnector.metrics().isUnassigned());
         assertTrue(workerConnector.metrics().isFailed());
@@ -444,7 +486,15 @@ public class WorkerConnectorTest extends EasyMockSupport {
         assertFalse(workerConnector.metrics().isRunning());
     }
 
-    protected void assertInitializedMetric(WorkerConnector workerConnector) {
+    protected void assertInitializedSinkMetric(WorkerConnector workerConnector) { // initialized-state check, type "sink"
+        assertInitializedMetric(workerConnector, "sink");
+    }
+
+    protected void assertInitializedSourceMetric(WorkerConnector workerConnector) { // initialized-state check, type "source"
+        assertInitializedMetric(workerConnector, "source");
+    }
+
+    protected void assertInitializedMetric(WorkerConnector workerConnector, String expectedType) {
         assertTrue(workerConnector.metrics().isUnassigned());
         assertFalse(workerConnector.metrics().isFailed());
         assertFalse(workerConnector.metrics().isPaused());
@@ -454,12 +504,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
         String type = metrics.currentMetricValueAsString(metricGroup, "connector-type");
         String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class");
         String version = metrics.currentMetricValueAsString(metricGroup, "connector-version");
-        assertEquals(type, "unknown");
+        assertEquals(expectedType, type);
         assertNotNull(clazz);
         assertEquals(VERSION, version);
     }
 
     private static abstract class TestConnector extends Connector {
     }
-
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 1e8c0bc..b3fb675 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -48,7 +47,9 @@ import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -125,7 +126,8 @@ public class WorkerTest extends ThreadedTest {
 
     @Mock private Herder herder;
     @Mock private StatusBackingStore statusBackingStore;
-    @Mock private Connector connector;
+    @Mock private SourceConnector sourceConnector;
+    @Mock private SinkConnector sinkConnector;
     @Mock private ConnectorContext ctx;
     @Mock private TestSourceTask task;
     @Mock private WorkerSourceTask workerTask;
@@ -185,8 +187,8 @@ public class WorkerTest extends ThreadedTest {
         // Create
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
-                .andReturn(connector);
-        EasyMock.expect(connector.version()).andReturn("1.0");
+                .andReturn(sourceConnector);
+        EasyMock.expect(sourceConnector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -194,16 +196,16 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
-        EasyMock.expect(connector.version()).andReturn("1.0");
+        EasyMock.expect(sourceConnector.version()).andReturn("1.0");
 
         expectFileConfigProvider();
-        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+        EasyMock.expect(plugins.compareAndSwapLoaders(sourceConnector))
                 .andReturn(delegatingLoader)
                 .times(2);
 
-        connector.initialize(anyObject(ConnectorContext.class));
+        sourceConnector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
-        connector.start(props);
+        sourceConnector.start(props);
         EasyMock.expectLastCall();
 
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
@@ -213,7 +215,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         // Remove
-        connector.stop();
+        sourceConnector.stop();
         EasyMock.expectLastCall();
 
         connectorStatusListener.onShutdown(CONNECTOR_ID);
@@ -313,8 +315,8 @@ public class WorkerTest extends ThreadedTest {
         expectFileConfigProvider();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
-        EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
-        EasyMock.expect(connector.version()).andReturn("1.0");
+        EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(sinkConnector);
+        EasyMock.expect(sinkConnector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -322,13 +324,13 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
 
-        EasyMock.expect(connector.version()).andReturn("1.0");
-        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+        EasyMock.expect(sinkConnector.version()).andReturn("1.0");
+        EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector))
                 .andReturn(delegatingLoader)
                 .times(2);
-        connector.initialize(anyObject(ConnectorContext.class));
+        sinkConnector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
-        connector.start(props);
+        sinkConnector.start(props);
         EasyMock.expectLastCall();
 
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
@@ -339,7 +341,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         // Remove
-        connector.stop();
+        sinkConnector.stop();
         EasyMock.expectLastCall();
 
         connectorStatusListener.onShutdown(CONNECTOR_ID);
@@ -379,8 +381,8 @@ public class WorkerTest extends ThreadedTest {
         expectFileConfigProvider();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
-        EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
-        EasyMock.expect(connector.version()).andReturn("1.0");
+        EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(sinkConnector);
+        EasyMock.expect(sinkConnector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -388,13 +390,13 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
 
-        EasyMock.expect(connector.version()).andReturn("1.0");
-        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+        EasyMock.expect(sinkConnector.version()).andReturn("1.0");
+        EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector))
                 .andReturn(delegatingLoader)
                 .times(2);
-        connector.initialize(anyObject(ConnectorContext.class));
+        sinkConnector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
-        connector.start(props);
+        sinkConnector.start(props);
         EasyMock.expectLastCall();
 
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
@@ -405,7 +407,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         // Remove
-        connector.stop();
+        sinkConnector.stop();
         EasyMock.expectLastCall();
 
         connectorStatusListener.onShutdown(CONNECTOR_ID);
@@ -460,8 +462,8 @@ public class WorkerTest extends ThreadedTest {
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
         EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
-                .andReturn(connector);
-        EasyMock.expect(connector.version()).andReturn("1.0");
+                .andReturn(sinkConnector);
+        EasyMock.expect(sinkConnector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -469,13 +471,13 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
-        EasyMock.expect(connector.version()).andReturn("1.0");
-        EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+        EasyMock.expect(sinkConnector.version()).andReturn("1.0");
+        EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector))
                 .andReturn(delegatingLoader)
                 .times(3);
-        connector.initialize(anyObject(ConnectorContext.class));
+        sinkConnector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
-        connector.start(props);
+        sinkConnector.start(props);
         EasyMock.expectLastCall();
 
         EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
@@ -486,13 +488,13 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
 
         // Reconfigure
-        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
+        EasyMock.<Class<? extends Task>>expect(sinkConnector.taskClass()).andReturn(TestSourceTask.class);
         Map<String, String> taskProps = new HashMap<>();
         taskProps.put("foo", "bar");
-        EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
+        EasyMock.expect(sinkConnector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
 
         // Remove
-        connector.stop();
+        sinkConnector.stop();
         EasyMock.expectLastCall();
 
         connectorStatusListener.onShutdown(CONNECTOR_ID);
@@ -1359,7 +1361,7 @@ public class WorkerTest extends ThreadedTest {
     }
 
     /* Name here needs to be unique as we are testing the aliasing mechanism */
-    public static class WorkerTestConnector extends Connector {
+    public static class WorkerTestConnector extends SourceConnector {
 
         private static final ConfigDef CONFIG_DEF  = new ConfigDef()
             .define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName.");