You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/12/30 20:43:38 UTC

flume git commit: FLUME-2578. Kafka source throws NPE if Kafka record has null key

Repository: flume
Updated Branches:
  refs/heads/trunk 84465664c -> 199684b62


FLUME-2578. Kafka source throws NPE if Kafka record has null key

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/199684b6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/199684b6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/199684b6

Branch: refs/heads/trunk
Commit: 199684b62ec983b8f922b1d6d706479032a18e64
Parents: 8446566
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Dec 30 11:42:47 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Dec 30 11:42:47 2014 -0800

----------------------------------------------------------------------
 .../apache/flume/source/kafka/KafkaSource.java  |  4 +++-
 .../flume/source/kafka/TestKafkaSource.java     | 25 ++++++++++++++++++--
 2 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/199684b6/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 7bc03da..00a81c6 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -102,7 +102,9 @@ public class KafkaSource extends AbstractSource
           headers.put(KafkaSourceConstants.TIMESTAMP,
                   String.valueOf(System.currentTimeMillis()));
           headers.put(KafkaSourceConstants.TOPIC, topic);
-          headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
+          if (kafkaKey != null) {
+            headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
+          }
           if (log.isDebugEnabled()) {
             log.debug("Message: {}", new String(kafkaMessage));
           }

http://git-wip-us.apache.org/repos/asf/flume/blob/199684b6/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 72eec77..8ec14cc 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -105,8 +105,6 @@ public class TestKafkaSource {
 
     Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
             Charsets.UTF_8));
-
-
   }
 
   @SuppressWarnings("unchecked")
@@ -301,6 +299,29 @@ public class TestKafkaSource {
 
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNullKey() throws EventDeliveryException,
+      SecurityException, NoSuchFieldException, IllegalArgumentException,
+      IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, null , "hello, world");
+
+    Thread.sleep(500L);
+
+    Assert.assertEquals(Status.READY, kafkaSource.process());
+    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+    Assert.assertEquals(1, events.size());
+
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+        Charsets.UTF_8));
+  }
+
   ChannelProcessor createGoodChannel() {
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);