You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/04 21:02:26 UTC

kafka git commit: KAFKA-2422: Allow copycat connector plugins to be aliased to simpler names

Repository: kafka
Updated Branches:
  refs/heads/trunk f9642e2a9 -> b93f48f74


KAFKA-2422: Allow copycat connector plugins to be aliased to simpler names

…names

Author: Gwen Shapira <cs...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #687 from gwenshap/KAFKA-2422


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b93f48f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b93f48f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b93f48f7

Branch: refs/heads/trunk
Commit: b93f48f7494e1db4d564b6c28772712ee7681620
Parents: f9642e2
Author: Gwen Shapira <cs...@gmail.com>
Authored: Mon Jan 4 15:01:58 2016 -0500
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jan 4 15:01:58 2016 -0500

----------------------------------------------------------------------
 build.gradle                                    |   2 +
 checkstyle/import-control.xml                   |   2 +
 config/connect-file-sink.properties             |   2 +-
 config/connect-file-source.properties           |   2 +-
 .../kafka/connect/runtime/ConnectorConfig.java  |   6 +-
 .../apache/kafka/connect/runtime/Worker.java    |  64 +++++++++--
 .../runtime/distributed/DistributedHerder.java  |   4 +-
 .../kafka/connect/runtime/WorkerTest.java       | 115 ++++++++++++++++++-
 .../distributed/DistributedHerderTest.java      |   2 +
 .../templates/connect-file-sink.properties      |   2 +-
 .../templates/connect-file-source.properties    |   2 +-
 11 files changed, 181 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6bb0363..75db84a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -36,6 +36,7 @@ def powermock_easymock='org.powermock:powermock-api-easymock:1.6.3'
 def jackson_version = '2.6.3'
 def jetty_version = '9.2.14.v20151106'
 def jersey_version = '2.22.1'
+def reflections_version = '0.9.10'
 
 allprojects {
   apply plugin: 'idea'
@@ -790,6 +791,7 @@ project(':connect:runtime') {
     compile "org.eclipse.jetty:jetty-servlet:$jetty_version"
     compile "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$jackson_version"
     compile "org.glassfish.jersey.containers:jersey-container-servlet:$jersey_version"
+    compile "org.reflections:reflections:$reflections_version"
 
     testCompile "$junit"
     testCompile "$easymock"

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a65a2dc..a663cf7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -171,6 +171,8 @@
 
     <subpackage name="runtime">
       <allow pkg="org.apache.kafka.connect" />
+      <allow pkg="org.reflections"/>
+      <allow pkg="org.reflections.util"/>
 
       <subpackage name="rest">
         <allow pkg="org.eclipse.jetty" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/config/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/config/connect-file-sink.properties b/config/connect-file-sink.properties
index e2cf361..594ccc6 100644
--- a/config/connect-file-sink.properties
+++ b/config/connect-file-sink.properties
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 name=local-file-sink
-connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
+connector.class=FileStreamSink
 tasks.max=1
 file=test.sink.txt
 topics=connect-test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/config/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/config/connect-file-source.properties b/config/connect-file-source.properties
index df92d44..599cf4c 100644
--- a/config/connect-file-source.properties
+++ b/config/connect-file-source.properties
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 name=local-file-source
-connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
+connector.class=FileStreamSource
 tasks.max=1
 file=test.txt
 topic=connect-test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 77cfc8d..4824acd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -43,7 +43,9 @@ public class ConnectorConfig extends AbstractConfig {
 
     public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
     private static final String CONNECTOR_CLASS_DOC =
-            "Name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector";
+                    "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. " +
+                    "If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, " +
+                    " or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
 
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
@@ -58,7 +60,7 @@ public class ConnectorConfig extends AbstractConfig {
     static {
         config = new ConfigDef()
                 .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
-                .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC)
+                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
                 .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
                 .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3898ad6..4766cf7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -35,15 +36,20 @@ import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+
 /**
  * <p>
  * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
@@ -170,15 +176,9 @@ public class Worker {
      */
     public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        log.info("Creating connector {} of type {}", connName, maybeConnClass.getName());
+        Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
 
-        Class<? extends Connector> connClass;
-        try {
-            connClass = maybeConnClass.asSubclass(Connector.class);
-        } catch (ClassCastException e) {
-            throw new ConnectException("Specified class is not a subclass of Connector: " + maybeConnClass.getName());
-        }
+        log.info("Creating connector {} of type {}", connName, connClass.getName());
 
         if (connectors.containsKey(connName))
             throw new ConnectException("Connector with name " + connName + " already exists");
@@ -197,6 +197,54 @@ public class Worker {
         log.info("Finished creating connector {}", connName);
     }
 
+    /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
+    public boolean isSinkConnector(String connName) {
+        return SinkConnector.class.isAssignableFrom(connectors.get(connName).getClass());
+    }
+
+
+    // Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration
+    private Class<? extends Connector> getConnectorClass(String connectorAlias) {
+        Reflections reflections = new Reflections(new ConfigurationBuilder()
+                .setUrls(ClasspathHelper.forJavaClassPath()));
+
+        Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
+
+        List<Class<? extends Connector>> results = new ArrayList<>();
+
+        for (Class<? extends Connector> connector: connectors) {
+            // Configuration included the fully qualified class name
+            if (connector.getName().equals(connectorAlias))
+                results.add(connector);
+
+            // Configuration included the class name but not package
+            if (connector.getSimpleName().equals(connectorAlias))
+                results.add(connector);
+
+            // Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector)
+            if (connector.getSimpleName().equals(connectorAlias + "Connector"))
+                results.add(connector);
+        }
+
+        if (results.isEmpty())
+            throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorAlias + " available connectors are: " + connectorNames(connectors));
+        if (results.size() > 1) {
+            throw new ConnectException("More than one connector matches alias " +  connectorAlias + ". Please use full package + class name instead. Classes found: " + connectorNames(results));
+        }
+
+        // We just validated that we have exactly one result, so this is safe
+        return results.get(0);
+    }
+
+    private String connectorNames(Collection<Class<? extends Connector>> connectors) {
+        StringBuilder names = new StringBuilder();
+        for (Class<?> c : connectors)
+            names.append(c.getName()).append(", ");
+
+        return names.substring(0, names.toString().length() - 2);
+    }
+
+
     private static Connector instantiateConnector(Class<? extends Connector> connClass) {
         try {
             return Utils.newInstance(connClass);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7caaabb..a9e8dd5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -34,7 +34,6 @@ import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -709,9 +708,8 @@ public class DistributedHerder implements Herder, Runnable {
         try {
             Map<String, String> configs = configState.connectorConfig(connName);
             ConnectorConfig connConfig = new ConnectorConfig(configs);
-
             List<String> sinkTopics = null;
-            if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
+            if (worker.isSinkConnector(connName))
                 sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
 
             final List<Map<String, String>> taskProps

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 4e25e9d..335e0ce 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -91,14 +91,14 @@ public class WorkerTest extends ThreadedTest {
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
-        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
         connector.initialize(ctx);
         EasyMock.expectLastCall();
@@ -135,6 +135,110 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAddConnectorByAlias() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        // Create
+        Connector connector = PowerMock.createMock(Connector.class);
+        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
+        EasyMock.expect(connector.version()).andReturn("1.0");
+
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
+
+        connector.initialize(ctx);
+        EasyMock.expectLastCall();
+        connector.start(props);
+        EasyMock.expectLastCall();
+
+        // Remove
+        connector.stop();
+        EasyMock.expectLastCall();
+
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        ConnectorConfig config = new ConnectorConfig(props);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.addConnector(config, ctx);
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+
+        worker.stopConnector(CONNECTOR_ID);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAddConnectorByShortAlias() throws Exception {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+
+        // Create
+        Connector connector = PowerMock.createMock(Connector.class);
+        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
+
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
+        EasyMock.expect(connector.version()).andReturn("1.0");
+
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
+
+        connector.initialize(ctx);
+        EasyMock.expectLastCall();
+        connector.start(props);
+        EasyMock.expectLastCall();
+
+        // Remove
+        connector.stop();
+        EasyMock.expectLastCall();
+
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker.start();
+
+        ConnectorConfig config = new ConnectorConfig(props);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        worker.addConnector(config, ctx);
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
+
+        worker.stopConnector(CONNECTOR_ID);
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+
     @Test(expected = ConnectException.class)
     public void testStopInvalidConnector() {
         offsetBackingStore.configure(EasyMock.anyObject(Map.class));
@@ -162,14 +266,14 @@ public class WorkerTest extends ThreadedTest {
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
+        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
-        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
         connector.initialize(ctx);
         EasyMock.expectLastCall();
@@ -345,7 +449,8 @@ public class WorkerTest extends ThreadedTest {
     }
 
 
-    private static class TestConnector extends Connector {
+    /* Name here needs to be unique as we are testing the aliasing mechanism */
+    private static class WorkerTestConnector extends Connector {
         @Override
         public String version() {
             return "1.0";

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 36f8fce..76f9bc0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -136,6 +136,7 @@ public class DistributedHerderTest {
     @Before
     public void setUp() throws Exception {
         worker = PowerMock.createMock(Worker.class);
+        EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
         time = new MockTime();
 
         herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
@@ -487,6 +488,7 @@ public class DistributedHerderTest {
         worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/tests/kafkatest/tests/templates/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-file-sink.properties b/tests/kafkatest/tests/templates/connect-file-sink.properties
index f52c26e..ad78bb3 100644
--- a/tests/kafkatest/tests/templates/connect-file-sink.properties
+++ b/tests/kafkatest/tests/templates/connect-file-sink.properties
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 name=local-file-sink
-connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
+connector.class=FileStreamSink
 tasks.max=1
 file={{ OUTPUT_FILE }}
 topics={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b93f48f7/tests/kafkatest/tests/templates/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-file-source.properties b/tests/kafkatest/tests/templates/connect-file-source.properties
index e8a6f05..d2d5e97 100644
--- a/tests/kafkatest/tests/templates/connect-file-source.properties
+++ b/tests/kafkatest/tests/templates/connect-file-source.properties
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 name=local-file-source
-connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
+connector.class=FileStreamSource
 tasks.max=1
 file={{ INPUT_FILE }}
 topic={{ TOPIC }}
\ No newline at end of file