You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/19 21:29:57 UTC

kafka git commit: KAFKA-4312: If filePath is empty string writeAsText should have more meaningful error message

Repository: kafka
Updated Branches:
  refs/heads/trunk c2a8b8611 -> b51002c57


KAFKA-4312: If filePath is empty string writeAsText should have more meaningful error message

\u2026eaningful error message

Author: bbejeck <bb...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2042 from bbejeck/KAFKA-4312_write_as_text_throws_NPE_empty_string


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b51002c5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b51002c5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b51002c5

Branch: refs/heads/trunk
Commit: b51002c576ea9758132d75a8a0fe454e1bc270a2
Parents: c2a8b86
Author: Bill Bejeck <bb...@gmail.com>
Authored: Wed Oct 19 14:29:53 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 19 14:29:53 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/internals/KStreamImpl.java    | 3 +++
 .../org/apache/kafka/streams/kstream/internals/KTableImpl.java | 4 ++++
 .../kafka/streams/kstream/internals/KStreamImplTest.java       | 6 ++++++
 .../apache/kafka/streams/kstream/internals/KTableImplTest.java | 6 ++++++
 4 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b6c3401..bb77e96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -207,6 +207,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     @Override
     public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
         Objects.requireNonNull(filePath, "filePath can't be null");
+        if (filePath.trim().isEmpty()) {
+            throw new TopologyBuilderException("filePath can't be an empty string");
+        }
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c53e761..fc1c076 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -189,6 +189,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
      */
     @Override
     public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
+        Objects.requireNonNull(filePath, "filePath can't be null");
+        if (filePath.trim().isEmpty()) {
+            throw new TopologyBuilderException("filePath can't be an empty string");
+        }
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index fb2afec..e5e334c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -183,6 +184,11 @@ public class KStreamImplTest {
         testStream.writeAsText(null);
     }
 
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
+        testStream.writeAsText("\t    \t");
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnFlatMap() throws Exception {
         testStream.flatMap(null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b51002c5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 4b9ea06..afa1033 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -402,6 +403,11 @@ public class KTableImplTest {
         table.writeAsText(null);
     }
 
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
+        table.writeAsText("\t  \t");
+    }
+
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullActionOnForEach() throws Exception {
         table.foreach(null);