Posted to commits@druid.apache.org by su...@apache.org on 2020/10/05 15:54:57 UTC

[druid] branch master updated: adjustments to Kafka integration tests to allow running against Azure Event Hubs streams (#10463)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 307c1b0  adjustments to Kafka integration tests to allow running against Azure Event Hubs streams (#10463)
307c1b0 is described below

commit 307c1b072006115dc0780913bb3fd7d1831849b4
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Oct 5 08:54:29 2020 -0700

    adjustments to Kafka integration tests to allow running against Azure Event Hubs streams (#10463)
    
    * adjustments to kafka integration tests to allow running against azure event hubs in kafka mode
    
    * oops
    
    * make better
    
    * more better
---
 .../apache/druid/testing/DockerConfigProvider.java | 43 +++++++++++++++++++++-
 .../druid/testing/utils/KafkaAdminClient.java      |  2 +-
 .../druid/testing/utils/KafkaEventWriter.java      |  2 +-
 .../org/apache/druid/testing/utils/KafkaUtil.java  | 15 ++++++++
 .../indexer/AbstractKafkaIndexingServiceTest.java  |  2 +-
 .../ITKafkaIndexingServiceDataFormatTest.java      | 15 +++++++-
 6 files changed, 73 insertions(+), 6 deletions(-)
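
Taken together, the changes work like this: DockerConfigProvider now accepts an
arbitrary bag of key/value pairs under the druid.test.config.properties. prefix
and exposes them through IntegrationTestingConfig.getProperty()/getProperties(),
and KafkaUtil copies the kafka.test.property.* subset of those pairs into the
Kafka client configuration. Using the example from the deserializer comment
below, a runtime property such as

    druid.test.config.properties.a.b.c=d

surfaces in the tests as config.getProperty("a.b.c") returning "d".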

diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
index 11c540f..67266b0 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
@@ -21,8 +21,13 @@ package org.apache.druid.testing;
 
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 
 import javax.validation.constraints.NotNull;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -54,6 +59,10 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
   @JsonProperty
   private String streamEndpoint;
 
+  @JsonProperty
+  @JsonDeserialize(using = ArbitraryPropertiesJsonDeserializer.class)
+  private Map<String, String> properties = new HashMap<>();
+
   @Override
   public IntegrationTestingConfig get()
   {
@@ -190,7 +199,7 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       @Override
       public String getProperty(String prop)
       {
-        throw new UnsupportedOperationException("DockerConfigProvider does not support property " + prop);
+        return properties.get(prop);
       }
 
       @Override
@@ -208,7 +217,7 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       @Override
       public Map<String, String> getProperties()
       {
-        return new HashMap<>();
+        return properties;
       }
 
       @Override
@@ -260,4 +269,34 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       }
     };
   }
+
+  // there is probably a better way to do this...
+  static class ArbitraryPropertiesJsonDeserializer extends JsonDeserializer<Map<String, String>>
+  {
+    @Override
+    public Map<String, String> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+        throws IOException
+    {
+      // given some config input, such as
+      //    druid.test.config.properties.a.b.c=d
+      // calling jsonParser.readValueAs(Map.class) here results in a map that has both nested objects and also
+      // flattened string pairs, so the map looks something like this (in JSON form):
+      //    {
+      //      "a" : { "b": { "c" : "d" }}},
+      //      "a.b.c":"d"
+      //    }
+      // The string pairs are the values we want to populate this map with, so filtering out the top level keys which
+      // do not have string values leaves us with
+      //    { "a.b.c":"d"}
+      // from the given example, which is what we want
+      Map<String, Object> parsed = jsonParser.readValueAs(Map.class);
+      Map<String, String> flat = new HashMap<>();
+      for (Map.Entry<String, Object> entry : parsed.entrySet()) {
+        if (!(entry.getValue() instanceof Map)) {
+          flat.put(entry.getKey(), (String) entry.getValue());
+        }
+      }
+      return flat;
+    }
+  }
 }
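
To see the flattening behavior in isolation, here is a minimal, self-contained
sketch; the JSON literal mimics the mixed nested/flat shape described in the
comment above and is illustrative, not something Druid itself emits:

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.HashMap;
    import java.util.Map;

    public class FlattenDemo
    {
      public static void main(String[] args) throws Exception
      {
        // mixed shape: one nested object plus one flattened string pair
        String json = "{\"a\":{\"b\":{\"c\":\"d\"}},\"a.b.c\":\"d\"}";
        Map<String, Object> parsed = new ObjectMapper().readValue(json, Map.class);

        // keep only the entries whose values are plain strings
        Map<String, String> flat = new HashMap<>();
        for (Map.Entry<String, Object> entry : parsed.entrySet()) {
          if (!(entry.getValue() instanceof Map)) {
            flat.put(entry.getKey(), (String) entry.getValue());
          }
        }
        System.out.println(flat); // prints {a.b.c=d}
      }
    }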
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
index b8311a6..84993f2 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
@@ -43,8 +43,8 @@ public class KafkaAdminClient implements StreamAdminClient
   public KafkaAdminClient(IntegrationTestingConfig config)
   {
     Properties properties = new Properties();
-    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getKafkaHost());
+    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     adminClient = AdminClient.create(properties);
   }
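
Note the call-order change above (the same move appears in KafkaEventWriter and
AbstractKafkaIndexingServiceTest below): java.util.Properties keeps the last
value written for a key, so applying addPropertiesFromTestConfig after the
hard-coded defaults lets an external cluster's settings override them. A
minimal sketch of why the order matters (the hostname is a placeholder):

    import java.util.Properties;

    public class OverrideDemo
    {
      public static void main(String[] args)
      {
        Properties p = new Properties();
        // harness default, set first
        p.setProperty("bootstrap.servers", "localhost:9092");
        // test-config value, set last, wins
        p.setProperty("bootstrap.servers", "example-ns.servicebus.windows.net:9093");
        System.out.println(p.getProperty("bootstrap.servers")); // prints the override
      }
    }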
 
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
index 1d5973c..79ae219 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
@@ -42,7 +42,6 @@ public class KafkaEventWriter implements StreamEventWriter
   public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
   {
     Properties properties = new Properties();
-    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     properties.setProperty("bootstrap.servers", config.getKafkaHost());
     properties.setProperty("acks", "all");
     properties.setProperty("retries", "3");
@@ -53,6 +52,7 @@ public class KafkaEventWriter implements StreamEventWriter
       properties.setProperty("enable.idempotence", "true");
       properties.setProperty("transactional.id", IdUtils.getRandomId());
     }
+    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     this.producer = new KafkaProducer<>(
         properties,
         new StringSerializer(),
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
index 36534c2..0f7e9fa 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
@@ -21,12 +21,16 @@ package org.apache.druid.testing.utils;
 
 import org.apache.druid.testing.IntegrationTestingConfig;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 public class KafkaUtil
 {
   private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
+  private static final String TEST_CONFIG_PROPERTY_PREFIX = "kafka.test.config.";
+
+  public static final String TEST_CONFIG_TRANSACTION_ENABLED = "transactionEnabled";
 
   public static void addPropertiesFromTestConfig(IntegrationTestingConfig config, Properties properties)
   {
@@ -36,4 +40,15 @@ public class KafkaUtil
       }
     }
   }
+
+  public static Map<String, String> getAdditionalKafkaTestConfigFromProperties(IntegrationTestingConfig config)
+  {
+    Map<String, String> theMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
+      if (entry.getKey().startsWith(TEST_CONFIG_PROPERTY_PREFIX)) {
+        theMap.put(entry.getKey().substring(TEST_CONFIG_PROPERTY_PREFIX.length()), entry.getValue());
+      }
+    }
+    return theMap;
+  }
 }
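
With both prefixes in place, a single flat set of runtime properties can carry
Kafka client settings (kafka.test.property.*) and test-harness switches
(kafka.test.config.*). As an illustrative sketch of pointing the suite at an
Event Hubs Kafka endpoint: the SASL keys are standard Kafka client settings,
and the connection string is a placeholder, not something from this commit.

    druid.test.config.properties.kafka.test.property.security.protocol=SASL_SSL
    druid.test.config.properties.kafka.test.property.sasl.mechanism=PLAIN
    druid.test.config.properties.kafka.test.property.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<connection string>";
    druid.test.config.properties.kafka.test.config.transactionEnabled=false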
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index cc2a22f..204b6ef 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -58,9 +58,9 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
   {
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
     final Properties consumerProperties = new Properties();
-    KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
     consumerProperties.putAll(consumerConfigs);
     consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
+    KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
     return spec -> {
       try {
         spec = StringUtils.replace(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
index 9143d9b..2286d23 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
@@ -22,7 +22,9 @@ package org.apache.druid.tests.parallelized;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.KafkaUtil;
 import org.apache.druid.tests.TestNGGroup;
 import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
 import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
@@ -78,6 +80,9 @@ public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingS
   @Inject
   private @Json ObjectMapper jsonMapper;
 
+  @Inject
+  private IntegrationTestingConfig config;
+
   @BeforeClass
   public void beforeClass() throws Exception
   {
@@ -88,7 +93,15 @@ public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingS
   public void testIndexData(boolean transactionEnabled, String serializerPath, String parserType, String specPath)
       throws Exception
   {
-    doTestIndexDataStableState(transactionEnabled, serializerPath, parserType, specPath);
+    Map<String, String> testConfig = KafkaUtil.getAdditionalKafkaTestConfigFromProperties(config);
+    boolean txnEnable = Boolean.parseBoolean(
+        testConfig.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false")
+    );
+    if (txnEnable != transactionEnabled) {
+      // this case's transaction mode does not match what the target cluster supports; skip it
+      return;
+    }
+    doTestIndexDataStableState(txnEnable, serializerPath, parserType, specPath);
   }
 
   @Override

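A note on the last hunk: the parameterized test now reads
kafka.test.config.transactionEnabled (defaulting to "false") and quietly skips
any case whose transactionEnabled parameter disagrees with it. That way a
cluster that supports only one transaction mode (Event Hubs' Kafka endpoint,
for example, did not support Kafka transactions when this was written) runs
just the matching half of the test matrix. To run only the non-transactional
cases, the test properties would include something like the following
(illustrative):

    druid.test.config.properties.kafka.test.config.transactionEnabled=false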
