Posted to commits@druid.apache.org by ji...@apache.org on 2019/03/22 20:39:13 UTC

[incubator-druid] branch master updated: Add Kafka integration test for transactional topics (#7295)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 01c021e  Add Kafka integration test for transactional topics (#7295)
01c021e is described below

commit 01c021e6da6e7abf5035863daa554b89a6df4904
Author: Surekha <su...@imply.io>
AuthorDate: Fri Mar 22 13:39:05 2019 -0700

    Add Kafka integration test for transactional topics (#7295)
    
    * Add integration test for transactional kafka
    
    * Set transactions enabled to true for the transactional test
    
    * Add new test to travis_script_integration.sh, use version 0.2 of druid docker image
    
    * Use different datasource name for ITKafkaIndexingServiceTest and ITKafkaIndexingServiceTransactionalTest
    
    * Use KafkaConsumerConfigs to get common consumer properties
    
    * Remove double line breaks
    
    * Remove extra space
---
 ci/travis_script_integration.sh                    |   2 +-
 ...viceTest.java => AbstractKafkaIndexerTest.java} |  83 +++---
 .../tests/indexer/ITKafkaIndexingServiceTest.java  | 291 +--------------------
 .../ITKafkaIndexingServiceTransactionalTest.java   |  50 ++++
 4 files changed, 94 insertions(+), 332 deletions(-)
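
The core of this change is the transactional producer path added to AbstractKafkaIndexerTest in the diff below. As a minimal, self-contained sketch of that pattern: the property names and producer calls are the ones the diff adds, while the broker address, topic, and payload here are placeholders, and UUID stands in for the RandomIdUtils.getRandomId() call the test uses for its transactional.id.

    import java.util.Properties;
    import java.util.UUID;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalProducerSketch
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");            // placeholder broker
        props.put("acks", "all");                                    // required with idempotence
        props.put("enable.idempotence", "true");                     // prerequisite for transactions
        props.put("transactional.id", UUID.randomUUID().toString()); // stands in for RandomIdUtils.getRandomId()

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
          producer.initTransactions();  // registers the transactional.id and fences older producers using it
          producer.beginTransaction();
          producer.send(new ProducerRecord<>("kafka_indexing_service_topic", "{\"added\": 1}"));
          producer.commitTransaction(); // until this returns, read_committed consumers see none of the records
        }
      }
    }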

diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh
index b4bb1af..99b6085 100755
--- a/ci/travis_script_integration.sh
+++ b/ci/travis_script_integration.sh
@@ -21,6 +21,6 @@ set -e
 
 pushd $TRAVIS_BUILD_DIR/integration-tests
 
-mvn verify -P integration-tests -Dit.test=ITAppenderatorDriverRealtimeIndexTaskTest,ITCompactionTaskTest,ITIndexerTest,ITKafkaIndexingServiceTest,ITKafkaTest,ITParallelIndexTest,ITRealtimeIndexTaskTest
+mvn verify -P integration-tests -Dit.test=ITAppenderatorDriverRealtimeIndexTaskTest,ITCompactionTaskTest,ITIndexerTest,ITKafkaIndexingServiceTest,ITKafkaIndexingServiceTransactionalTest,ITParallelIndexTest,ITRealtimeIndexTaskTest
 
 popd
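
For local debugging, the new suite can also be run on its own with the same profile (assuming the usual integration-tests docker setup), from the integration-tests directory:

    mvn verify -P integration-tests -Dit.test=ITKafkaIndexingServiceTransactionalTest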
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
similarity index 88%
copy from integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
copy to integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
index d297fda..fa9c64b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.tests.indexer;
 
+import com.google.common.base.Throwables;
 import com.google.inject.Inject;
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
@@ -28,25 +29,22 @@ import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
+import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.testing.utils.RetryUtil;
 import org.apache.druid.testing.utils.TestQueryHelper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -54,16 +52,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 
-/*
- * This is a test for the Kafka indexing service.
- */
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
+public class AbstractKafkaIndexerTest extends AbstractIndexerTest
 {
-  private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
+  private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
   private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json";
   private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
-  private static final String DATASOURCE = "kafka_indexing_service_test";
   private static final String TOPIC_NAME = "kafka_indexing_service_topic";
 
   private static final int NUM_EVENTS_TO_SEND = 60;
@@ -109,17 +102,9 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
 
   private String fullDatasourceName;
 
-  @BeforeSuite
-  public void setFullDatasourceName()
-  {
-    fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
-  }
-
-  @Test
-  public void testKafka()
+  void doKafkaIndexTest(String dataSourceName, boolean txnEnabled)
   {
-    LOG.info("Starting test: ITKafkaIndexingServiceTest");
-
+    fullDatasourceName = dataSourceName + config.getExtraDatasourceNameSuffix();
     // create topic
     try {
       int sessionTimeoutMs = 10000;
@@ -152,7 +137,6 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
       final Properties consumerProperties = new Properties();
       consumerProperties.putAll(consumerConfigs);
       consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
-      addFilteredProperties(consumerProperties);
 
       spec = getTaskAsString(INDEXER_FILE);
       spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
@@ -171,11 +155,17 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
 
     // set up kafka producer
     Properties properties = new Properties();
-    addFilteredProperties(properties);
+    addFilteredProperties(config, properties);
     properties.put("bootstrap.servers", config.getKafkaHost());
     LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
     properties.put("acks", "all");
     properties.put("retries", "3");
+    properties.put("key.serializer", ByteArraySerializer.class.getName());
+    properties.put("value.serializer", ByteArraySerializer.class.getName());
+    if (txnEnabled) {
+      properties.put("enable.idempotence", "true");
+      properties.put("transactional.id", RandomIdUtils.getRandomId());
+    }
 
     KafkaProducer<String, String> producer = new KafkaProducer<>(
         properties,
@@ -196,6 +186,10 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
     int num_events = 0;
 
     // send data to kafka
+    if (txnEnabled) {
+      producer.initTransactions();
+      producer.beginTransaction();
+    }
     while (num_events < NUM_EVENTS_TO_SEND) {
       num_events++;
       added += num_events;
@@ -203,16 +197,20 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
       String event = StringUtils.format(event_template, event_fmt.print(dt), num_events, 0, num_events);
       LOG.info("sending event: [%s]", event);
       try {
+
         producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get();
+
       }
       catch (Exception ioe) {
-        throw new RuntimeException(ioe);
+        throw Throwables.propagate(ioe);
       }
 
       dtLast = dt;
       dt = new DateTime(zone);
     }
-
+    if (txnEnabled) {
+      producer.commitTransaction();
+    }
     producer.close();
 
     LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
@@ -224,8 +222,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
       throw new RuntimeException(e);
     }
 
-
-    InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE);
+    InputStream is = AbstractKafkaIndexerTest.class.getResourceAsStream(QUERIES_FILE);
     if (null == is) {
       throw new ISE("could not open query file: %s", QUERIES_FILE);
     }
@@ -255,7 +252,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
       this.queryHelper.testQueriesFromString(queryStr, 2);
     }
     catch (Exception e) {
-      throw new RuntimeException(e);
+      throw Throwables.propagate(e);
     }
 
     LOG.info("Shutting down Kafka Supervisor");
@@ -293,7 +290,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
       );
     }
     catch (Exception e) {
-      throw new RuntimeException(e);
+      throw Throwables.propagate(e);
     }
     LOG.info("segments are present");
     segmentsExist = true;
@@ -303,31 +300,29 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
       this.queryHelper.testQueriesFromString(queryStr, 2);
     }
     catch (Exception e) {
-      throw new RuntimeException(e);
+      throw Throwables.propagate(e);
+    }
+  }
+  
+  private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)
+  {
+    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
+      if (entry.getKey().startsWith(testPropertyPrefix)) {
+        properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
+      }
     }
   }
 
-  @AfterClass
-  public void afterClass()
+  void doTearDown()
   {
-    LOG.info("teardown");
     if (config.manageKafkaTopic()) {
       // delete kafka topic
       AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
     }
 
     // remove segments
-    if (segmentsExist) {
+    if (segmentsExist && fullDatasourceName != null) {
       unloadAndKillData(fullDatasourceName);
     }
   }
-
-  public void addFilteredProperties(Properties properties)
-  {
-    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
-      if (entry.getKey().startsWith(testPropertyPrefix)) {
-        properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
-      }
-    }
-  }
 }
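For transactional records to stay invisible until commitTransaction() succeeds, the reading side matters as much as the producer: a Kafka consumer only filters out records from open and aborted transactions when isolation.level is set to read_committed (the client default is read_uncommitted). The test takes its consumer properties from KafkaConsumerConfigs.getConsumerProperties(); the sketch below is a hypothetical standalone equivalent, not the actual contents of that method, with placeholder broker, group id, and offset policy.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ReadCommittedConsumerSketch
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "kafka-indexing-service-it"); // placeholder group id
        props.put("auto.offset.reset", "earliest");          // placeholder choice: read from the start
        props.put("isolation.level", "read_committed");      // skip records from open/aborted transactions
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("kafka_indexing_service_topic"));
          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
          System.out.println("committed records visible: " + records.count()); // only committed data is returned
        }
      }
    }
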
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
index d297fda..e9cc84e 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
@@ -19,315 +19,32 @@
 
 package org.apache.druid.tests.indexer;
 
-import com.google.inject.Inject;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.utils.RetryUtil;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-
-/*
+/**
  * This is a test for the Kafka indexing service.
  */
 @Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
+public class ITKafkaIndexingServiceTest extends AbstractKafkaIndexerTest
 {
   private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
-  private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json";
-  private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
   private static final String DATASOURCE = "kafka_indexing_service_test";
-  private static final String TOPIC_NAME = "kafka_indexing_service_topic";
-
-  private static final int NUM_EVENTS_TO_SEND = 60;
-  private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;
-  public static final String testPropertyPrefix = "kafka.test.property.";
-
-  // We'll fill in the current time and numbers for added, deleted and changed
-  // before sending the event.
-  final String event_template =
-      "{\"timestamp\": \"%s\"," +
-      "\"page\": \"Gypsy Danger\"," +
-      "\"language\" : \"en\"," +
-      "\"user\" : \"nuclear\"," +
-      "\"unpatrolled\" : \"true\"," +
-      "\"newPage\" : \"true\"," +
-      "\"robot\": \"false\"," +
-      "\"anonymous\": \"false\"," +
-      "\"namespace\":\"article\"," +
-      "\"continent\":\"North America\"," +
-      "\"country\":\"United States\"," +
-      "\"region\":\"Bay Area\"," +
-      "\"city\":\"San Francisco\"," +
-      "\"added\":%d," +
-      "\"deleted\":%d," +
-      "\"delta\":%d}";
-
-  private String supervisorId;
-  private ZkClient zkClient;
-  private ZkUtils zkUtils;
-  private boolean segmentsExist;   // to tell if we should remove segments during teardown
-
-  // format for the querying interval
-  private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
-  // format for the expected timestamp in a query response
-  private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
-  private DateTime dtFirst;                // timestamp of 1st event
-  private DateTime dtLast;                 // timestamp of last event
-
-  @Inject
-  private TestQueryHelper queryHelper;
-  @Inject
-  private IntegrationTestingConfig config;
-
-  private String fullDatasourceName;
-
-  @BeforeSuite
-  public void setFullDatasourceName()
-  {
-    fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
-  }
 
   @Test
   public void testKafka()
   {
     LOG.info("Starting test: ITKafkaIndexingServiceTest");
-
-    // create topic
-    try {
-      int sessionTimeoutMs = 10000;
-      int connectionTimeoutMs = 10000;
-      String zkHosts = config.getZookeeperHosts();
-      zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
-      zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false);
-      if (config.manageKafkaTopic()) {
-        int numPartitions = 4;
-        int replicationFactor = 1;
-        Properties topicConfig = new Properties();
-        AdminUtils.createTopic(
-            zkUtils,
-            TOPIC_NAME,
-            numPartitions,
-            replicationFactor,
-            topicConfig,
-            RackAwareMode.Disabled$.MODULE$
-        );
-      }
-    }
-    catch (Exception e) {
-      throw new ISE(e, "could not create kafka topic");
-    }
-
-    String spec;
-    try {
-      LOG.info("supervisorSpec name: [%s]", INDEXER_FILE);
-      final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
-      final Properties consumerProperties = new Properties();
-      consumerProperties.putAll(consumerConfigs);
-      consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
-      addFilteredProperties(consumerProperties);
-
-      spec = getTaskAsString(INDEXER_FILE);
-      spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
-      spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
-      spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
-      LOG.info("supervisorSpec: [%s]\n", spec);
-    }
-    catch (Exception e) {
-      LOG.error("could not read file [%s]", INDEXER_FILE);
-      throw new ISE(e, "could not read file [%s]", INDEXER_FILE);
-    }
-
-    // start supervisor
-    supervisorId = indexer.submitSupervisor(spec);
-    LOG.info("Submitted supervisor");
-
-    // set up kafka producer
-    Properties properties = new Properties();
-    addFilteredProperties(properties);
-    properties.put("bootstrap.servers", config.getKafkaHost());
-    LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
-    properties.put("acks", "all");
-    properties.put("retries", "3");
-
-    KafkaProducer<String, String> producer = new KafkaProducer<>(
-        properties,
-        new StringSerializer(),
-        new StringSerializer()
-    );
-
-    DateTimeZone zone = DateTimes.inferTzFromString("UTC");
-    // format for putting into events
-    DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
-
-    DateTime dt = new DateTime(zone); // timestamp to put on events
-    dtFirst = dt;            // timestamp of 1st event
-    dtLast = dt;             // timestamp of last event
-
-    // these are used to compute the expected aggregations
-    int added = 0;
-    int num_events = 0;
-
-    // send data to kafka
-    while (num_events < NUM_EVENTS_TO_SEND) {
-      num_events++;
-      added += num_events;
-      // construct the event to send
-      String event = StringUtils.format(event_template, event_fmt.print(dt), num_events, 0, num_events);
-      LOG.info("sending event: [%s]", event);
-      try {
-        producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get();
-      }
-      catch (Exception ioe) {
-        throw new RuntimeException(ioe);
-      }
-
-      dtLast = dt;
-      dt = new DateTime(zone);
-    }
-
-    producer.close();
-
-    LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
-    try {
-      Thread.sleep(WAIT_TIME_MILLIS);
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
-
-
-    InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE);
-    if (null == is) {
-      throw new ISE("could not open query file: %s", QUERIES_FILE);
-    }
-
-    // put the timestamps into the query structure
-    String query_response_template;
-    try {
-      query_response_template = IOUtils.toString(is, "UTF-8");
-    }
-    catch (IOException e) {
-      throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
-    }
-
-    String queryStr = query_response_template;
-    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
-
-    // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
-    try {
-      this.queryHelper.testQueriesFromString(queryStr, 2);
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    LOG.info("Shutting down Kafka Supervisor");
-    indexer.shutdownSupervisor(supervisorId);
-
-    // wait for all kafka indexing tasks to finish
-    LOG.info("Waiting for all kafka indexing tasks to finish");
-    RetryUtil.retryUntilTrue(
-        new Callable<Boolean>()
-        {
-          @Override
-          public Boolean call()
-          {
-            return (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks()
-                                                                                                 .size()) == 0;
-          }
-        }, "Waiting for Tasks Completion"
-    );
-
-    // wait for segments to be handed off
-    try {
-      RetryUtil.retryUntil(
-          new Callable<Boolean>()
-          {
-            @Override
-            public Boolean call()
-            {
-              return coordinator.areSegmentsLoaded(fullDatasourceName);
-            }
-          },
-          true,
-          10000,
-          30,
-          "Real-time generated segments loaded"
-      );
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    LOG.info("segments are present");
-    segmentsExist = true;
-
-    // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
-    try {
-      this.queryHelper.testQueriesFromString(queryStr, 2);
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    doKafkaIndexTest(DATASOURCE, false);
   }
 
   @AfterClass
   public void afterClass()
   {
     LOG.info("teardown");
-    if (config.manageKafkaTopic()) {
-      // delete kafka topic
-      AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
-    }
-
-    // remove segments
-    if (segmentsExist) {
-      unloadAndKillData(fullDatasourceName);
-    }
-  }
-
-  public void addFilteredProperties(Properties properties)
-  {
-    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
-      if (entry.getKey().startsWith(testPropertyPrefix)) {
-        properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
-      }
-    }
+    doTearDown();
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
new file mode 100644
index 0000000..07bcae5
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+/**
+ * This is a test for the Kafka indexing service with transactional topics
+ */
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceTransactionalTest extends AbstractKafkaIndexerTest
+{
+  private static final Logger LOG = new Logger(ITKafkaIndexingServiceTransactionalTest.class);
+  private static final String DATASOURCE = "kafka_indexing_service_txn_test";
+
+  @Test
+  public void testKafka()
+  {
+    LOG.info("Starting test: ITKafkaIndexingServiceTransactionalTest");
+    doKafkaIndexTest(DATASOURCE, true);
+  }
+
+  @AfterClass
+  public void afterClass()
+  {
+    LOG.info("teardown");
+    doTearDown();
+  }
+}
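
After this refactor each concrete test is a thin wrapper: ITKafkaIndexingServiceTest calls doKafkaIndexTest(DATASOURCE, false) and the new transactional test passes true, while topic creation, event production, querying, and teardown all live in AbstractKafkaIndexerTest. The two tests use distinct datasource names (kafka_indexing_service_test vs. kafka_indexing_service_txn_test) so they can run in the same suite without unloading each other's segments.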

