You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/05/25 01:10:25 UTC
[kafka] branch trunk updated: KAFKA-4794: Add access to
OffsetStorageReader from SourceConnector (#2604)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3818770 KAFKA-4794: Add access to OffsetStorageReader from SourceConnector (#2604)
3818770 is described below
commit 3818770d7fcd6ccdd373cdcd3aa6952ea08c901c
Author: Florian Hussonnois <fl...@gmail.com>
AuthorDate: Mon May 25 03:09:54 2020 +0200
KAFKA-4794: Add access to OffsetStorageReader from SourceConnector (#2604)
Added access to OffsetStorageReader from SourceConnector per KIP-131.
Added two interfaces, SinkConnectorContext and SourceConnectorContext, that extend ConnectorContext; SourceConnectorContext exposes an OffsetStorageReader instance.
Added unit tests for Connector, SinkConnector and SourceConnector default methods
Author: Florian Hussonnois <fl...@gmail.com>, Randall Hauch <rh...@gmail.com>
Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
.../apache/kafka/connect/connector/Connector.java | 9 ++
.../apache/kafka/connect/sink/SinkConnector.java | 5 +
.../SinkConnectorContext.java} | 10 +-
.../kafka/connect/source/SourceConnector.java | 4 +
...eConnector.java => SourceConnectorContext.java} | 13 +-
.../kafka/connect/connector/ConnectorTest.java | 88 ++++++++++++
.../kafka/connect/sink/SinkConnectorTest.java | 146 ++++++++++++++++++++
.../kafka/connect/source/SourceConnectorTest.java | 152 +++++++++++++++++++++
.../org/apache/kafka/connect/runtime/Worker.java | 17 ++-
.../kafka/connect/runtime/WorkerConnector.java | 61 +++++++--
.../kafka/connect/runtime/WorkerConnectorTest.java | 111 ++++++++++-----
.../apache/kafka/connect/runtime/WorkerTest.java | 68 ++++-----
12 files changed, 594 insertions(+), 90 deletions(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 79caeeb..6d54aab 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -80,6 +80,15 @@ public abstract class Connector implements Versioned {
}
/**
+ * Returns the context object used to interact with the Kafka Connect runtime.
+ *
+ * @return the context for this Connector.
+ */
+ protected ConnectorContext context() {
+ return context;
+ }
+
+ /**
* Start this Connector. This method will only be called on a clean Connector, i.e. it has
* either just been instantiated and initialized or {@link #stop()} has been invoked.
*
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
index 5c58861..9627571 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
@@ -34,4 +34,9 @@ public abstract class SinkConnector extends Connector {
*/
public static final String TOPICS_CONFIG = "topics";
+ @Override
+ protected SinkConnectorContext context() {
+ return (SinkConnectorContext) context;
+ }
+
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java
similarity index 75%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
copy to connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java
index 0ca3b33..5e2b07a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnectorContext.java
@@ -14,14 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.connect.source;
+package org.apache.kafka.connect.sink;
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
/**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
+ * A context to allow a {@link SinkConnector} to interact with the Kafka Connect runtime.
*/
-public abstract class SourceConnector extends Connector {
-
+public interface SinkConnectorContext extends ConnectorContext {
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
index 0ca3b33..6e96940 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -24,4 +24,8 @@ import org.apache.kafka.connect.connector.Connector;
*/
public abstract class SourceConnector extends Connector {
+ @Override
+ protected SourceConnectorContext context() {
+ return (SourceConnectorContext) context;
+ }
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java
similarity index 63%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
copy to connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java
index 0ca3b33..417fbdd 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnectorContext.java
@@ -16,12 +16,17 @@
*/
package org.apache.kafka.connect.source;
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
/**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
+ * A context to allow a {@link SourceConnector} to interact with the Kafka Connect runtime.
*/
-public abstract class SourceConnector extends Connector {
+public interface SourceConnectorContext extends ConnectorContext {
+ /**
+ * Returns the {@link OffsetStorageReader} for this SourceConnectorContext.
+ * @return the OffsetStorageReader for this connector.
+ */
+ OffsetStorageReader offsetStorageReader();
}
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java
new file mode 100644
index 0000000..7addf8f
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.connector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class ConnectorTest {
+
+ protected ConnectorContext context;
+ protected Connector connector;
+ protected AssertableConnector assertableConnector;
+
+ @Before
+ public void beforeEach() {
+ connector = createConnector();
+ context = createContext();
+ assertableConnector = (AssertableConnector) connector;
+ }
+
+ @Test
+ public void shouldInitializeContext() {
+ connector.initialize(context);
+ assertableConnector.assertInitialized();
+ assertableConnector.assertContext(context);
+ assertableConnector.assertTaskConfigs(null);
+ }
+
+ @Test
+ public void shouldInitializeContextWithTaskConfigs() {
+ List<Map<String, String>> taskConfigs = new ArrayList<>();
+ connector.initialize(context, taskConfigs);
+ assertableConnector.assertInitialized();
+ assertableConnector.assertContext(context);
+ assertableConnector.assertTaskConfigs(taskConfigs);
+ }
+
+ @Test
+ public void shouldStopAndStartWhenReconfigure() {
+ Map<String, String> props = new HashMap<>();
+ connector.initialize(context);
+ assertableConnector.assertContext(context);
+ assertableConnector.assertStarted(false);
+ assertableConnector.assertStopped(false);
+ connector.reconfigure(props);
+ assertableConnector.assertStarted(true);
+ assertableConnector.assertStopped(true);
+ assertableConnector.assertProperties(props);
+ }
+
+ protected abstract ConnectorContext createContext();
+
+ protected abstract Connector createConnector();
+
+ public interface AssertableConnector {
+
+ void assertContext(ConnectorContext expected);
+
+ void assertInitialized();
+
+ void assertTaskConfigs(List<Map<String, String>> expectedTaskConfigs);
+
+ void assertStarted(boolean expected);
+
+ void assertStopped(boolean expected);
+
+ void assertProperties(Map<String, String> expected);
+ }
+}
\ No newline at end of file
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java
new file mode 100644
index 0000000..a4924ae
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkConnectorTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.ConnectorTest;
+import org.apache.kafka.connect.connector.Task;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class SinkConnectorTest extends ConnectorTest {
+
+ @Override
+ protected TestSinkConnectorContext createContext() {
+ return new TestSinkConnectorContext();
+ }
+
+ @Override
+ protected TestSinkConnector createConnector() {
+ return new TestSinkConnector();
+ }
+
+ private static class TestSinkConnectorContext implements SinkConnectorContext {
+
+ @Override
+ public void requestTaskReconfiguration() {
+ // Unexpected in these tests
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void raiseError(Exception e) {
+ // Unexpected in these tests
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ protected static class TestSinkConnector extends SinkConnector implements ConnectorTest.AssertableConnector {
+
+ public static final String VERSION = "an entirely different version";
+
+ private boolean initialized;
+ private List<Map<String, String>> taskConfigs;
+ private Map<String, String> props;
+ private boolean started;
+ private boolean stopped;
+
+ @Override
+ public String version() {
+ return VERSION;
+ }
+
+ @Override
+ public void initialize(ConnectorContext ctx) {
+ super.initialize(ctx);
+ initialized = true;
+ this.taskConfigs = null;
+ }
+
+ @Override
+ public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
+ super.initialize(ctx, taskConfigs);
+ initialized = true;
+ this.taskConfigs = taskConfigs;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.props = props;
+ started = true;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ }
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef()
+ .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+ .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+ }
+
+ @Override
+ public void assertContext(ConnectorContext expected) {
+ assertSame(expected, context);
+ assertSame(expected, context());
+ }
+
+ @Override
+ public void assertInitialized() {
+ assertTrue(initialized);
+ }
+
+ @Override
+ public void assertTaskConfigs(List<Map<String, String>> expectedTaskConfigs) {
+ assertSame(expectedTaskConfigs, taskConfigs);
+ }
+
+ @Override
+ public void assertStarted(boolean expected) {
+ assertEquals(expected, started);
+ }
+
+ @Override
+ public void assertStopped(boolean expected) {
+ assertEquals(expected, stopped);
+ }
+
+ @Override
+ public void assertProperties(Map<String, String> expected) {
+ assertSame(expected, props);
+ }
+ }
+}
\ No newline at end of file
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java
new file mode 100644
index 0000000..9b10540
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceConnectorTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.connector.ConnectorTest;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class SourceConnectorTest extends ConnectorTest {
+
+ @Override
+ protected ConnectorContext createContext() {
+ return new TestSourceConnectorContext();
+ }
+
+ @Override
+ protected TestSourceConnector createConnector() {
+ return new TestSourceConnector();
+ }
+
+ private static class TestSourceConnectorContext implements SourceConnectorContext {
+
+ @Override
+ public void requestTaskReconfiguration() {
+ // Unexpected in these tests
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void raiseError(Exception e) {
+ // Unexpected in these tests
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OffsetStorageReader offsetStorageReader() {
+ return null;
+ }
+ }
+
+ private static class TestSourceConnector extends SourceConnector implements AssertableConnector {
+
+ public static final String VERSION = "an entirely different version";
+
+ private boolean initialized;
+ private List<Map<String, String>> taskConfigs;
+ private Map<String, String> props;
+ private boolean started;
+ private boolean stopped;
+
+ @Override
+ public String version() {
+ return VERSION;
+ }
+
+ @Override
+ public void initialize(ConnectorContext ctx) {
+ super.initialize(ctx);
+ initialized = true;
+ this.taskConfigs = null;
+ }
+
+ @Override
+ public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
+ super.initialize(ctx, taskConfigs);
+ initialized = true;
+ this.taskConfigs = taskConfigs;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.props = props;
+ started = true;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ }
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef()
+ .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
+ .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
+ }
+
+ @Override
+ public void assertContext(ConnectorContext expected) {
+ assertSame(expected, context);
+ assertSame(expected, context());
+ }
+
+ @Override
+ public void assertInitialized() {
+ assertTrue(initialized);
+ }
+
+ @Override
+ public void assertTaskConfigs(List<Map<String, String>> expectedTaskConfigs) {
+ assertSame(expectedTaskConfigs, taskConfigs);
+ }
+
+ @Override
+ public void assertStarted(boolean expected) {
+ assertEquals(expected, started);
+ }
+
+ @Override
+ public void assertStopped(boolean expected) {
+ assertEquals(expected, stopped);
+ }
+
+ @Override
+ public void assertProperties(Map<String, String> expected) {
+ assertSame(expected, props);
+ }
+ }
+}
\ No newline at end of file
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 99e44d4..98e8e78 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -54,6 +54,7 @@ import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -252,7 +253,21 @@ public class Worker {
final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
log.info("Creating connector {} of type {}", connName, connClass);
final Connector connector = plugins.newConnector(connClass);
- workerConnector = new WorkerConnector(connName, connector, ctx, metrics, statusListener);
+
+ final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(
+ offsetBackingStore,
+ connName,
+ internalKeyConverter,
+ internalValueConverter
+ );
+ workerConnector = new WorkerConnector(
+ connName,
+ connector,
+ ctx,
+ metrics,
+ statusListener,
+ offsetReader
+ );
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
savedLoader = plugins.compareAndSwapLoaders(connector);
workerConnector.initialize(connConfig);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 7923943..ead9d71 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -18,9 +18,13 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.sink.SinkConnectorContext;
+import org.apache.kafka.connect.source.SourceConnectorContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,41 +62,36 @@ public class WorkerConnector {
private Map<String, String> config;
private State state;
+ private final OffsetStorageReader offsetStorageReader;
public WorkerConnector(String connName,
Connector connector,
ConnectorContext ctx,
ConnectMetrics metrics,
- ConnectorStatus.Listener statusListener) {
+ ConnectorStatus.Listener statusListener,
+ OffsetStorageReader offsetStorageReader) {
this.connName = connName;
this.ctx = ctx;
this.connector = connector;
this.state = State.INIT;
this.metrics = new ConnectorMetricsGroup(metrics, AbstractStatus.State.UNASSIGNED, statusListener);
this.statusListener = this.metrics;
+ this.offsetStorageReader = offsetStorageReader;
}
public void initialize(ConnectorConfig connectorConfig) {
try {
+ if (!isSourceConnector() && !isSinkConnector()) {
+ throw new ConnectException("Connector implementations must be a subclass of either SourceConnector or SinkConnector");
+ }
this.config = connectorConfig.originalsStrings();
log.debug("{} Initializing connector {}", this, connName);
if (isSinkConnector()) {
SinkConnectorConfig.validate(config);
+ connector.initialize(new WorkerSinkConnectorContext());
+ } else {
+ connector.initialize(new WorkerSourceConnectorContext(offsetStorageReader));
}
-
- connector.initialize(new ConnectorContext() {
- @Override
- public void requestTaskReconfiguration() {
- ctx.requestTaskReconfiguration();
- }
-
- @Override
- public void raiseError(Exception e) {
- log.error("{} Connector raised an error", WorkerConnector.this, e);
- onFailure(e);
- ctx.raiseError(e);
- }
- });
} catch (Throwable t) {
log.error("{} Error initializing connector", this, t);
onFailure(t);
@@ -319,4 +318,36 @@ public class WorkerConnector {
return metricGroup;
}
}
+
+ private abstract class WorkerConnectorContext implements ConnectorContext {
+
+ @Override
+ public void requestTaskReconfiguration() {
+ WorkerConnector.this.ctx.requestTaskReconfiguration();
+ }
+
+ @Override
+ public void raiseError(Exception e) {
+ log.error("{} Connector raised an error", WorkerConnector.this, e);
+ onFailure(e);
+ WorkerConnector.this.ctx.raiseError(e);
+ }
+ }
+
+ private class WorkerSinkConnectorContext extends WorkerConnectorContext implements SinkConnectorContext {
+ }
+
+ private class WorkerSourceConnectorContext extends WorkerConnectorContext implements SourceConnectorContext {
+
+ private final OffsetStorageReader offsetStorageReader;
+
+ WorkerSourceConnectorContext(OffsetStorageReader offsetStorageReader) {
+ this.offsetStorageReader = offsetStorageReader;
+ }
+
+ @Override
+ public OffsetStorageReader offsetStorageReader() {
+ return offsetStorageReader;
+ }
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index 10c413d..088900b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -18,8 +18,15 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkConnectorContext;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceConnectorContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@@ -47,14 +54,18 @@ public class WorkerConnectorTest extends EasyMockSupport {
static {
CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
+ CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "my-topic");
}
public ConnectorConfig connectorConfig;
public MockConnectMetrics metrics;
@Mock Plugins plugins;
+ @Mock SourceConnector sourceConnector;
+ @Mock SinkConnector sinkConnector;
@Mock Connector connector;
@Mock ConnectorContext ctx;
@Mock ConnectorStatus.Listener listener;
+ @Mock OffsetStorageReader offsetStorageReader;
@Before
public void setup() {
@@ -70,11 +81,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testInitializeFailure() {
RuntimeException exception = new RuntimeException();
+ connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall().andThrow(exception);
listener.onFailure(CONNECTOR, exception);
@@ -85,7 +97,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
assertFailedMetric(workerConnector);
@@ -98,11 +110,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testFailureIsFinalState() {
RuntimeException exception = new RuntimeException();
+ connector = sinkConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
expectLastCall().andThrow(exception);
listener.onFailure(CONNECTOR, exception);
@@ -115,7 +128,7 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
assertFailedMetric(workerConnector);
@@ -129,10 +142,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testStartupAndShutdown() {
+ connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
@@ -149,10 +163,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
- assertInitializedMetric(workerConnector);
+ assertInitializedSourceMetric(workerConnector);
workerConnector.transitionTo(TargetState.STARTED);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
@@ -163,10 +177,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testStartupAndPause() {
+ connector = sinkConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
@@ -186,10 +201,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
- assertInitializedMetric(workerConnector);
+ assertInitializedSinkMetric(workerConnector);
workerConnector.transitionTo(TargetState.STARTED);
assertRunningMetric(workerConnector);
workerConnector.transitionTo(TargetState.PAUSED);
@@ -202,10 +217,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testOnResume() {
+ connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall();
listener.onPause(CONNECTOR);
@@ -225,10 +241,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
- assertInitializedMetric(workerConnector);
+ assertInitializedSourceMetric(workerConnector);
workerConnector.transitionTo(TargetState.PAUSED);
assertPausedMetric(workerConnector);
workerConnector.transitionTo(TargetState.STARTED);
@@ -241,10 +257,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testStartupPaused() {
+ connector = sinkConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
expectLastCall();
// connector never gets started
@@ -257,10 +274,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
- assertInitializedMetric(workerConnector);
+ assertInitializedSinkMetric(workerConnector);
workerConnector.transitionTo(TargetState.PAUSED);
assertPausedMetric(workerConnector);
workerConnector.shutdown();
@@ -273,10 +290,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
public void testStartupFailure() {
RuntimeException exception = new RuntimeException();
+ connector = sinkConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SinkConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
@@ -290,10 +308,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
- assertInitializedMetric(workerConnector);
+ assertInitializedSinkMetric(workerConnector);
workerConnector.transitionTo(TargetState.STARTED);
assertFailedMetric(workerConnector);
workerConnector.shutdown();
@@ -305,11 +323,12 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testShutdownFailure() {
RuntimeException exception = new RuntimeException();
+ connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
@@ -326,10 +345,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
- assertInitializedMetric(workerConnector);
+ assertInitializedSourceMetric(workerConnector);
workerConnector.transitionTo(TargetState.STARTED);
assertRunningMetric(workerConnector);
workerConnector.shutdown();
@@ -340,10 +359,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testTransitionStartedToStarted() {
+ connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
@@ -361,10 +381,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
- assertInitializedMetric(workerConnector);
+ assertInitializedSourceMetric(workerConnector);
workerConnector.transitionTo(TargetState.STARTED);
assertRunningMetric(workerConnector);
workerConnector.transitionTo(TargetState.STARTED);
@@ -377,10 +397,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
@Test
public void testTransitionPausedToPaused() {
+ connector = sourceConnector;
connector.version();
expectLastCall().andReturn(VERSION);
- connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ connector.initialize(EasyMock.notNull(SourceConnectorContext.class));
expectLastCall();
connector.start(CONFIG);
@@ -400,10 +421,10 @@ public class WorkerConnectorTest extends EasyMockSupport {
replayAll();
- WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener);
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
workerConnector.initialize(connectorConfig);
- assertInitializedMetric(workerConnector);
+ assertInitializedSourceMetric(workerConnector);
workerConnector.transitionTo(TargetState.STARTED);
assertRunningMetric(workerConnector);
workerConnector.transitionTo(TargetState.PAUSED);
@@ -416,6 +437,27 @@ public class WorkerConnectorTest extends EasyMockSupport {
verifyAll();
}
+ @Test
+ public void testFailConnectorThatIsNeitherSourceNorSink() {
+ connector.version();
+ expectLastCall().andReturn(VERSION);
+
+ Capture<Throwable> exceptionCapture = Capture.newInstance();
+ listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture));
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, metrics, listener, offsetStorageReader);
+
+ workerConnector.initialize(connectorConfig);
+ Throwable e = exceptionCapture.getValue();
+ assertTrue(e instanceof ConnectException);
+ assertTrue(e.getMessage().contains("must be a subclass of"));
+
+ verifyAll();
+ }
+
protected void assertFailedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertTrue(workerConnector.metrics().isFailed());
@@ -444,7 +486,15 @@ public class WorkerConnectorTest extends EasyMockSupport {
assertFalse(workerConnector.metrics().isRunning());
}
- protected void assertInitializedMetric(WorkerConnector workerConnector) {
+ protected void assertInitializedSinkMetric(WorkerConnector workerConnector) {
+ assertInitializedMetric(workerConnector, "sink");
+ }
+
+ protected void assertInitializedSourceMetric(WorkerConnector workerConnector) {
+ assertInitializedMetric(workerConnector, "source");
+ }
+
+ protected void assertInitializedMetric(WorkerConnector workerConnector, String expectedType) {
assertTrue(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isPaused());
@@ -454,12 +504,11 @@ public class WorkerConnectorTest extends EasyMockSupport {
String type = metrics.currentMetricValueAsString(metricGroup, "connector-type");
String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class");
String version = metrics.currentMetricValueAsString(metricGroup, "connector-version");
- assertEquals(type, "unknown");
+ assertEquals(expectedType, type);
assertNotNull(clazz);
assertEquals(VERSION, version);
}
private static abstract class TestConnector extends Connector {
}
-
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 1e8c0bc..b3fb675 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -48,7 +47,9 @@ import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
@@ -125,7 +126,8 @@ public class WorkerTest extends ThreadedTest {
@Mock private Herder herder;
@Mock private StatusBackingStore statusBackingStore;
- @Mock private Connector connector;
+ @Mock private SourceConnector sourceConnector;
+ @Mock private SinkConnector sinkConnector;
@Mock private ConnectorContext ctx;
@Mock private TestSourceTask task;
@Mock private WorkerSourceTask workerTask;
@@ -185,8 +187,8 @@ public class WorkerTest extends ThreadedTest {
// Create
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
- .andReturn(connector);
- EasyMock.expect(connector.version()).andReturn("1.0");
+ .andReturn(sourceConnector);
+ EasyMock.expect(sourceConnector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -194,16 +196,16 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
- EasyMock.expect(connector.version()).andReturn("1.0");
+ EasyMock.expect(sourceConnector.version()).andReturn("1.0");
expectFileConfigProvider();
- EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+ EasyMock.expect(plugins.compareAndSwapLoaders(sourceConnector))
.andReturn(delegatingLoader)
.times(2);
- connector.initialize(anyObject(ConnectorContext.class));
+ sourceConnector.initialize(anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
- connector.start(props);
+ sourceConnector.start(props);
EasyMock.expectLastCall();
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
@@ -213,7 +215,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
// Remove
- connector.stop();
+ sourceConnector.stop();
EasyMock.expectLastCall();
connectorStatusListener.onShutdown(CONNECTOR_ID);
@@ -313,8 +315,8 @@ public class WorkerTest extends ThreadedTest {
expectFileConfigProvider();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
- EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
- EasyMock.expect(connector.version()).andReturn("1.0");
+ EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(sinkConnector);
+ EasyMock.expect(sinkConnector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -322,13 +324,13 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
- EasyMock.expect(connector.version()).andReturn("1.0");
- EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+ EasyMock.expect(sinkConnector.version()).andReturn("1.0");
+ EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector))
.andReturn(delegatingLoader)
.times(2);
- connector.initialize(anyObject(ConnectorContext.class));
+ sinkConnector.initialize(anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
- connector.start(props);
+ sinkConnector.start(props);
EasyMock.expectLastCall();
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
@@ -339,7 +341,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
// Remove
- connector.stop();
+ sinkConnector.stop();
EasyMock.expectLastCall();
connectorStatusListener.onShutdown(CONNECTOR_ID);
@@ -379,8 +381,8 @@ public class WorkerTest extends ThreadedTest {
expectFileConfigProvider();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
- EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
- EasyMock.expect(connector.version()).andReturn("1.0");
+ EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(sinkConnector);
+ EasyMock.expect(sinkConnector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -388,13 +390,13 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
- EasyMock.expect(connector.version()).andReturn("1.0");
- EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+ EasyMock.expect(sinkConnector.version()).andReturn("1.0");
+ EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector))
.andReturn(delegatingLoader)
.times(2);
- connector.initialize(anyObject(ConnectorContext.class));
+ sinkConnector.initialize(anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
- connector.start(props);
+ sinkConnector.start(props);
EasyMock.expectLastCall();
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
@@ -405,7 +407,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
// Remove
- connector.stop();
+ sinkConnector.stop();
EasyMock.expectLastCall();
connectorStatusListener.onShutdown(CONNECTOR_ID);
@@ -460,8 +462,8 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
- .andReturn(connector);
- EasyMock.expect(connector.version()).andReturn("1.0");
+ .andReturn(sinkConnector);
+ EasyMock.expect(sinkConnector.version()).andReturn("1.0");
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -469,13 +471,13 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
- EasyMock.expect(connector.version()).andReturn("1.0");
- EasyMock.expect(plugins.compareAndSwapLoaders(connector))
+ EasyMock.expect(sinkConnector.version()).andReturn("1.0");
+ EasyMock.expect(plugins.compareAndSwapLoaders(sinkConnector))
.andReturn(delegatingLoader)
.times(3);
- connector.initialize(anyObject(ConnectorContext.class));
+ sinkConnector.initialize(anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
- connector.start(props);
+ sinkConnector.start(props);
EasyMock.expectLastCall();
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
@@ -486,13 +488,13 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
// Reconfigure
- EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
+ EasyMock.<Class<? extends Task>>expect(sinkConnector.taskClass()).andReturn(TestSourceTask.class);
Map<String, String> taskProps = new HashMap<>();
taskProps.put("foo", "bar");
- EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
+ EasyMock.expect(sinkConnector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
// Remove
- connector.stop();
+ sinkConnector.stop();
EasyMock.expectLastCall();
connectorStatusListener.onShutdown(CONNECTOR_ID);
@@ -1359,7 +1361,7 @@ public class WorkerTest extends ThreadedTest {
}
/* Name here needs to be unique as we are testing the aliasing mechanism */
- public static class WorkerTestConnector extends Connector {
+ public static class WorkerTestConnector extends SourceConnector {
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName.");