You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/06/13 10:06:39 UTC

flume git commit: FLUME-3100. Support arbitrary header substitution for topic of Kafka Sink

Repository: flume
Updated Branches:
  refs/heads/trunk fdc53f338 -> 83e25691d


FLUME-3100. Support arbitrary header substitution for topic of Kafka Sink

This patch adds the ability to use header substitution in the Kafka Sink's
kafka.topic configuration variable.

This closes #137.

Reviewers: Denes Arvay

(Takafumi Saito via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/83e25691
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/83e25691
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/83e25691

Branch: refs/heads/trunk
Commit: 83e25691dc5f32d020b122d679b6f124162e4aef
Parents: fdc53f3
Author: stakafum <s....@gmail.com>
Authored: Thu May 25 15:23:44 2017 +0900
Committer: Denes Arvay <de...@cloudera.com>
Committed: Tue Jun 13 12:01:27 2017 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  2 ++
 .../org/apache/flume/sink/kafka/KafkaSink.java  |  3 +-
 .../apache/flume/sink/kafka/TestConstants.java  |  1 +
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 37 ++++++++++++++++++++
 4 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a5d64f0..2cd5465 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2727,6 +2727,8 @@ kafka.topic                         default-flume-topic  The topic in Kafka to w
                                                          messages will be published to this topic.
                                                          If the event header contains a "topic" field, the event will be published to that topic
                                                          overriding the topic configured here.
+                                                         Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named "header".
+                                                         (If using the substitution, it is recommended to set "auto.create.topics.enable" property of Kafka broker to true.)
 flumeBatchSize                      100                  How many messages to process in one batch. Larger batches improve throughput while adding latency.
 kafka.producer.acks                 1                    How many replicas must acknowledge a message before its considered successfully written.
                                                          Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas)

http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index 68866c3..f18908b 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -32,6 +32,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.conf.LogPrivacyUtil;
+import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.flume.source.avro.AvroFlumeEvent;
@@ -173,7 +174,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
 
         eventTopic = headers.get(TOPIC_HEADER);
         if (eventTopic == null) {
-          eventTopic = topic;
+          eventTopic = BucketPath.escapeString(topic, event.getHeaders());
         }
         eventKey = headers.get(KEY_HEADER);
         if (logger.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
index 6d85700..8d6dce7 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java
@@ -20,6 +20,7 @@ package org.apache.flume.sink.kafka;
 
 public class TestConstants {
   public static final String STATIC_TOPIC = "static-topic";
+  public static final String HEADER_TOPIC = "%{header1}-topic";
   public static final String CUSTOM_KEY = "custom-key";
   public static final String CUSTOM_TOPIC = "custom-topic";
   public static final String HEADER_1_VALUE = "test-avro-header";

http://git-wip-us.apache.org/repos/asf/flume/blob/83e25691/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 7c66420..975661d 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -91,6 +91,7 @@ public class TestKafkaSink {
     topics.add(DEFAULT_TOPIC);
     topics.add(TestConstants.STATIC_TOPIC);
     topics.add(TestConstants.CUSTOM_TOPIC);
+    topics.add(TestConstants.HEADER_1_VALUE + "-topic");
     testUtil.initTopicList(topics);
   }
 
@@ -239,6 +240,42 @@ public class TestKafkaSink {
                  new String((byte[]) fetchedMsg.key(), "UTF-8"));
   }
 
+  @Test
+  public void testReplaceSubStringOfTopicWithHeaders() throws UnsupportedEncodingException {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "test-replace-substring-of-topic-with-headers";
+    Map<String, String> headers = new HashMap<>();
+    headers.put(TestConstants.HEADER_1_KEY, TestConstants.HEADER_1_VALUE);
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    String fetchedMsg = new String((byte[])
+        testUtil.getNextMessageFromConsumer(TestConstants.HEADER_1_VALUE + "-topic").message());
+
+    assertEquals(msg, fetchedMsg);
+  }
+
   @SuppressWarnings("rawtypes")
   @Test
   public void testAvroEvent() throws IOException {