You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/17 02:30:15 UTC
[kafka] branch trunk updated: MINOR: Fix bug introduced by adding
batch.size without default in FileStreamSourceConnector (#4579)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ee352be MINOR: Fix bug introduced by adding batch.size without default in FileStreamSourceConnector (#4579)
ee352be is described below
commit ee352be9c88663b95b1096b7a294e61857857380
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Fri Feb 16 18:30:12 2018 -0800
MINOR: Fix bug introduced by adding batch.size without default in FileStreamSourceConnector (#4579)
https://github.com/apache/kafka/pull/4356 added the `batch.size` config property to `FileStreamSourceConnector`, but the property was defined as required, without a default value, in the config definition (`ConfigDef`). This results in a validation error during connector startup whenever `batch.size` is not set explicitly.
Unit tests were added for both `FileStreamSourceConnector` and `FileStreamSinkConnector` to avoid such issues in the future.
Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../connect/file/FileStreamSinkConnector.java | 4 ++-
.../connect/file/FileStreamSourceConnector.java | 30 +++++++++----------
.../kafka/connect/file/FileStreamSourceTask.java | 13 ++-------
.../connect/file/FileStreamSinkConnectorTest.java | 11 +++++++
.../file/FileStreamSourceConnectorTest.java | 34 ++++++++++++++++++++--
.../connect/file/FileStreamSourceTaskTest.java | 10 +------
6 files changed, 63 insertions(+), 39 deletions(-)
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index 4ae7f4b..136e899 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.file;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -47,7 +48,8 @@ public class FileStreamSinkConnector extends SinkConnector {
@Override
public void start(Map<String, String> props) {
- filename = props.get(FILE_CONFIG);
+ AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
+ filename = parsedConfig.getString(FILE_CONFIG);
}
@Override
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 59006da..74b5f7c 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -16,12 +16,13 @@
*/
package org.apache.kafka.connect.file;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.ArrayList;
@@ -42,12 +43,13 @@ public class FileStreamSourceConnector extends SourceConnector {
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
- .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to")
- .define(TASK_BATCH_SIZE_CONFIG, Type.INT, Importance.LOW, "The maximum number of records the Source task can read from file one time");
+ .define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to publish data to")
+ .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW,
+ "The maximum number of records the Source task can read from file one time");
private String filename;
private String topic;
- private int batchSize = DEFAULT_TASK_BATCH_SIZE;
+ private int batchSize;
@Override
public String version() {
@@ -56,20 +58,14 @@ public class FileStreamSourceConnector extends SourceConnector {
@Override
public void start(Map<String, String> props) {
- filename = props.get(FILE_CONFIG);
- topic = props.get(TOPIC_CONFIG);
- if (topic == null || topic.isEmpty())
- throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting");
- if (topic.contains(","))
- throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source.");
-
- if (props.containsKey(TASK_BATCH_SIZE_CONFIG)) {
- try {
- batchSize = Integer.parseInt(props.get(TASK_BATCH_SIZE_CONFIG));
- } catch (NumberFormatException e) {
- throw new ConnectException("Invalid FileStreamSourceConnector configuration", e);
- }
+ AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
+ filename = parsedConfig.getString(FILE_CONFIG);
+ List<String> topics = parsedConfig.getList(TOPIC_CONFIG);
+ if (topics.size() != 1) {
+ throw new ConfigException("'topic' in FileStreamSourceConnector configuration requires definition of a single topic");
}
+ topic = topics.get(0);
+ batchSize = parsedConfig.getInt(TASK_BATCH_SIZE_CONFIG);
}
@Override
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index 482102f..17037f2 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -68,17 +68,10 @@ public class FileStreamSourceTask extends SourceTask {
streamOffset = null;
reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
}
+ // Missing topic or parsing error is not possible because we've parsed the config in the
+ // Connector
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
- if (topic == null)
- throw new ConnectException("FileStreamSourceTask config missing topic setting");
-
- if (props.containsKey(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG)) {
- try {
- batchSize = Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
- } catch (NumberFormatException e) {
- throw new ConnectException("Invalid FileStreamSourceTask configuration", e);
- }
- }
+ batchSize = Integer.parseInt(props.get(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG));
}
@Override
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
index c06c991..2348454 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.file;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.sink.SinkConnector;
import org.easymock.EasyMockSupport;
@@ -50,6 +51,16 @@ public class FileStreamSinkConnectorTest extends EasyMockSupport {
}
@Test
+ public void testConnectorConfigValidation() {
+ replayAll();
+ List<ConfigValue> configValues = connector.config().validate(sinkProperties);
+ for (ConfigValue val : configValues) {
+ assertEquals("Config property errors: " + val.errorMessages(), 0, val.errorMessages().size());
+ }
+ verifyAll();
+ }
+
+ @Test
public void testSinkTasks() {
replayAll();
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
index 69a94a8..d7efdac 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
@@ -16,8 +16,9 @@
*/
package org.apache.kafka.connect.file;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectorContext;
-import org.apache.kafka.connect.errors.ConnectException;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Before;
@@ -52,6 +53,16 @@ public class FileStreamSourceConnectorTest extends EasyMockSupport {
}
@Test
+ public void testConnectorConfigValidation() {
+ replayAll();
+ List<ConfigValue> configValues = connector.config().validate(sourceProperties);
+ for (ConfigValue val : configValues) {
+ assertEquals("Config property errors: " + val.errorMessages(), 0, val.errorMessages().size());
+ }
+ verifyAll();
+ }
+
+ @Test
public void testSourceTasks() {
replayAll();
@@ -87,7 +98,7 @@ public class FileStreamSourceConnectorTest extends EasyMockSupport {
EasyMock.verify(ctx);
}
- @Test(expected = ConnectException.class)
+ @Test(expected = ConfigException.class)
public void testMultipleSourcesInvalid() {
sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
connector.start(sourceProperties);
@@ -102,4 +113,23 @@ public class FileStreamSourceConnectorTest extends EasyMockSupport {
EasyMock.verify(ctx);
}
+
+ @Test(expected = ConfigException.class)
+ public void testMissingTopic() {
+ sourceProperties.remove(FileStreamSourceConnector.TOPIC_CONFIG);
+ connector.start(sourceProperties);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testBlankTopic() {
+ // Because of trimming, this test is the same as testing for an empty string.
+ sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, " ");
+ connector.start(sourceProperties);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void testInvalidBatchSize() {
+ sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
+ connector.start(sourceProperties);
+ }
}
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index 3cb7128..c1c0a74 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.file;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
@@ -55,6 +54,7 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
config = new HashMap<>();
config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
+ config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
task = new FileStreamSourceTask();
offsetStorageReader = createMock(OffsetStorageReader.class);
context = createMock(SourceTaskContext.class);
@@ -151,14 +151,6 @@ public class FileStreamSourceTaskTest extends EasyMockSupport {
task.stop();
}
- @Test(expected = ConnectException.class)
- public void testMissingTopic() throws InterruptedException {
- replay();
-
- config.remove(FileStreamSourceConnector.TOPIC_CONFIG);
- task.start(config);
- }
-
@Test
public void testMissingFile() throws InterruptedException {
replay();
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.