You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/17 02:30:15 UTC

[kafka] branch trunk updated: MINOR: Fix bug introduced by adding batch.size without default in FileStreamSourceConnector (#4579)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee352be  MINOR: Fix bug introduced by adding batch.size without default in FileStreamSourceConnector (#4579)
ee352be is described below

commit ee352be9c88663b95b1096b7a294e61857857380
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Fri Feb 16 18:30:12 2018 -0800

    MINOR: Fix bug introduced by adding batch.size without default in FileStreamSourceConnector (#4579)
    
    https://github.com/apache/kafka/pull/4356 added the `batch.size` config property to `FileStreamSourceConnector`, but the property was added as required, without a default, in the config definition (`ConfigDef`). This results in a validation error during connector startup.
    
    Unit tests were added for both `FileStreamSourceConnector` and `FileStreamSinkConnector` to avoid such issues in the future.
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../connect/file/FileStreamSinkConnector.java      |  4 ++-
 .../connect/file/FileStreamSourceConnector.java    | 30 +++++++++----------
 .../kafka/connect/file/FileStreamSourceTask.java   | 13 ++-------
 .../connect/file/FileStreamSinkConnectorTest.java  | 11 +++++++
 .../file/FileStreamSourceConnectorTest.java        | 34 ++++++++++++++++++++--
 .../connect/file/FileStreamSourceTaskTest.java     | 10 +------
 6 files changed, 63 insertions(+), 39 deletions(-)

diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index 4ae7f4b..136e899 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.file;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -47,7 +48,8 @@ public class FileStreamSinkConnector extends SinkConnector {
 
     @Override
     public void start(Map<String, String> props) {
-        filename = props.get(FILE_CONFIG);
+        AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
+        filename = parsedConfig.getString(FILE_CONFIG);
     }
 
     @Override
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 59006da..74b5f7c 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -16,12 +16,13 @@
  */
 package org.apache.kafka.connect.file;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
@@ -42,12 +43,13 @@ public class FileStreamSourceConnector extends SourceConnector {
 
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
-        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to")
-        .define(TASK_BATCH_SIZE_CONFIG, Type.INT, Importance.LOW, "The maximum number of records the Source task can read from file one time");
+        .define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to publish data to")
+        .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW,
+                "The maximum number of records the Source task can read from file one time");
 
     private String filename;
     private String topic;
-    private int batchSize = DEFAULT_TASK_BATCH_SIZE;
+    private int batchSize;
 
     @Override
     public String version() {
@@ -56,20 +58,14 @@ public class FileStreamSourceConnector extends SourceConnector {
 
     @Override
     public void start(Map<String, String> props) {
-        filename = props.get(FILE_CONFIG);
-        topic = props.get(TOPIC_CONFIG);
-        if (topic == null || topic.isEmpty())
-            throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting");
-        if (topic.contains(","))
-            throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source.");
-
-        if (props.containsKey(TASK_BATCH_SIZE_CONFIG)) {
-            try {
-                batchSize = Integer.parseInt(props.get(TASK_BATCH_SIZE_CONFIG));
-            } catch (NumberFormatException e) {
-                throw new ConnectException("Invalid FileStreamSourceConnector configuration", e);
-            }
+        AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
+        filename = parsedConfig.getString(FILE_CONFIG);
+        List<String> topics = parsedConfig.getList(TOPIC_CONFIG);
+        if (topics.size() != 1) {
+            throw new ConfigException("'topic' in FileStreamSourceConnector configuration requires definition of a single topic");
         }
+        topic = topics.get(0);
+        batchSize = parsedConfig.getInt(TASK_BATCH_SIZE_CONFIG);
     }
 
     @Override
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 482102f..17037f2 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -68,17 +68,10 @@ public class FileStreamSourceTask extends SourceTask {
             streamOffset = null;
             reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
         }
+        // Missing topic or parsing error is not possible because we've parsed the config in the
+        // Connector
         topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
-        if (topic == null)
-            throw new ConnectException("FileStreamSourceTask config missing topic setting");
-
-        if (props.containsKey(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG)) {
-            try {
-                batchSize = Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
-            } catch (NumberFormatException e) {
-                throw new ConnectException("Invalid FileStreamSourceTask configuration", e);
-            }
-        }
+        batchSize = Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
     }
 
     @Override
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
index c06c991..2348454 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.file;
 
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.easymock.EasyMockSupport;
@@ -50,6 +51,16 @@ public class FileStreamSinkConnectorTest extends EasyMockSupport {
     }
 
     @Test
+    public void testConnectorConfigValidation() {
+        replayAll();
+        List<ConfigValue> configValues = connector.config().validate(sinkProperties);
+        for (ConfigValue val : configValues) {
+            assertEquals("Config property errors: " + val.errorMessages(), 0, val.errorMessages().size());
+        }
+        verifyAll();
+    }
+
+    @Test
     public void testSinkTasks() {
         replayAll();
 
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
index 69a94a8..d7efdac 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
@@ -16,8 +16,9 @@
  */
 package org.apache.kafka.connect.file;
 
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.ConnectorContext;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.Before;
@@ -52,6 +53,16 @@ public class FileStreamSourceConnectorTest extends EasyMockSupport {
     }
 
     @Test
+    public void testConnectorConfigValidation() {
+        replayAll();
+        List<ConfigValue> configValues = connector.config().validate(sourceProperties);
+        for (ConfigValue val : configValues) {
+            assertEquals("Config property errors: " + val.errorMessages(), 0, val.errorMessages().size());
+        }
+        verifyAll();
+    }
+
+    @Test
     public void testSourceTasks() {
         replayAll();
 
@@ -87,7 +98,7 @@ public class FileStreamSourceConnectorTest extends EasyMockSupport {
         EasyMock.verify(ctx);
     }
 
-    @Test(expected = ConnectException.class)
+    @Test(expected = ConfigException.class)
     public void testMultipleSourcesInvalid() {
         sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
         connector.start(sourceProperties);
@@ -102,4 +113,23 @@ public class FileStreamSourceConnectorTest extends EasyMockSupport {
 
         EasyMock.verify(ctx);
     }
+
+    @Test(expected = ConfigException.class)
+    public void testMissingTopic() {
+        sourceProperties.remove(FileStreamSourceConnector.TOPIC_CONFIG);
+        connector.start(sourceProperties);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testBlankTopic() {
+        // Because of trimming, this test is the same as testing for an empty string.
+        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, "     ");
+        connector.start(sourceProperties);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidBatchSize() {
+        sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
+        connector.start(sourceProperties);
+    }
 }
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index 3cb7128..c1c0a74 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
@@ -55,6 +54,7 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
         config = new HashMap<>();
         config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
         config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
+        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
         task = new FileStreamSourceTask();
         offsetStorageReader = createMock(OffsetStorageReader.class);
         context = createMock(SourceTaskContext.class);
@@ -151,14 +151,6 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
         task.stop();
     }
 
-    @Test(expected = ConnectException.class)
-    public void testMissingTopic() throws InterruptedException {
-        replay();
-
-        config.remove(FileStreamSourceConnector.TOPIC_CONFIG);
-        task.start(config);
-    }
-
     @Test
     public void testMissingFile() throws InterruptedException {
         replay();

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.