Posted to commits@flume.apache.org by de...@apache.org on 2017/09/12 19:58:45 UTC

flume git commit: FLUME-3046. Kafka Sink and Source Configuration Improvements

Repository: flume
Updated Branches:
  refs/heads/trunk 4a3f3c76f -> 54e2728a8


FLUME-3046. Kafka Sink and Source Configuration Improvements

This patch fixes the infinite loop that can occur between Kafka source and
Kafka sink by introducing the following configuration parameters:
- topicHeader in Kafka source: the name of the header in which the source
  stores the name of the topic the event was read from.
- setTopicHeader in Kafka source: controls whether the topic name is stored
  in that header.
- topicHeader in Kafka sink: the name of the header that specifies the topic
  to which the event is sent.
- allowTopicOverride in Kafka sink: controls whether the target topic's name
  can be overridden by the specified header.
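
The interaction of the four parameters can be illustrated with a minimal,
dependency-free sketch. It is not the patched Flume code: the class and method
names (TopicHeaderSketch, applySourceTopicHeader, resolveSinkTopic) and the
topic names "in" and "out" are hypothetical, and the sink's
BucketPath.escapeString substitution on the configured topic is left out for
brevity. Only the two conditionals mirror the diff below.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the header logic of KafkaSource and KafkaSink.
final class TopicHeaderSketch {

  // Source side: store the originating topic only if enabled and the header is not already set.
  static void applySourceTopicHeader(Map<String, String> headers, boolean setTopicHeader,
                                     String topicHeader, String kafkaTopic) {
    if (setTopicHeader && !headers.containsKey(topicHeader)) {
      headers.put(topicHeader, kafkaTopic);
    }
  }

  // Sink side: the header may override the configured topic only if allowTopicOverride is true.
  // Assumption: the real sink also expands escape sequences in the configured topic.
  static String resolveSinkTopic(Map<String, String> headers, boolean allowTopicOverride,
                                 String topicHeader, String configuredTopic) {
    if (allowTopicOverride) {
      String fromHeader = headers.get(topicHeader);
      return fromHeader != null ? fromHeader : configuredTopic;
    }
    return configuredTopic;
  }

  public static void main(String[] args) {
    // Defaults on both sides: the sink picks up the source topic, which is the loop.
    Map<String, String> defaults = new HashMap<>();
    applySourceTopicHeader(defaults, true, "topic", "in");
    System.out.println(resolveSinkTopic(defaults, true, "topic", "out")); // prints "in"

    // Option 1: disable the override in the sink.
    System.out.println(resolveSinkTopic(defaults, false, "topic", "out")); // prints "out"

    // Option 2: stop the source from writing the header at all.
    Map<String, String> noHeader = new HashMap<>();
    applySourceTopicHeader(noHeader, false, "topic", "in");
    System.out.println(resolveSinkTopic(noHeader, true, "topic", "out")); // prints "out"
  }
}
```

Using different topicHeader values on the source and the sink is a third way to
break the loop, since the sink then never reads the header the source wrote.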

This closes #105

Reviewers: Attila Simon

(Tristan Stevens via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/54e2728a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/54e2728a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/54e2728a

Branch: refs/heads/trunk
Commit: 54e2728a8e141ee63704018c4497bbe083c0f75f
Parents: 4a3f3c7
Author: Tristan Stevens <tr...@cloudera.com>
Authored: Tue Sep 12 21:42:57 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Tue Sep 12 21:58:05 2017 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  9 ++
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 25 ++++-
 .../flume/sink/kafka/KafkaSinkConstants.java    |  5 +-
 .../apache/flume/sink/kafka/TestKafkaSink.java  | 88 +++++++++++++++++-
 .../apache/flume/source/kafka/KafkaSource.java  | 13 ++-
 .../source/kafka/KafkaSourceConstants.java      |  6 +-
 .../flume/source/kafka/TestKafkaSource.java     | 96 +++++++++++++++++++-
 7 files changed, 230 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index fd64749..8e9efcf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1284,6 +1284,12 @@ useFlumeEventFormat                 false        By default events are taken as
                                                  true to read events as the Flume Avro binary format. Used in conjunction with the same property
                                                  on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
                                                  any Flume headers sent on the producing side.
+setTopicHeader                      true         When set to true, stores the topic of the retrieved message into a header, defined by the
+                                                 ``topicHeader`` property.
+topicHeader                         topic        Defines the name of the header in which to store the name of the topic the message was received
+                                                 from, if the ``setTopicHeader`` property is set to ``true``. Care should be taken if combining
+                                                 with the Kafka Sink ``topicHeader`` property so as to avoid sending the message back to the same
+                                                 topic in a loop.
 migrateZookeeperOffsets             true         When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
                                                  This should be true to support seamless Kafka client migration from older versions of Flume.
                                                  Once migrated this can be set to false, though that should generally not be required.
@@ -2785,6 +2791,9 @@ partitionIdHeader                   --                   When set, the sink will
                                                          from the event header and send the message to the specified partition of the topic. If the
                                                          value represents an invalid partition, an EventDeliveryException will be thrown. If the header value
                                                          is present then this setting overrides ``defaultPartitionId``.
+allowTopicOverride                  true                 When set, the sink will allow a message to be produced into a topic specified by the ``topicHeader`` property (if provided).
+topicHeader                         topic                When set in conjunction with ``allowTopicOverride`` will produce a message into the value of the header named using the value of this property.
+                                                         Care should be taken when using in conjunction with the Kafka Source ``topicHeader`` property to avoid creating a loopback.
 kafka.producer.security.protocol    PLAINTEXT            Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
 *more producer security props*                           If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
                                                          properties that need to be set on producer.

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index f18908b..d60d67e 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -67,7 +67,6 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_HEADER;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
 
@@ -117,6 +116,9 @@ public class KafkaSink extends AbstractSink implements Configurable {
   private boolean useAvroEventFormat;
   private String partitionHeader = null;
   private Integer staticPartitionId = null;
+  private boolean allowTopicOverride;
+  private String topicHeader = null;
+
   private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
           Optional.absent();
   private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
@@ -172,10 +174,19 @@ public class KafkaSink extends AbstractSink implements Configurable {
         byte[] eventBody = event.getBody();
         Map<String, String> headers = event.getHeaders();
 
-        eventTopic = headers.get(TOPIC_HEADER);
-        if (eventTopic == null) {
-          eventTopic = BucketPath.escapeString(topic, event.getHeaders());
+        if (allowTopicOverride) {
+          eventTopic = headers.get(topicHeader);
+          if (eventTopic == null) {
+            eventTopic = BucketPath.escapeString(topic, event.getHeaders());
+            logger.debug("{} was set to true but header {} was null. Producing to {}" + 
+                " topic instead.",
+                new Object[]{KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, 
+                    topicHeader, eventTopic});
+          }
+        } else {
+          eventTopic = topic;
         }
+
         eventKey = headers.get(KEY_HEADER);
         if (logger.isTraceEnabled()) {
           if (LogPrivacyUtil.allowLogRawData()) {
@@ -317,6 +328,12 @@ public class KafkaSink extends AbstractSink implements Configurable {
     partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME);
     staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF);
 
+    allowTopicOverride = context.getBoolean(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER,
+                                          KafkaSinkConstants.DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER);
+
+    topicHeader = context.getString(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER,
+                                    KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER);
+
     if (logger.isDebugEnabled()) {
       logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 7c819f5..ffca3df 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -33,7 +33,10 @@ public class KafkaSinkConstants {
       KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
   public static final String KEY_HEADER = "key";
-  public static final String TOPIC_HEADER = "topic";
+  public static final String DEFAULT_TOPIC_OVERRIDE_HEADER = "topic";
+  public static final String TOPIC_OVERRIDE_HEADER = "topicHeader";
+  public static final String ALLOW_TOPIC_OVERRIDE_HEADER = "allowTopicOverride";
+  public static final boolean DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER = true;
 
   public static final String AVRO_EVENT = "useFlumeEventFormat";
   public static final boolean DEFAULT_AVRO_EVENT = false;

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 975661d..d92c71f 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -240,6 +240,92 @@ public class TestKafkaSink {
                  new String((byte[]) fetchedMsg.key(), "UTF-8"));
   }
 
+  /**
+   * Tests that a message will be produced to a topic as specified by a
+   * custom topicHeader parameter (FLUME-3046).
+   * @throws UnsupportedEncodingException
+   */
+  @Test
+  public void testTopicFromConfHeader() throws UnsupportedEncodingException {
+    String customTopicHeader = "customTopicHeader";
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, customTopicHeader);
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "test-topic-from-config-header";
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put(customTopicHeader, TestConstants.CUSTOM_TOPIC);
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    MessageAndMetadata<?, ?> fetchedMsg =
+        testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+
+    assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+  }
+
+  /**
+   * Tests that the topicHeader parameter will be ignored if the allowTopicOverride
+   * parameter is set to false (FLUME-3046).
+   * @throws UnsupportedEncodingException
+   */
+  @Test
+  public void testTopicNotFromConfHeader() throws UnsupportedEncodingException {
+    Sink kafkaSink = new KafkaSink();
+    Context context = prepareDefaultContext();
+    context.put(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, "false");
+    context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, "foo");
+
+    Configurables.configure(kafkaSink, context);
+    Channel memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+    kafkaSink.setChannel(memoryChannel);
+    kafkaSink.start();
+
+    String msg = "test-topic-from-config-header";
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put(KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER, TestConstants.CUSTOM_TOPIC);
+    headers.put("foo", TestConstants.CUSTOM_TOPIC);
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event event = EventBuilder.withBody(msg.getBytes(), headers);
+    memoryChannel.put(event);
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = kafkaSink.process();
+      if (status == Sink.Status.BACKOFF) {
+        fail("Error Occurred");
+      }
+    } catch (EventDeliveryException ex) {
+      // ignore
+    }
+
+    MessageAndMetadata<?, ?> fetchedMsg =
+        testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC);
+
+    assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+  }
+
   @Test
   public void testReplaceSubStringOfTopicWithHeaders() throws UnsupportedEncodingException {
     Sink kafkaSink = new KafkaSink();
@@ -612,4 +698,4 @@ public class TestKafkaSink {
     return newTopic;
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index d381850..ffdc96e 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -127,6 +127,8 @@ public class KafkaSource extends AbstractPollableSource
   private String bootstrapServers;
   private String groupId = DEFAULT_GROUP_ID;
   private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
+  private String topicHeader = null;
+  private boolean setTopicHeader;
 
   /**
    * This class is a helper to subscribe for topics by using
@@ -250,8 +252,9 @@ public class KafkaSource extends AbstractPollableSource
           headers.put(KafkaSourceConstants.TIMESTAMP_HEADER,
               String.valueOf(System.currentTimeMillis()));
         }
-        if (!headers.containsKey(KafkaSourceConstants.TOPIC_HEADER)) {
-          headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
+        // Only set the topic header if setTopicHeader and it isn't already populated
+        if (setTopicHeader && !headers.containsKey(topicHeader)) {
+          headers.put(topicHeader, message.topic());
         }
         if (!headers.containsKey(KafkaSourceConstants.PARTITION_HEADER)) {
           headers.put(KafkaSourceConstants.PARTITION_HEADER,
@@ -400,6 +403,12 @@ public class KafkaSource extends AbstractPollableSource
       log.info("Group ID was not specified. Using {} as the group id.", groupId);
     }
 
+    setTopicHeader = context.getBoolean(KafkaSourceConstants.SET_TOPIC_HEADER,
+                                        KafkaSourceConstants.DEFAULT_SET_TOPIC_HEADER);
+
+    topicHeader = context.getString(KafkaSourceConstants.TOPIC_HEADER,
+                                    KafkaSourceConstants.DEFAULT_TOPIC_HEADER);
+
     setConsumerProps(context);
 
     if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index bf1a19d..474a143 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -49,9 +49,13 @@ public class KafkaSourceConstants {
   public static final String OLD_GROUP_ID = "groupId";
 
   // flume event headers
-  public static final String TOPIC_HEADER = "topic";
+  public static final String DEFAULT_TOPIC_HEADER = "topic";
   public static final String KEY_HEADER = "key";
   public static final String TIMESTAMP_HEADER = "timestamp";
   public static final String PARTITION_HEADER = "partition";
 
+  public static final String SET_TOPIC_HEADER = "setTopicHeader";
+  public static final boolean DEFAULT_SET_TOPIC_HEADER = true;
+  public static final String TOPIC_HEADER = "topicHeader";
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index d1daceb..7804fa2 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -76,7 +76,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADE
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -556,7 +556,7 @@ public class TestKafkaSource {
 
     headers.put(TIMESTAMP_HEADER, currentTimestamp);
     headers.put(PARTITION_HEADER, "1");
-    headers.put(TOPIC_HEADER, "topic0");
+    headers.put(DEFAULT_TOPIC_HEADER, "topic0");
 
     e = new AvroFlumeEvent(headers, ByteBuffer.wrap("hello, world2".getBytes()));
     tempOutStream.reset();
@@ -590,7 +590,7 @@ public class TestKafkaSource {
     Assert.assertEquals("value2", e.getHeaders().get("header2"));
     Assert.assertEquals(currentTimestamp, e.getHeaders().get(TIMESTAMP_HEADER));
     Assert.assertEquals(e.getHeaders().get(PARTITION_HEADER), "1");
-    Assert.assertEquals(e.getHeaders().get(TOPIC_HEADER),"topic0");
+    Assert.assertEquals(e.getHeaders().get(DEFAULT_TOPIC_HEADER),"topic0");
 
   }
 
@@ -655,6 +655,96 @@ public class TestKafkaSource {
     Assert.assertNull(kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
   }
 
+  /**
+   * Tests the availability of the topic header in the output events,
+   * based on the configuration parameters added in FLUME-3046
+   * @throws InterruptedException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testTopicHeaderSet() throws InterruptedException, EventDeliveryException {
+    context.put(TOPICS, topic0);
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topic0, "", "hello, world");
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+    Assert.assertEquals(topic0, events.get(0).getHeaders().get("topic"));
+
+    kafkaSource.stop();
+    events.clear();
+  }
+
+  /**
+   * Tests the availability of the custom topic header in the output events,
+   * based on the configuration parameters added in FLUME-3046
+   * @throws InterruptedException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testTopicCustomHeaderSet() throws InterruptedException, EventDeliveryException {
+    context.put(TOPICS, topic0);
+    context.put(KafkaSourceConstants.TOPIC_HEADER, "customTopicHeader");
+    kafkaSource.configure(context);
+
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topic0, "", "hello, world2");
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world2", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+    Assert.assertEquals(topic0, events.get(0).getHeaders().get("customTopicHeader"));
+
+    kafkaSource.stop();
+    events.clear();
+  }
+
+  /**
+   * Tests the unavailability of the topic header in the output events,
+   * based on the configuration parameters added in FLUME-3046
+   * @throws InterruptedException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testTopicCustomHeaderNotSet() throws InterruptedException, EventDeliveryException {
+    context.put(TOPICS, topic0);
+    context.put(KafkaSourceConstants.SET_TOPIC_HEADER, "false");
+    kafkaSource.configure(context);
+
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topic0, "", "hello, world3");
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world3", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+    Assert.assertNull(events.get(0).getHeaders().get("customTopicHeader"));
+
+    kafkaSource.stop();
+  }
+
   public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
                                             String group) throws Exception {
     // create a topic with 1 partition for simplicity