You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2016/06/10 13:35:02 UTC

flume git commit: FLUME-2915: The kafka channel using new APIs will be stuck when the sink is avro sink

Repository: flume
Updated Branches:
  refs/heads/trunk f4f15bee1 -> a619cc157


FLUME-2915: The kafka channel using new APIs will be stuck when the sink is avro sink

(Jeff Holoman via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a619cc15
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a619cc15
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a619cc15

Branch: refs/heads/trunk
Commit: a619cc15748d76357f84bd3d0fa0a3e496ee61a1
Parents: f4f15be
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Jun 10 15:34:24 2016 +0200
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Jun 10 15:34:24 2016 +0200

----------------------------------------------------------------------
 .../flume/channel/kafka/KafkaChannel.java       |  4 ++-
 .../flume/channel/kafka/TestKafkaChannel.java   | 29 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a619cc15/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 2d9b0c6..09d3f9d 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -381,7 +381,9 @@ public class KafkaChannel extends BasicChannelSemantics {
             }
 
             //Add the key to the header
-            e.getHeaders().put(KEY_HEADER, record.key());
+            if (record.key() != null) {
+              e.getHeaders().put(KEY_HEADER, record.key());
+            }
 
             if (logger.isDebugEnabled()) {
               logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset());

http://git-wip-us.apache.org/repos/asf/flume/blob/a619cc15/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 637428d..13e073b 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -186,6 +186,11 @@ public class TestKafkaChannel {
     doParseAsFlumeEventFalseAsSource(true);
   }
 
+  @Test
+  public void testNullKeyNoHeader() throws Exception {
+    doTestNullKeyNoHeader();
+  }
+
   private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
     final KafkaChannel channel = startChannel(false);
     Properties props = channel.getProducerProps();
@@ -215,6 +220,30 @@ public class TestKafkaChannel {
     channel.stop();
   }
 
+  private void doTestNullKeyNoHeader() throws Exception {
+    final KafkaChannel channel = startChannel(false);
+    Properties props = channel.getProducerProps();
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
+
+    for (int i = 0; i < 50; i++) {
+      ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
+      producer.send(data).get();
+    }
+    ExecutorCompletionService<Void> submitterSvc = new
+            ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+    List<Event> events = pullEvents(channel, submitterSvc,
+            50, false, false);
+    wait(submitterSvc, 5);
+    List<String> finals = new ArrayList<String>(50);
+    for (int i = 0; i < 50; i++) {
+      finals.add(i, events.get(i).getHeaders().get(KEY_HEADER));
+    }
+    for (int i = 0; i < 50; i++) {
+      Assert.assertTrue( finals.get(i) == null);
+    }
+    channel.stop();
+  }
+
   /**
    * Like the previous test but here we write to the channel like a Flume source would do
    * to verify that the events are written as text and not as an Avro object