You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/12/30 20:43:38 UTC
flume git commit: FLUME-2578. Kafka source throws NPE if Kafka record has null key
Repository: flume
Updated Branches:
refs/heads/trunk 84465664c -> 199684b62
FLUME-2578. Kafka source throws NPE if Kafka record has null key
(Gwen Shapira via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/199684b6
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/199684b6
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/199684b6
Branch: refs/heads/trunk
Commit: 199684b62ec983b8f922b1d6d706479032a18e64
Parents: 8446566
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Dec 30 11:42:47 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Dec 30 11:42:47 2014 -0800
----------------------------------------------------------------------
.../apache/flume/source/kafka/KafkaSource.java | 4 +++-
.../flume/source/kafka/TestKafkaSource.java | 25 ++++++++++++++++++--
2 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/199684b6/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 7bc03da..00a81c6 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -102,7 +102,9 @@ public class KafkaSource extends AbstractSource
headers.put(KafkaSourceConstants.TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
headers.put(KafkaSourceConstants.TOPIC, topic);
- headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
+ if (kafkaKey != null) {
+ headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
+ }
if (log.isDebugEnabled()) {
log.debug("Message: {}", new String(kafkaMessage));
}
http://git-wip-us.apache.org/repos/asf/flume/blob/199684b6/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 72eec77..8ec14cc 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -105,8 +105,6 @@ public class TestKafkaSource {
Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
Charsets.UTF_8));
-
-
}
@SuppressWarnings("unchecked")
@@ -301,6 +299,29 @@ public class TestKafkaSource {
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testNullKey() throws EventDeliveryException,
+ SecurityException, NoSuchFieldException, IllegalArgumentException,
+ IllegalAccessException, InterruptedException {
+ context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+ kafkaSource.configure(context);
+ kafkaSource.start();
+
+ Thread.sleep(500L);
+
+ kafkaServer.produce(topicName, null , "hello, world");
+
+ Thread.sleep(500L);
+
+ Assert.assertEquals(Status.READY, kafkaSource.process());
+ Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+ Assert.assertEquals(1, events.size());
+
+ Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+ Charsets.UTF_8));
+ }
+
ChannelProcessor createGoodChannel() {
ChannelProcessor channelProcessor = mock(ChannelProcessor.class);