You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/19 21:29:57 UTC
kafka git commit: KAFKA-4312: If filePath is empty string writeAsText
should have more meaningful error message
Repository: kafka
Updated Branches:
refs/heads/trunk c2a8b8611 -> b51002c57
KAFKA-4312: If filePath is empty string writeAsText should have more meaningful error message
…eaningful error message
Author: bbejeck <bb...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2042 from bbejeck/KAFKA-4312_write_as_text_throws_NPE_empty_string
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b51002c5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b51002c5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b51002c5
Branch: refs/heads/trunk
Commit: b51002c576ea9758132d75a8a0fe454e1bc270a2
Parents: c2a8b86
Author: Bill Bejeck <bb...@gmail.com>
Authored: Wed Oct 19 14:29:53 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 19 14:29:53 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/internals/KStreamImpl.java | 3 +++
.../org/apache/kafka/streams/kstream/internals/KTableImpl.java | 4 ++++
.../kafka/streams/kstream/internals/KStreamImplTest.java | 6 ++++++
.../apache/kafka/streams/kstream/internals/KTableImplTest.java | 6 ++++++
4 files changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b6c3401..bb77e96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -207,6 +207,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
Objects.requireNonNull(filePath, "filePath can't be null");
+ if (filePath.trim().isEmpty()) {
+ throw new TopologyBuilderException("filePath can't be an empty string");
+ }
String name = topology.newName(PRINTING_NAME);
streamName = (streamName == null) ? this.name : streamName;
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c53e761..fc1c076 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -189,6 +189,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
*/
@Override
public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
+ Objects.requireNonNull(filePath, "filePath can't be null");
+ if (filePath.trim().isEmpty()) {
+ throw new TopologyBuilderException("filePath can't be an empty string");
+ }
String name = topology.newName(PRINTING_NAME);
streamName = (streamName == null) ? this.name : streamName;
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index fb2afec..e5e334c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -183,6 +184,11 @@ public class KStreamImplTest {
testStream.writeAsText(null);
}
+ @Test(expected = TopologyBuilderException.class)
+ public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
+ testStream.writeAsText("\t \t");
+ }
+
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnFlatMap() throws Exception {
testStream.flatMap(null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 4b9ea06..afa1033 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
@@ -402,6 +403,11 @@ public class KTableImplTest {
table.writeAsText(null);
}
+ @Test(expected = TopologyBuilderException.class)
+ public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
+ table.writeAsText("\t \t");
+ }
+
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullActionOnForEach() throws Exception {
table.foreach(null);