You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/03/22 20:39:13 UTC
[incubator-druid] branch master updated: Add Kafka integration test
for transactional topics (#7295)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 01c021e Add Kafka integration test for transactional topics (#7295)
01c021e is described below
commit 01c021e6da6e7abf5035863daa554b89a6df4904
Author: Surekha <su...@imply.io>
AuthorDate: Fri Mar 22 13:39:05 2019 -0700
Add Kafka integration test for transactional topics (#7295)
* Add integration test for transactional kafka
* Add true for transactions enabled for transactional test
* Add new test to travis_script_integration.sh, use version 0.2 of druid docker image
* Use different datasource name for ITKafkaIndexingServiceTest and ITKafkaIndexingServiceTransactionalTest
* use KafkaConsumerConfigs to get common consumer properties
* Remove double line breaks
* remove extra space
---
ci/travis_script_integration.sh | 2 +-
...viceTest.java => AbstractKafkaIndexerTest.java} | 83 +++---
.../tests/indexer/ITKafkaIndexingServiceTest.java | 291 +--------------------
.../ITKafkaIndexingServiceTransactionalTest.java | 50 ++++
4 files changed, 94 insertions(+), 332 deletions(-)
diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh
index b4bb1af..99b6085 100755
--- a/ci/travis_script_integration.sh
+++ b/ci/travis_script_integration.sh
@@ -21,6 +21,6 @@ set -e
pushd $TRAVIS_BUILD_DIR/integration-tests
-mvn verify -P integration-tests -Dit.test=ITAppenderatorDriverRealtimeIndexTaskTest,ITCompactionTaskTest,ITIndexerTest,ITKafkaIndexingServiceTest,ITKafkaTest,ITParallelIndexTest,ITRealtimeIndexTaskTest
+mvn verify -P integration-tests -Dit.test=ITAppenderatorDriverRealtimeIndexTaskTest,ITCompactionTaskTest,ITIndexerTest,ITKafkaIndexingServiceTest,ITKafkaIndexingServiceTransactionalTest,ITParallelIndexTest,ITRealtimeIndexTaskTest
popd
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
similarity index 88%
copy from integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
copy to integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
index d297fda..fa9c64b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.tests.indexer;
+import com.google.common.base.Throwables;
import com.google.inject.Inject;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
@@ -28,25 +29,22 @@ import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
+import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
import java.io.IOException;
import java.io.InputStream;
@@ -54,16 +52,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
-/*
- * This is a test for the Kafka indexing service.
- */
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
+public class AbstractKafkaIndexerTest extends AbstractIndexerTest
{
- private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
+ private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json";
private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
- private static final String DATASOURCE = "kafka_indexing_service_test";
private static final String TOPIC_NAME = "kafka_indexing_service_topic";
private static final int NUM_EVENTS_TO_SEND = 60;
@@ -109,17 +102,9 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
private String fullDatasourceName;
- @BeforeSuite
- public void setFullDatasourceName()
- {
- fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
- }
-
- @Test
- public void testKafka()
+ void doKafkaIndexTest(String dataSourceName, boolean txnEnabled)
{
- LOG.info("Starting test: ITKafkaIndexingServiceTest");
-
+ fullDatasourceName = dataSourceName + config.getExtraDatasourceNameSuffix();
// create topic
try {
int sessionTimeoutMs = 10000;
@@ -152,7 +137,6 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
final Properties consumerProperties = new Properties();
consumerProperties.putAll(consumerConfigs);
consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
- addFilteredProperties(consumerProperties);
spec = getTaskAsString(INDEXER_FILE);
spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
@@ -171,11 +155,17 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
// set up kafka producer
Properties properties = new Properties();
- addFilteredProperties(properties);
+ addFilteredProperties(config, properties);
properties.put("bootstrap.servers", config.getKafkaHost());
LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
properties.put("acks", "all");
properties.put("retries", "3");
+ properties.put("key.serializer", ByteArraySerializer.class.getName());
+ properties.put("value.serializer", ByteArraySerializer.class.getName());
+ if (txnEnabled) {
+ properties.put("enable.idempotence", "true");
+ properties.put("transactional.id", RandomIdUtils.getRandomId());
+ }
KafkaProducer<String, String> producer = new KafkaProducer<>(
properties,
@@ -196,6 +186,10 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
int num_events = 0;
// send data to kafka
+ if (txnEnabled) {
+ producer.initTransactions();
+ producer.beginTransaction();
+ }
while (num_events < NUM_EVENTS_TO_SEND) {
num_events++;
added += num_events;
@@ -203,16 +197,20 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
String event = StringUtils.format(event_template, event_fmt.print(dt), num_events, 0, num_events);
LOG.info("sending event: [%s]", event);
try {
+
producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get();
+
}
catch (Exception ioe) {
- throw new RuntimeException(ioe);
+ throw Throwables.propagate(ioe);
}
dtLast = dt;
dt = new DateTime(zone);
}
-
+ if (txnEnabled) {
+ producer.commitTransaction();
+ }
producer.close();
LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
@@ -224,8 +222,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
throw new RuntimeException(e);
}
-
- InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE);
+ InputStream is = AbstractKafkaIndexerTest.class.getResourceAsStream(QUERIES_FILE);
if (null == is) {
throw new ISE("could not open query file: %s", QUERIES_FILE);
}
@@ -255,7 +252,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
this.queryHelper.testQueriesFromString(queryStr, 2);
}
catch (Exception e) {
- throw new RuntimeException(e);
+ throw Throwables.propagate(e);
}
LOG.info("Shutting down Kafka Supervisor");
@@ -293,7 +290,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
);
}
catch (Exception e) {
- throw new RuntimeException(e);
+ throw Throwables.propagate(e);
}
LOG.info("segments are present");
segmentsExist = true;
@@ -303,31 +300,29 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
this.queryHelper.testQueriesFromString(queryStr, 2);
}
catch (Exception e) {
- throw new RuntimeException(e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)
+ {
+ for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
+ if (entry.getKey().startsWith(testPropertyPrefix)) {
+ properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
+ }
}
}
- @AfterClass
- public void afterClass()
+ void doTearDown()
{
- LOG.info("teardown");
if (config.manageKafkaTopic()) {
// delete kafka topic
AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
}
// remove segments
- if (segmentsExist) {
+ if (segmentsExist && fullDatasourceName != null) {
unloadAndKillData(fullDatasourceName);
}
}
-
- public void addFilteredProperties(Properties properties)
- {
- for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
- if (entry.getKey().startsWith(testPropertyPrefix)) {
- properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
- }
- }
- }
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
index d297fda..e9cc84e 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
@@ -19,315 +19,32 @@
package org.apache.druid.tests.indexer;
-import com.google.inject.Inject;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.utils.RetryUtil;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-
-/*
+/**
* This is a test for the Kafka indexing service.
*/
@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
+public class ITKafkaIndexingServiceTest extends AbstractKafkaIndexerTest
{
private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
- private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json";
- private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
private static final String DATASOURCE = "kafka_indexing_service_test";
- private static final String TOPIC_NAME = "kafka_indexing_service_topic";
-
- private static final int NUM_EVENTS_TO_SEND = 60;
- private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;
- public static final String testPropertyPrefix = "kafka.test.property.";
-
- // We'll fill in the current time and numbers for added, deleted and changed
- // before sending the event.
- final String event_template =
- "{\"timestamp\": \"%s\"," +
- "\"page\": \"Gypsy Danger\"," +
- "\"language\" : \"en\"," +
- "\"user\" : \"nuclear\"," +
- "\"unpatrolled\" : \"true\"," +
- "\"newPage\" : \"true\"," +
- "\"robot\": \"false\"," +
- "\"anonymous\": \"false\"," +
- "\"namespace\":\"article\"," +
- "\"continent\":\"North America\"," +
- "\"country\":\"United States\"," +
- "\"region\":\"Bay Area\"," +
- "\"city\":\"San Francisco\"," +
- "\"added\":%d," +
- "\"deleted\":%d," +
- "\"delta\":%d}";
-
- private String supervisorId;
- private ZkClient zkClient;
- private ZkUtils zkUtils;
- private boolean segmentsExist; // to tell if we should remove segments during teardown
-
- // format for the querying interval
- private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
- // format for the expected timestamp in a query response
- private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
- private DateTime dtFirst; // timestamp of 1st event
- private DateTime dtLast; // timestamp of last event
-
- @Inject
- private TestQueryHelper queryHelper;
- @Inject
- private IntegrationTestingConfig config;
-
- private String fullDatasourceName;
-
- @BeforeSuite
- public void setFullDatasourceName()
- {
- fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix();
- }
@Test
public void testKafka()
{
LOG.info("Starting test: ITKafkaIndexingServiceTest");
-
- // create topic
- try {
- int sessionTimeoutMs = 10000;
- int connectionTimeoutMs = 10000;
- String zkHosts = config.getZookeeperHosts();
- zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
- zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false);
- if (config.manageKafkaTopic()) {
- int numPartitions = 4;
- int replicationFactor = 1;
- Properties topicConfig = new Properties();
- AdminUtils.createTopic(
- zkUtils,
- TOPIC_NAME,
- numPartitions,
- replicationFactor,
- topicConfig,
- RackAwareMode.Disabled$.MODULE$
- );
- }
- }
- catch (Exception e) {
- throw new ISE(e, "could not create kafka topic");
- }
-
- String spec;
- try {
- LOG.info("supervisorSpec name: [%s]", INDEXER_FILE);
- final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
- final Properties consumerProperties = new Properties();
- consumerProperties.putAll(consumerConfigs);
- consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost());
- addFilteredProperties(consumerProperties);
-
- spec = getTaskAsString(INDEXER_FILE);
- spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
- spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
- spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
- LOG.info("supervisorSpec: [%s]\n", spec);
- }
- catch (Exception e) {
- LOG.error("could not read file [%s]", INDEXER_FILE);
- throw new ISE(e, "could not read file [%s]", INDEXER_FILE);
- }
-
- // start supervisor
- supervisorId = indexer.submitSupervisor(spec);
- LOG.info("Submitted supervisor");
-
- // set up kafka producer
- Properties properties = new Properties();
- addFilteredProperties(properties);
- properties.put("bootstrap.servers", config.getKafkaHost());
- LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
- properties.put("acks", "all");
- properties.put("retries", "3");
-
- KafkaProducer<String, String> producer = new KafkaProducer<>(
- properties,
- new StringSerializer(),
- new StringSerializer()
- );
-
- DateTimeZone zone = DateTimes.inferTzFromString("UTC");
- // format for putting into events
- DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
-
- DateTime dt = new DateTime(zone); // timestamp to put on events
- dtFirst = dt; // timestamp of 1st event
- dtLast = dt; // timestamp of last event
-
- // these are used to compute the expected aggregations
- int added = 0;
- int num_events = 0;
-
- // send data to kafka
- while (num_events < NUM_EVENTS_TO_SEND) {
- num_events++;
- added += num_events;
- // construct the event to send
- String event = StringUtils.format(event_template, event_fmt.print(dt), num_events, 0, num_events);
- LOG.info("sending event: [%s]", event);
- try {
- producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get();
- }
- catch (Exception ioe) {
- throw new RuntimeException(ioe);
- }
-
- dtLast = dt;
- dt = new DateTime(zone);
- }
-
- producer.close();
-
- LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
- try {
- Thread.sleep(WAIT_TIME_MILLIS);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
-
-
- InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE);
- if (null == is) {
- throw new ISE("could not open query file: %s", QUERIES_FILE);
- }
-
- // put the timestamps into the query structure
- String query_response_template;
- try {
- query_response_template = IOUtils.toString(is, "UTF-8");
- }
- catch (IOException e) {
- throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
- }
-
- String queryStr = query_response_template;
- queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
- queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
- queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
- queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst));
- queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst));
- queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)));
- queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
- queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added));
- queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
-
- // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
- try {
- this.queryHelper.testQueriesFromString(queryStr, 2);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- LOG.info("Shutting down Kafka Supervisor");
- indexer.shutdownSupervisor(supervisorId);
-
- // wait for all kafka indexing tasks to finish
- LOG.info("Waiting for all kafka indexing tasks to finish");
- RetryUtil.retryUntilTrue(
- new Callable<Boolean>()
- {
- @Override
- public Boolean call()
- {
- return (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks()
- .size()) == 0;
- }
- }, "Waiting for Tasks Completion"
- );
-
- // wait for segments to be handed off
- try {
- RetryUtil.retryUntil(
- new Callable<Boolean>()
- {
- @Override
- public Boolean call()
- {
- return coordinator.areSegmentsLoaded(fullDatasourceName);
- }
- },
- true,
- 10000,
- 30,
- "Real-time generated segments loaded"
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- LOG.info("segments are present");
- segmentsExist = true;
-
- // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
- try {
- this.queryHelper.testQueriesFromString(queryStr, 2);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
+ doKafkaIndexTest(DATASOURCE, false);
}
@AfterClass
public void afterClass()
{
LOG.info("teardown");
- if (config.manageKafkaTopic()) {
- // delete kafka topic
- AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
- }
-
- // remove segments
- if (segmentsExist) {
- unloadAndKillData(fullDatasourceName);
- }
- }
-
- public void addFilteredProperties(Properties properties)
- {
- for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
- if (entry.getKey().startsWith(testPropertyPrefix)) {
- properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue());
- }
- }
+ doTearDown();
}
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
new file mode 100644
index 0000000..07bcae5
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+/**
+ * This is a test for the Kafka indexing service with transactional topics
+ */
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceTransactionalTest extends AbstractKafkaIndexerTest
+{
+ private static final Logger LOG = new Logger(ITKafkaIndexingServiceTransactionalTest.class);
+ private static final String DATASOURCE = "kafka_indexing_service_txn_test";
+
+ @Test
+ public void testKafka()
+ {
+ LOG.info("Starting test: ITKafkaIndexingServiceTransactionalTest");
+ doKafkaIndexTest(DATASOURCE, true);
+ }
+
+ @AfterClass
+ public void afterClass()
+ {
+ LOG.info("teardown");
+ doTearDown();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org