You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/10/09 03:30:02 UTC
flume git commit: FLUME-2781. Kafka Channel with
parseAsFlumeEvent=false should write data as is, not as flume events.
Repository: flume
Updated Branches:
refs/heads/flume-1.7 7dbe6fef6 -> faad35801
FLUME-2781. Kafka Channel with parseAsFlumeEvent=false should write data as is, not as flume events.
(Gonzalo Herreros via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/faad3580
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/faad3580
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/faad3580
Branch: refs/heads/flume-1.7
Commit: faad35801b24b9f0ca34d8b86f28dded468d73b8
Parents: 7dbe6fe
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Oct 8 18:28:31 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Oct 8 18:29:47 2015 -0700
----------------------------------------------------------------------
.../flume/channel/kafka/KafkaChannel.java | 33 +++++++++--------
.../flume/channel/kafka/TestKafkaChannel.java | 37 ++++++++++++++++++++
2 files changed, 56 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/faad3580/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index c83d4f6..c0c1c66 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -261,21 +261,26 @@ public class KafkaChannel extends BasicChannelSemantics {
}
try {
- if (!tempOutStream.isPresent()) {
- tempOutStream = Optional.of(new ByteArrayOutputStream());
- }
- if (!writer.isPresent()) {
- writer = Optional.of(new
- SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+ if (parseAsFlumeEvent) {
+ if (!tempOutStream.isPresent()) {
+ tempOutStream = Optional.of(new ByteArrayOutputStream());
+ }
+ if (!writer.isPresent()) {
+ writer = Optional.of(new
+ SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+ }
+ tempOutStream.get().reset();
+ AvroFlumeEvent e = new AvroFlumeEvent(
+ toCharSeqMap(event.getHeaders()),
+ ByteBuffer.wrap(event.getBody()));
+ encoder = EncoderFactory.get()
+ .directBinaryEncoder(tempOutStream.get(), encoder);
+ writer.get().write(e, encoder);
+ // Not really possible to avoid this copy :(
+ serializedEvents.get().add(tempOutStream.get().toByteArray());
+ } else {
+ serializedEvents.get().add(event.getBody());
}
- tempOutStream.get().reset();
- AvroFlumeEvent e = new AvroFlumeEvent(
- toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
- encoder = EncoderFactory.get()
- .directBinaryEncoder(tempOutStream.get(), encoder);
- writer.get().write(e, encoder);
- // Not really possible to avoid this copy :(
- serializedEvents.get().add(tempOutStream.get().toByteArray());
} catch (Exception e) {
throw new ChannelException("Error while serializing event", e);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/faad3580/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 25b9e40..319e779 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -168,6 +168,43 @@ public class TestKafkaChannel {
}
/**
+ * Like the previous test but here we write to the channel like a Flume source would do
+ * to verify that the events are written as text and not as an Avro object
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWritingToNoParsingAsFlumeAgent() throws Exception {
+ final KafkaChannel channel = startChannel(false);
+
+ List<String> msgs = new ArrayList<String>();
+ for (int i = 0; i < 50; i++){
+ msgs.add(String.valueOf(i));
+ }
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for (int i = 0; i < msgs.size(); i++){
+ channel.put(EventBuilder.withBody(msgs.get(i).getBytes()));
+ }
+ tx.commit();
+ ExecutorCompletionService<Void> submitterSvc = new
+ ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+ List<Event> events = pullEvents(channel, submitterSvc,
+ 50, false, false);
+ wait(submitterSvc, 5);
+ Set<Integer> finals = Sets.newHashSet();
+ for (int i = 0; i < 50; i++) {
+ finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+ }
+ for (int i = 0; i < 50; i++) {
+ Assert.assertTrue(finals.contains(i));
+ finals.remove(i);
+ }
+ Assert.assertTrue(finals.isEmpty());
+ channel.stop();
+ }
+
+ /**
* This method starts a channel, puts events into it. The channel is then
* stopped and restarted. Then we check to make sure if all events we put
* come out. Optionally, 10 events are rolled back,