You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/09/12 19:58:45 UTC
flume git commit: FLUME-3046. Kafka Sink and Source Configuration
Improvements
Repository: flume
Updated Branches:
refs/heads/trunk 4a3f3c76f -> 54e2728a8
FLUME-3046. Kafka Sink and Source Configuration Improvements
This patch fixes the infinite loop between Kafka source and Kafka sink
by introducing the following configuration parameters in those components:
- topicHeader in Kafka source to specify the name of the header where it
stores the topic name where the event comes from.
- setTopicHeader in Kafka source to control whether the topic name is stored
in the given header.
- topicHeader in Kafka sink to configure the name of the header which
is used to specify in which topic to send the event.
- allowTopicOverride in Kafka sink to control whether the target topic's name
can be overridden by the specified header.
This closes #105
Reviewers: Attila Simon
(Tristan Stevens via Denes Arvay)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/54e2728a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/54e2728a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/54e2728a
Branch: refs/heads/trunk
Commit: 54e2728a8e141ee63704018c4497bbe083c0f75f
Parents: 4a3f3c7
Author: Tristan Stevens <tr...@cloudera.com>
Authored: Tue Sep 12 21:42:57 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Tue Sep 12 21:58:05 2017 +0200
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 9 ++
.../org/apache/flume/sink/kafka/KafkaSink.java | 25 ++++-
.../flume/sink/kafka/KafkaSinkConstants.java | 5 +-
.../apache/flume/sink/kafka/TestKafkaSink.java | 88 +++++++++++++++++-
.../apache/flume/source/kafka/KafkaSource.java | 13 ++-
.../source/kafka/KafkaSourceConstants.java | 6 +-
.../flume/source/kafka/TestKafkaSource.java | 96 +++++++++++++++++++-
7 files changed, 230 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index fd64749..8e9efcf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1284,6 +1284,12 @@ useFlumeEventFormat false By default events are taken as
true to read events as the Flume Avro binary format. Used in conjunction with the same property
on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
any Flume headers sent on the producing side.
+setTopicHeader true When set to true, stores the topic of the retrieved message into a header, defined by the
+ ``topicHeader`` property.
+topicHeader topic Defines the name of the header in which to store the name of the topic the message was received
+ from, if the ``setTopicHeader`` property is set to ``true``. Care should be taken if combining
+ with the Kafka Sink ``topicHeader`` property so as to avoid sending the message back to the same
+ topic in a loop.
migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
This should be true to support seamless Kafka client migration from older versions of Flume.
Once migrated this can be set to false, though that should generally not be required.
@@ -2785,6 +2791,9 @@ partitionIdHeader -- When set, the sink will
from the event header and send the message to the specified partition of the topic. If the
value represents an invalid partition, an EventDeliveryException will be thrown. If the header value
is present then this setting overrides ``defaultPartitionId``.
+allowTopicOverride true When set, the sink will allow a message to be produced into a topic specified by the ``topicHeader`` property (if provided).
+topicHeader topic When set in conjunction with ``allowTopicOverride`` will produce a message into the value of the header named using the value of this property.
+ Care should be taken when using in conjunction with the Kafka Source ``topicHeader`` property to avoid creating a loopback.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
*more producer security props* If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
properties that need to be set on producer.
http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index f18908b..d60d67e 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -67,7 +67,6 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_HEADER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_HEADER;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.KEY_SERIALIZER_KEY;
import static org.apache.flume.sink.kafka.KafkaSinkConstants.MESSAGE_SERIALIZER_KEY;
@@ -117,6 +116,9 @@ public class KafkaSink extends AbstractSink implements Configurable {
private boolean useAvroEventFormat;
private String partitionHeader = null;
private Integer staticPartitionId = null;
+ private boolean allowTopicOverride;
+ private String topicHeader = null;
+
private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
Optional.absent();
private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
@@ -172,10 +174,19 @@ public class KafkaSink extends AbstractSink implements Configurable {
byte[] eventBody = event.getBody();
Map<String, String> headers = event.getHeaders();
- eventTopic = headers.get(TOPIC_HEADER);
- if (eventTopic == null) {
- eventTopic = BucketPath.escapeString(topic, event.getHeaders());
+ if (allowTopicOverride) {
+ eventTopic = headers.get(topicHeader);
+ if (eventTopic == null) {
+ eventTopic = BucketPath.escapeString(topic, event.getHeaders());
+ logger.debug("{} was set to true but header {} was null. Producing to {}" +
+ " topic instead.",
+ new Object[]{KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER,
+ topicHeader, eventTopic});
+ }
+ } else {
+ eventTopic = topic;
}
+
eventKey = headers.get(KEY_HEADER);
if (logger.isTraceEnabled()) {
if (LogPrivacyUtil.allowLogRawData()) {
@@ -317,6 +328,12 @@ public class KafkaSink extends AbstractSink implements Configurable {
partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME);
staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF);
+ allowTopicOverride = context.getBoolean(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER,
+ KafkaSinkConstants.DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER);
+
+ topicHeader = context.getString(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER,
+ KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER);
+
if (logger.isDebugEnabled()) {
logger.debug(KafkaSinkConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
index 7c819f5..ffca3df 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java
@@ -33,7 +33,10 @@ public class KafkaSinkConstants {
KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
public static final String KEY_HEADER = "key";
- public static final String TOPIC_HEADER = "topic";
+ public static final String DEFAULT_TOPIC_OVERRIDE_HEADER = "topic";
+ public static final String TOPIC_OVERRIDE_HEADER = "topicHeader";
+ public static final String ALLOW_TOPIC_OVERRIDE_HEADER = "allowTopicOverride";
+ public static final boolean DEFAULT_ALLOW_TOPIC_OVERRIDE_HEADER = true;
public static final String AVRO_EVENT = "useFlumeEventFormat";
public static final boolean DEFAULT_AVRO_EVENT = false;
http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index 975661d..d92c71f 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -240,6 +240,92 @@ public class TestKafkaSink {
new String((byte[]) fetchedMsg.key(), "UTF-8"));
}
+ /**
+ * Tests that a message will be produced to a topic as specified by a
+ * custom topicHeader parameter (FLUME-3046).
+ * @throws UnsupportedEncodingException
+ */
+ @Test
+ public void testTopicFromConfHeader() throws UnsupportedEncodingException {
+ String customTopicHeader = "customTopicHeader";
+ Sink kafkaSink = new KafkaSink();
+ Context context = prepareDefaultContext();
+ context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, customTopicHeader);
+ Configurables.configure(kafkaSink, context);
+ Channel memoryChannel = new MemoryChannel();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ String msg = "test-topic-from-config-header";
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put(customTopicHeader, TestConstants.CUSTOM_TOPIC);
+ Transaction tx = memoryChannel.getTransaction();
+ tx.begin();
+ Event event = EventBuilder.withBody(msg.getBytes(), headers);
+ memoryChannel.put(event);
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+
+ MessageAndMetadata<?, ?> fetchedMsg =
+ testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+
+ assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+ }
+
+ /**
+ * Tests that the topicHeader parameter will be ignored if the allowTopicHeader
+ * parameter is set to false (FLUME-3046).
+ * @throws UnsupportedEncodingException
+ */
+ @Test
+ public void testTopicNotFromConfHeader() throws UnsupportedEncodingException {
+ Sink kafkaSink = new KafkaSink();
+ Context context = prepareDefaultContext();
+ context.put(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, "false");
+ context.put(KafkaSinkConstants.TOPIC_OVERRIDE_HEADER, "foo");
+
+ Configurables.configure(kafkaSink, context);
+ Channel memoryChannel = new MemoryChannel();
+ Configurables.configure(memoryChannel, context);
+ kafkaSink.setChannel(memoryChannel);
+ kafkaSink.start();
+
+ String msg = "test-topic-from-config-header";
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put(KafkaSinkConstants.DEFAULT_TOPIC_OVERRIDE_HEADER, TestConstants.CUSTOM_TOPIC);
+ headers.put("foo", TestConstants.CUSTOM_TOPIC);
+ Transaction tx = memoryChannel.getTransaction();
+ tx.begin();
+ Event event = EventBuilder.withBody(msg.getBytes(), headers);
+ memoryChannel.put(event);
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = kafkaSink.process();
+ if (status == Sink.Status.BACKOFF) {
+ fail("Error Occurred");
+ }
+ } catch (EventDeliveryException ex) {
+ // ignore
+ }
+
+ MessageAndMetadata<?, ?> fetchedMsg =
+ testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC);
+
+ assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+ }
+
@Test
public void testReplaceSubStringOfTopicWithHeaders() throws UnsupportedEncodingException {
Sink kafkaSink = new KafkaSink();
@@ -612,4 +698,4 @@ public class TestKafkaSink {
return newTopic;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index d381850..ffdc96e 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -127,6 +127,8 @@ public class KafkaSource extends AbstractPollableSource
private String bootstrapServers;
private String groupId = DEFAULT_GROUP_ID;
private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
+ private String topicHeader = null;
+ private boolean setTopicHeader;
/**
* This class is a helper to subscribe for topics by using
@@ -250,8 +252,9 @@ public class KafkaSource extends AbstractPollableSource
headers.put(KafkaSourceConstants.TIMESTAMP_HEADER,
String.valueOf(System.currentTimeMillis()));
}
- if (!headers.containsKey(KafkaSourceConstants.TOPIC_HEADER)) {
- headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
+ // Only set the topic header if setTopicHeader and it isn't already populated
+ if (setTopicHeader && !headers.containsKey(topicHeader)) {
+ headers.put(topicHeader, message.topic());
}
if (!headers.containsKey(KafkaSourceConstants.PARTITION_HEADER)) {
headers.put(KafkaSourceConstants.PARTITION_HEADER,
@@ -400,6 +403,12 @@ public class KafkaSource extends AbstractPollableSource
log.info("Group ID was not specified. Using {} as the group id.", groupId);
}
+ setTopicHeader = context.getBoolean(KafkaSourceConstants.SET_TOPIC_HEADER,
+ KafkaSourceConstants.DEFAULT_SET_TOPIC_HEADER);
+
+ topicHeader = context.getString(KafkaSourceConstants.TOPIC_HEADER,
+ KafkaSourceConstants.DEFAULT_TOPIC_HEADER);
+
setConsumerProps(context);
if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index bf1a19d..474a143 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -49,9 +49,13 @@ public class KafkaSourceConstants {
public static final String OLD_GROUP_ID = "groupId";
// flume event headers
- public static final String TOPIC_HEADER = "topic";
+ public static final String DEFAULT_TOPIC_HEADER = "topic";
public static final String KEY_HEADER = "key";
public static final String TIMESTAMP_HEADER = "timestamp";
public static final String PARTITION_HEADER = "partition";
+ public static final String SET_TOPIC_HEADER = "setTopicHeader";
+ public static final boolean DEFAULT_SET_TOPIC_HEADER = true;
+ public static final String TOPIC_HEADER = "topicHeader";
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/54e2728a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index d1daceb..7804fa2 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -76,7 +76,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADE
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
-import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER;
import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -556,7 +556,7 @@ public class TestKafkaSource {
headers.put(TIMESTAMP_HEADER, currentTimestamp);
headers.put(PARTITION_HEADER, "1");
- headers.put(TOPIC_HEADER, "topic0");
+ headers.put(DEFAULT_TOPIC_HEADER, "topic0");
e = new AvroFlumeEvent(headers, ByteBuffer.wrap("hello, world2".getBytes()));
tempOutStream.reset();
@@ -590,7 +590,7 @@ public class TestKafkaSource {
Assert.assertEquals("value2", e.getHeaders().get("header2"));
Assert.assertEquals(currentTimestamp, e.getHeaders().get(TIMESTAMP_HEADER));
Assert.assertEquals(e.getHeaders().get(PARTITION_HEADER), "1");
- Assert.assertEquals(e.getHeaders().get(TOPIC_HEADER),"topic0");
+ Assert.assertEquals(e.getHeaders().get(DEFAULT_TOPIC_HEADER),"topic0");
}
@@ -655,6 +655,96 @@ public class TestKafkaSource {
Assert.assertNull(kafkaSource.getConsumerProps().getProperty(sampleConsumerProp));
}
+ /**
+ * Tests the availability of the topic header in the output events,
+ * based on the configuration parameters added in FLUME-3046
+ * @throws InterruptedException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testTopicHeaderSet() throws InterruptedException, EventDeliveryException {
+ context.put(TOPICS, topic0);
+ kafkaSource.configure(context);
+ kafkaSource.start();
+
+ Thread.sleep(500L);
+
+ kafkaServer.produce(topic0, "", "hello, world");
+
+ Thread.sleep(500L);
+
+ Status status = kafkaSource.process();
+ assertEquals(Status.READY, status);
+ Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+ Charsets.UTF_8));
+
+ Assert.assertEquals(topic0, events.get(0).getHeaders().get("topic"));
+
+ kafkaSource.stop();
+ events.clear();
+ }
+
+ /**
+ * Tests the availability of the custom topic header in the output events,
+ * based on the configuration parameters added in FLUME-3046
+ * @throws InterruptedException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testTopicCustomHeaderSet() throws InterruptedException, EventDeliveryException {
+ context.put(TOPICS, topic0);
+ context.put(KafkaSourceConstants.TOPIC_HEADER, "customTopicHeader");
+ kafkaSource.configure(context);
+
+ kafkaSource.start();
+
+ Thread.sleep(500L);
+
+ kafkaServer.produce(topic0, "", "hello, world2");
+
+ Thread.sleep(500L);
+
+ Status status = kafkaSource.process();
+ assertEquals(Status.READY, status);
+ Assert.assertEquals("hello, world2", new String(events.get(0).getBody(),
+ Charsets.UTF_8));
+
+ Assert.assertEquals(topic0, events.get(0).getHeaders().get("customTopicHeader"));
+
+ kafkaSource.stop();
+ events.clear();
+ }
+
+ /**
+ * Tests the unavailability of the topic header in the output events,
+ * based on the configuration parameters added in FLUME-3046
+ * @throws InterruptedException
+ * @throws EventDeliveryException
+ */
+ @Test
+ public void testTopicCustomHeaderNotSet() throws InterruptedException, EventDeliveryException {
+ context.put(TOPICS, topic0);
+ context.put(KafkaSourceConstants.SET_TOPIC_HEADER, "false");
+ kafkaSource.configure(context);
+
+ kafkaSource.start();
+
+ Thread.sleep(500L);
+
+ kafkaServer.produce(topic0, "", "hello, world3");
+
+ Thread.sleep(500L);
+
+ Status status = kafkaSource.process();
+ assertEquals(Status.READY, status);
+ Assert.assertEquals("hello, world3", new String(events.get(0).getBody(),
+ Charsets.UTF_8));
+
+ Assert.assertNull(events.get(0).getHeaders().get("customTopicHeader"));
+
+ kafkaSource.stop();
+ }
+
public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
String group) throws Exception {
// create a topic with 1 partition for simplicity