You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2020/10/05 15:54:57 UTC
[druid] branch master updated: adjustments to Kafka integration
tests to allow running against Azure Event Hubs streams (#10463)
This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 307c1b0 adjustments to Kafka integration tests to allow running against Azure Event Hubs streams (#10463)
307c1b0 is described below
commit 307c1b072006115dc0780913bb3fd7d1831849b4
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Oct 5 08:54:29 2020 -0700
adjustments to Kafka integration tests to allow running against Azure Event Hubs streams (#10463)
* adjustments to kafka integration tests to allow running against azure event hubs in kafka mode
* oops
* make better
* more better
---
.../apache/druid/testing/DockerConfigProvider.java | 43 +++++++++++++++++++++-
.../druid/testing/utils/KafkaAdminClient.java | 2 +-
.../druid/testing/utils/KafkaEventWriter.java | 2 +-
.../org/apache/druid/testing/utils/KafkaUtil.java | 15 ++++++++
.../indexer/AbstractKafkaIndexingServiceTest.java | 2 +-
.../ITKafkaIndexingServiceDataFormatTest.java | 15 +++++++-
6 files changed, 73 insertions(+), 6 deletions(-)
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
index 11c540f..67266b0 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
@@ -21,8 +21,13 @@ package org.apache.druid.testing;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import javax.validation.constraints.NotNull;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -54,6 +59,10 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
@JsonProperty
private String streamEndpoint;
+ @JsonProperty
+ @JsonDeserialize(using = ArbitraryPropertiesJsonDeserializer.class)
+ private Map<String, String> properties = new HashMap<>();
+
@Override
public IntegrationTestingConfig get()
{
@@ -190,7 +199,7 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
@Override
public String getProperty(String prop)
{
- throw new UnsupportedOperationException("DockerConfigProvider does not support property " + prop);
+ return properties.get(prop);
}
@Override
@@ -208,7 +217,7 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
@Override
public Map<String, String> getProperties()
{
- return new HashMap<>();
+ return properties;
}
@Override
@@ -260,4 +269,34 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
}
};
}
+
+ // Flattens the arbitrary 'properties' config object into String pairs (there is probably a better way to do this)
+ static class ArbitraryPropertiesJsonDeserializer extends JsonDeserializer<Map<String, String>>
+ {
+ @Override
+ public Map<String, String> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+ throws IOException
+ {
+ // given some config input, such as
+ // druid.test.config.properties.a.b.c=d
+ // calling jsonParser.readValueAs(Map.class) here results in a map that has both nested objects and also
+ // flattened string pairs, so the map looks something like this (in JSON form):
+ // {
+ // "a" : { "b": { "c" : "d" }},
+ // "a.b.c":"d"
+ // }
+ // The flattened pairs are the values we want, so top level keys whose values are nested objects (Maps) are
+ // dropped, leaving { "a.b.c":"d"} from the given example.
+ // Any other non-string value is stringified rather than cast, so it cannot raise a ClassCastException.
+ Map<String, Object> parsed = jsonParser.readValueAs(Map.class);
+ Map<String, String> flat = new HashMap<>();
+ for (Map.Entry<String, Object> entry : parsed.entrySet()) {
+ Object value = entry.getValue();
+ if (!(value instanceof Map)) {
+ flat.put(entry.getKey(), value == null ? null : value.toString());
+ }
+ }
+ return flat;
+ }
+ }
}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
index b8311a6..84993f2 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
@@ -43,8 +43,8 @@ public class KafkaAdminClient implements StreamAdminClient
public KafkaAdminClient(IntegrationTestingConfig config)
{
Properties properties = new Properties();
- KafkaUtil.addPropertiesFromTestConfig(config, properties);
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaHost());
+ KafkaUtil.addPropertiesFromTestConfig(config, properties);
adminClient = AdminClient.create(properties);
}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
index 1d5973c..79ae219 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
@@ -42,7 +42,6 @@ public class KafkaEventWriter implements StreamEventWriter
public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
{
Properties properties = new Properties();
- KafkaUtil.addPropertiesFromTestConfig(config, properties);
properties.setProperty("bootstrap.servers", config.getKafkaHost());
properties.setProperty("acks", "all");
properties.setProperty("retries", "3");
@@ -53,6 +52,7 @@ public class KafkaEventWriter implements StreamEventWriter
properties.setProperty("enable.idempotence", "true");
properties.setProperty("transactional.id", IdUtils.getRandomId());
}
+ KafkaUtil.addPropertiesFromTestConfig(config, properties);
this.producer = new KafkaProducer<>(
properties,
new StringSerializer(),
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
index 36534c2..0f7e9fa 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
@@ -21,12 +21,16 @@ package org.apache.druid.testing.utils;
import org.apache.druid.testing.IntegrationTestingConfig;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaUtil
{
private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
+ private static final String TEST_CONFIG_PROPERTY_PREFIX = "kafka.test.config.";
+
+ public static final String TEST_CONFIG_TRANSACTION_ENABLED = "transactionEnabled";
public static void addPropertiesFromTestConfig(IntegrationTestingConfig config, Properties properties)
{
@@ -36,4 +40,15 @@ public class KafkaUtil
}
}
}
+
+ public static Map<String, String> getAdditionalKafkaTestConfigFromProperties(IntegrationTestingConfig config)
+ {
+ // collect each test property whose name starts with TEST_CONFIG_PROPERTY_PREFIX, keyed by the rest of the name
+ final Map<String, String> stripped = new HashMap<>();
+ config.getProperties().forEach((name, value) -> {
+ if (name.startsWith(TEST_CONFIG_PROPERTY_PREFIX)) {
+ stripped.put(name.substring(TEST_CONFIG_PROPERTY_PREFIX.length()), value);
+ }
+ });
+ return stripped;
+ }
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index cc2a22f..204b6ef 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -58,9 +58,9 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
final Properties consumerProperties = new Properties();
- KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
consumerProperties.putAll(consumerConfigs);
consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
+ KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
return spec -> {
try {
spec = StringUtils.replace(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
index 9143d9b..2286d23 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
@@ -22,7 +22,9 @@ package org.apache.druid.tests.parallelized;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
@@ -78,6 +80,9 @@ public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingS
@Inject
private @Json ObjectMapper jsonMapper;
+ @Inject
+ private IntegrationTestingConfig config;
+
@BeforeClass
public void beforeClass() throws Exception
{
@@ -88,7 +93,15 @@ public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingS
public void testIndexData(boolean transactionEnabled, String serializerPath, String parserType, String specPath)
throws Exception
{
- doTestIndexDataStableState(transactionEnabled, serializerPath, parserType, specPath);
+ Map<String, String> testConfig = KafkaUtil.getAdditionalKafkaTestConfigFromProperties(config);
+ boolean txnEnable = Boolean.parseBoolean(
+ testConfig.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false")
+ );
+ if (txnEnable != transactionEnabled) {
+ // do nothing
+ return;
+ }
+ doTestIndexDataStableState(txnEnable, serializerPath, parserType, specPath);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org