You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/10/09 03:30:02 UTC

flume git commit: FLUME-2781. Kafka Channel with parseAsFlumeEvent=false should write data as is, not as flume events.

Repository: flume
Updated Branches:
  refs/heads/flume-1.7 7dbe6fef6 -> faad35801


FLUME-2781. Kafka Channel with parseAsFlumeEvent=false should write data as is, not as flume events.

(Gonzalo Herreros via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/faad3580
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/faad3580
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/faad3580

Branch: refs/heads/flume-1.7
Commit: faad35801b24b9f0ca34d8b86f28dded468d73b8
Parents: 7dbe6fe
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Oct 8 18:28:31 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Oct 8 18:29:47 2015 -0700

----------------------------------------------------------------------
 .../flume/channel/kafka/KafkaChannel.java       | 33 +++++++++--------
 .../flume/channel/kafka/TestKafkaChannel.java   | 37 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/faad3580/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index c83d4f6..c0c1c66 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -261,21 +261,26 @@ public class KafkaChannel extends BasicChannelSemantics {
       }
 
       try {
-        if (!tempOutStream.isPresent()) {
-          tempOutStream = Optional.of(new ByteArrayOutputStream());
-        }
-        if (!writer.isPresent()) {
-          writer = Optional.of(new
-            SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+        if (parseAsFlumeEvent) {
+          if (!tempOutStream.isPresent()) {
+            tempOutStream = Optional.of(new ByteArrayOutputStream());
+          }
+          if (!writer.isPresent()) {
+            writer = Optional.of(new
+              SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+          }
+          tempOutStream.get().reset();
+          AvroFlumeEvent e = new AvroFlumeEvent(
+            toCharSeqMap(event.getHeaders()),
+            ByteBuffer.wrap(event.getBody()));
+          encoder = EncoderFactory.get()
+            .directBinaryEncoder(tempOutStream.get(), encoder);
+          writer.get().write(e, encoder);
+          // Not really possible to avoid this copy :(
+          serializedEvents.get().add(tempOutStream.get().toByteArray());
+        } else {
+          serializedEvents.get().add(event.getBody());
         }
-        tempOutStream.get().reset();
-        AvroFlumeEvent e = new AvroFlumeEvent(
-          toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
-        encoder = EncoderFactory.get()
-          .directBinaryEncoder(tempOutStream.get(), encoder);
-        writer.get().write(e, encoder);
-        // Not really possible to avoid this copy :(
-        serializedEvents.get().add(tempOutStream.get().toByteArray());
       } catch (Exception e) {
         throw new ChannelException("Error while serializing event", e);
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/faad3580/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 25b9e40..319e779 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -168,6 +168,43 @@ public class TestKafkaChannel {
   }
 
   /**
+   * Like the previous test, but here we write to the channel the way a Flume source would,
+   * to verify that event bodies are written as is and not wrapped in an Avro object.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testWritingToNoParsingAsFlumeAgent() throws Exception {
+    final KafkaChannel channel = startChannel(false);
+
+    List<String> msgs = new ArrayList<String>();
+    for (int i = 0; i < 50; i++){
+      msgs.add(String.valueOf(i));
+    }
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (int i = 0; i < msgs.size(); i++){
+      channel.put(EventBuilder.withBody(msgs.get(i).getBytes()));
+    }
+    tx.commit();
+    ExecutorCompletionService<Void> submitterSvc = new
+            ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+    List<Event> events = pullEvents(channel, submitterSvc,
+      50, false, false);
+    wait(submitterSvc, 5);
+    Set<Integer> finals = Sets.newHashSet();
+    for (int i = 0; i < 50; i++) {
+      finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+    }
+    for (int i = 0; i < 50; i++) {
+      Assert.assertTrue(finals.contains(i));
+      finals.remove(i);
+    }
+    Assert.assertTrue(finals.isEmpty());
+    channel.stop();
+  }
+
+  /**
    * This method starts a channel, puts events into it. The channel is then
    * stopped and restarted. Then we check to make sure if all events we put
    * come out. Optionally, 10 events are rolled back,