You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2016/06/10 13:35:02 UTC
flume git commit: FLUME-2915: The kafka channel using new APIs will be stuck when the sink is avro sink
Repository: flume
Updated Branches:
refs/heads/trunk f4f15bee1 -> a619cc157
FLUME-2915: The kafka channel using new APIs will be stuck when the sink is avro sink
(Jeff Holoman via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a619cc15
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a619cc15
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a619cc15
Branch: refs/heads/trunk
Commit: a619cc15748d76357f84bd3d0fa0a3e496ee61a1
Parents: f4f15be
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Jun 10 15:34:24 2016 +0200
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Jun 10 15:34:24 2016 +0200
----------------------------------------------------------------------
.../flume/channel/kafka/KafkaChannel.java | 4 ++-
.../flume/channel/kafka/TestKafkaChannel.java | 29 ++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/a619cc15/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 2d9b0c6..09d3f9d 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -381,7 +381,9 @@ public class KafkaChannel extends BasicChannelSemantics {
}
//Add the key to the header
- e.getHeaders().put(KEY_HEADER, record.key());
+ if (record.key() != null) {
+ e.getHeaders().put(KEY_HEADER, record.key());
+ }
if (logger.isDebugEnabled()) {
logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset());
http://git-wip-us.apache.org/repos/asf/flume/blob/a619cc15/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 637428d..13e073b 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -186,6 +186,11 @@ public class TestKafkaChannel {
doParseAsFlumeEventFalseAsSource(true);
}
+ @Test
+ public void testNullKeyNoHeader() throws Exception {
+ doTestNullKeyNoHeader();
+ }
+
private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
final KafkaChannel channel = startChannel(false);
Properties props = channel.getProducerProps();
@@ -215,6 +220,30 @@ public class TestKafkaChannel {
channel.stop();
}
+ private void doTestNullKeyNoHeader() throws Exception {
+ final KafkaChannel channel = startChannel(false);
+ Properties props = channel.getProducerProps();
+ KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
+
+ for (int i = 0; i < 50; i++) {
+ ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
+ producer.send(data).get();
+ }
+ ExecutorCompletionService<Void> submitterSvc = new
+ ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+ List<Event> events = pullEvents(channel, submitterSvc,
+ 50, false, false);
+ wait(submitterSvc, 5);
+ List<String> finals = new ArrayList<String>(50);
+ for (int i = 0; i < 50; i++) {
+ finals.add(i, events.get(i).getHeaders().get(KEY_HEADER));
+ }
+ for (int i = 0; i < 50; i++) {
+ Assert.assertTrue( finals.get(i) == null);
+ }
+ channel.stop();
+ }
+
/**
* Like the previous test but here we write to the channel like a Flume source would do
* to verify that the events are written as text and not as an Avro object