Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/10/02 06:08:13 UTC

[GitHub] [druid] clintropolis opened a new pull request #10463: adjustments to Kafka integration tests to allow running against Azure Event Hubs streams

clintropolis opened a new pull request #10463:
URL: https://github.com/apache/druid/pull/10463


   ### Description
   This PR adjusts the Kafka integration tests by slightly changing the order in which the `druid.test.config.properties.kafka.test.property.*` overrides are applied. This allows `bootstrap.servers` to be overridden in the Docker tests to point at Azure Event Hubs. Additionally, it adds an option to disable the transaction tests in `ITKafkaIndexingServiceDataFormatTest`, since Azure Event Hubs does not support Kafka transactions. Note that this PR does not actually add any new integration tests. It only adds the configurability that makes running them against Event Hubs possible.
   
   With these adjustments I was able to run `ITKafkaIndexingServiceDataFormatTest` and `ITKafkaIndexingServiceNonTransactionalSerializedTest` with all tests passing by overriding the Kafka configs with something like this:
   
   ```
   -Ddruid.test.config.properties.kafka.test.property.security.protocol=SASL_SSL
   -Ddruid.test.config.properties.kafka.test.property.sasl.mechanism=PLAIN
   -Ddruid.test.config.properties.kafka.test.property.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"Endpoint=sb://{my-hub-name}.servicebus.windows.net/;SharedAccessKeyName={key-name};SharedAccessKey={shared-key}\";"
   -Ddruid.test.config.properties.kafka.test.property.bootstrap.servers={my-hub-name}.servicebus.windows.net:9093
   ```
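The following is a minimal Java sketch of how overrides like the ones above could end up as Kafka client properties. It assumes the test config exposes them as a flat map whose keys still carry a `kafka.test.property.` prefix, and that this prefix is stripped before the value is handed to the Kafka client. The class `KafkaTestPropertyMapper` and its method are hypothetical and are not Druid's `KafkaUtil` code. The host name is a placeholder.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical sketch (not Druid's KafkaUtil): forwards test-config entries such as
// "kafka.test.property.security.protocol" -> "SASL_SSL" as plain Kafka client keys.
class KafkaTestPropertyMapper
{
  // Assumed prefix for Kafka overrides inside the test config's property map.
  static final String PREFIX = "kafka.test.property.";

  static Properties toKafkaProperties(Map<String, String> testConfigProperties)
  {
    Properties kafkaProps = new Properties();
    for (Map.Entry<String, String> entry : testConfigProperties.entrySet()) {
      String key = entry.getKey();
      // Only keys under the Kafka prefix are forwarded; everything else is ignored.
      if (key.startsWith(PREFIX)) {
        kafkaProps.setProperty(key.substring(PREFIX.length()), entry.getValue());
      }
    }
    return kafkaProps;
  }

  public static void main(String[] args)
  {
    // Keys as they might look once "druid.test.config.properties." has been consumed.
    Map<String, String> testConfig = new TreeMap<>();
    testConfig.put("kafka.test.property.security.protocol", "SASL_SSL");
    testConfig.put("kafka.test.property.sasl.mechanism", "PLAIN");
    testConfig.put("kafka.test.property.bootstrap.servers", "my-hub-name.servicebus.windows.net:9093");
    testConfig.put("unrelated.setting", "ignored");

    Properties props = toKafkaProperties(testConfig);
    System.out.println(props.getProperty("bootstrap.servers"));
    // prints my-hub-name.servicebus.windows.net:9093
  }
}
```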
   For `ITKafkaIndexingServiceNonTransactionalParallelizedTest`, I encountered an exception in the test itself when the admin client API tried to change the partition count in `testKafkaIndexDataWithKafkaReshardSplit`:
   
   ```
   java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details.
   
   	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
   	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
   	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
   	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
   	at org.apache.druid.testing.utils.KafkaAdminClient.updatePartitionCount(KafkaAdminClient.java:82)
   	at org.apache.druid.tests.indexer.AbstractStreamIndexingTest.testIndexWithStreamReshardHelper(AbstractStreamIndexingTest.java:427)
   ```
   
   so that test cannot be run successfully at this time (the other test, `testKafkaIndexDataWithStartStopSupervisor`, does succeed). None of the transactional tests can run either, since Event Hubs does not support Kafka transactions.
   
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
   - [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #10463: adjustments to Kafka integration tests to allow running against Azure Event Hubs streams

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #10463:
URL: https://github.com/apache/druid/pull/10463#discussion_r499031039



##########
File path: integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
##########
@@ -260,4 +269,32 @@ public String getStreamEndpoint()
       }
     };
   }
+
+  // there is probably a better way to do this...
+  static class ArbitraryPropertiesJsonDeserializer extends JsonDeserializer<Map<String, String>>
+  {
+    @Override
+    public Map<String, String> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
+        throws IOException
+    {
+      // reading like this results in a map that has both nested objects and also flattened string pairs
+      // so the map looks something like this:
+
+      //    {
+      //      "a" : { "b": { "c" : "d" }},
+      //      "a.b.c":"d"
+      //    }
+
+      // filtering out the top level keys which do not have string values produces what we want here, so that
+      // '-Ddruid.test.config.properties.some.property.key=foo' -> { "some.property.key":"foo"}
+      Map<String, Object> parsed = jsonParser.readValueAs(Map.class);
+      Map<String, String> flat = new HashMap<>();
+      for (Map.Entry<String, Object> entry : parsed.entrySet()) {
+        if (entry.getValue() instanceof String) {
+          flat.put(entry.getKey(), (String) entry.getValue());
+        }
+      }

Review comment:
       TBH I'm not entirely sure why the map coming from Jackson ends up like this, but I tried to explain what is going on in the comments above. This block just filters out the non-string values, leaving only the flat map once it is done.
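The filtering step can be exercised on its own, without Jackson. The hypothetical `FlatPropertyFilter` sketch below assumes the parsed map has the mixed shape described in the code comment. It mirrors the loop in the diff: nested maps are dropped and only top-level string values survive.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone version of the filtering loop in ArbitraryPropertiesJsonDeserializer.
class FlatPropertyFilter
{
  static Map<String, String> keepStringValues(Map<String, Object> parsed)
  {
    Map<String, String> flat = new HashMap<>();
    for (Map.Entry<String, Object> entry : parsed.entrySet()) {
      // Nested maps (the hierarchical copies) are skipped; dotted-key/string pairs are kept.
      if (entry.getValue() instanceof String) {
        flat.put(entry.getKey(), (String) entry.getValue());
      }
    }
    return flat;
  }

  public static void main(String[] args)
  {
    // Build the mixed shape from the comment: a nested copy plus a flattened copy.
    Map<String, Object> c = new HashMap<>();
    c.put("c", "d");
    Map<String, Object> b = new HashMap<>();
    b.put("b", c);
    Map<String, Object> parsed = new HashMap<>();
    parsed.put("a", b);
    parsed.put("a.b.c", "d");

    System.out.println(keepStringValues(parsed)); // prints {a.b.c=d}
  }
}
```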






[GitHub] [druid] suneet-s commented on a change in pull request #10463: adjustments to Kafka integration tests to allow running against Azure Event Hubs streams

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10463:
URL: https://github.com/apache/druid/pull/10463#discussion_r498875012



##########
File path: integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
##########
@@ -42,8 +42,8 @@
   public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
   {
     Properties properties = new Properties();
-    KafkaUtil.addPropertiesFromTestConfig(config, properties);
     properties.setProperty("bootstrap.servers", config.getKafkaHost());
+    KafkaUtil.addPropertiesFromTestConfig(config, properties);

Review comment:
       Maybe move this to line 56, after the other properties are set, so that the config can also override properties like `acks` if needed in the future.
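The following sketch shows why the ordering matters. It assumes the writer sets its defaults with `Properties.setProperty` and that the test config is applied through the same call. `setProperty` replaces any existing value for a key, so whatever is applied last wins. `PropertyOrdering` and the `acks` default of `all` are illustrative assumptions, not the actual `KafkaEventWriter` code.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical sketch: writer defaults first, test-config overrides last.
class PropertyOrdering
{
  static Properties buildWriterProperties(String defaultBootstrapServers, Map<String, String> overrides)
  {
    Properties properties = new Properties();
    // Illustrative defaults a writer might set; the values are assumptions.
    properties.setProperty("bootstrap.servers", defaultBootstrapServers);
    properties.setProperty("acks", "all");
    // setProperty replaces existing values, so applying overrides last lets them win over every default.
    for (Map.Entry<String, String> e : overrides.entrySet()) {
      properties.setProperty(e.getKey(), e.getValue());
    }
    return properties;
  }

  public static void main(String[] args)
  {
    Map<String, String> overrides = new TreeMap<>();
    overrides.put("bootstrap.servers", "my-hub-name.servicebus.windows.net:9093");
    overrides.put("acks", "1");
    Properties p = buildWriterProperties("localhost:9092", overrides);
    System.out.println(p.getProperty("bootstrap.servers") + " " + p.getProperty("acks"));
    // prints my-hub-name.servicebus.windows.net:9093 1
  }
}
```

If the overrides were applied before the defaults instead, the defaults would silently replace them. That is the situation the suggested move avoids.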






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10463: adjustments to Kafka integration tests to allow running against Azure Event Hubs streams

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10463:
URL: https://github.com/apache/druid/pull/10463#discussion_r499157477



##########
File path: integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
##########

Review comment:
       If it hasn't been discussed before, `org.apache.druid.guice.JsonConfigurator#hieraricalPutValue` is what produces these duplicate values. It's possible that passing the properties as a single JSON value, as below, may work:
   `druid.test.config.properties={"kafka.test.property.security.protocol": "SASL_SSL", "kafka.test.property.sasl.mechanism":"PLAIN"}`
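The following simplified, hypothetical re-implementation illustrates how a hierarchical put can produce both nested and flattened entries. It is not Druid's `JsonConfigurator` code. The only assumption is that the full dotted key is stored at each level before recursing into a nested map for the first segment. Under that assumption, the top-level map holds the original dotted key as a plain string, which is exactly what the filtering loop in `ArbitraryPropertiesJsonDeserializer` keeps.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a hierarchical put; not Druid's implementation.
class HierarchicalPutSketch
{
  @SuppressWarnings("unchecked")
  static void hierarchicalPut(String property, Object value, Map<String, Object> target)
  {
    // The full (possibly dotted) key is always stored at the current level...
    target.put(property, value);
    int dot = property.indexOf('.');
    // Keys without a usable dot stop the recursion here.
    if (dot <= 0 || dot == property.length() - 1) {
      return;
    }
    // ...and the value is also stored under a nested map for the first path segment.
    Object nested = target.computeIfAbsent(property.substring(0, dot), k -> new TreeMap<String, Object>());
    if (nested instanceof Map) {
      hierarchicalPut(property.substring(dot + 1), value, (Map<String, Object>) nested);
    }
  }

  public static void main(String[] args)
  {
    Map<String, Object> root = new TreeMap<>();
    hierarchicalPut("a.b.c", "d", root);
    System.out.println(root);
    // prints {a={b={c=d}, b.c=d}, a.b.c=d}
  }
}
```

In this sketch, the nested map for `a` also carries a partially flattened `b.c` key, which is a slightly richer shape than the comment in the diff shows. Keeping only top-level string values yields the same flat map either way.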






[GitHub] [druid] suneet-s commented on a change in pull request #10463: adjustments to Kafka integration tests to allow running against Azure Event Hubs streams

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10463:
URL: https://github.com/apache/druid/pull/10463#discussion_r499034434



##########
File path: integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
##########

Review comment:
       I misread your comment earlier; I thought an input of
   
   ```json
   {"a" : { "b": { "c" : "d" }}}
   ```
   
   would be converted to a map with an entry like
   
   ```
   "a.b.c":"d"
   ```
   
   To help me understand, could you describe an example (sample input + expected output) where the default Deserializer would produce a different output from what this deserializer does?






[GitHub] [druid] suneet-s commented on a change in pull request #10463: adjustments to Kafka integration tests to allow running against Azure Event Hubs streams

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10463:
URL: https://github.com/apache/druid/pull/10463#discussion_r498872969



##########
File path: integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
##########

Review comment:
       How does this flatten the map? It looks like it's just ignoring any value that isn't a String? 






[GitHub] [druid] suneet-s merged pull request #10463: adjustments to Kafka integration tests to allow running against Azure Event Hubs streams

Posted by GitBox <gi...@apache.org>.
suneet-s merged pull request #10463:
URL: https://github.com/apache/druid/pull/10463


   

