Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/04/28 07:48:31 UTC

[GitHub] [druid] jihoonson opened a new pull request #9783: Integration tests for stream ingestion with various data formats

jihoonson opened a new pull request #9783:
URL: https://github.com/apache/druid/pull/9783


   ### Description
   
   This PR adds new integration tests for stream ingestion against different data formats. To do this, it adds a new interface, `EventSerializer`, which converts events generated by `SyntheticStreamGenerator` into a particular data format.
   
   The new tests automatically pick up serializer, inputFormat, and inputRowParser specs from test resources and use them to populate test parameters. Since these tests take about 30 minutes when run in parallel with 2 threads, a new Travis job was added.
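
As a rough illustration of the serializer idea, here is a minimal sketch of how one generated event could be rendered in two formats. The interface shape, the names `SketchEventSerializer`, `CsvSketchSerializer`, `TsvSketchSerializer`, and `SketchEvents`, and the sample field values are all assumptions made for illustration; this is not the code added in this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the EventSerializer idea: one generated event in, bytes in a given format out.
interface SketchEventSerializer
{
  byte[] serialize(Map<String, Object> event);
}

// Renders the event values as one comma-separated line, in insertion order.
class CsvSketchSerializer implements SketchEventSerializer
{
  @Override
  public byte[] serialize(Map<String, Object> event)
  {
    return SketchEvents.join(event, ",").getBytes(StandardCharsets.UTF_8);
  }
}

// Renders the same event as one tab-separated line.
class TsvSketchSerializer implements SketchEventSerializer
{
  @Override
  public byte[] serialize(Map<String, Object> event)
  {
    return SketchEvents.join(event, "\t").getBytes(StandardCharsets.UTF_8);
  }
}

class SketchEvents
{
  // Joins the event values with the given delimiter; no quoting or escaping is attempted.
  static String join(Map<String, Object> event, String delimiter)
  {
    return event.values().stream().map(v -> String.valueOf(v)).collect(Collectors.joining(delimiter));
  }

  // A made-up sample event; the field names and values are illustrative only.
  static Map<String, Object> sample()
  {
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("timestamp", "2020-04-28T00:00:00Z");
    event.put("page", "Druid");
    event.put("added", 10);
    return event;
  }
}
```

The generator only needs to know the interface, so adding a format in this sketch means adding one more implementation and one more spec resource.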
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths.
   - [x] added integration tests.
   - [ ] been tested in a test Druid cluster.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417620233



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
##########
@@ -67,92 +92,147 @@
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)

Review comment:
       Good idea. Raised https://github.com/apache/druid/pull/9795.






[GitHub] [druid] jihoonson commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417633174



##########
File path: integration-tests/src/test/resources/testng.xml
##########
@@ -20,7 +20,7 @@
 <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
 
 
-<suite name="IntegrationTestSuite">
+<suite name="IntegrationTestSuite" data-provider-thread-count="2">

Review comment:
       Oh hmm, should we document those ourselves? TestNG already covers them in its official documentation, Javadoc, and `testng.dtd`, and I thought those would be enough.






[GitHub] [druid] ccaominh commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r416958063



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "kinesis-data-format")

Review comment:
       Keeping it separate is nice for running the groups concurrently. Perhaps we can explore TestNG [groups of groups](https://testng.org/doc/documentation-main.html#groups-of-groups) later to simplify running multiple groups.






[GitHub] [druid] jihoonson commented on pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #9783:
URL: https://github.com/apache/druid/pull/9783#issuecomment-621442610


   @clintropolis @ccaominh @maytasm thank you for reviewing this PR. Merged it now. @maytasm let me know what you think about my last comments.




[GitHub] [druid] jihoonson commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r416956654



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "kinesis-data-format")

Review comment:
       On second thought, it might be useful to have a different group if we run this test on some CI in the future. I added a new group to `TestNGGroup`. The new group is disabled on Travis.






[GitHub] [druid] jihoonson commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r416944603



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "kinesis-data-format")

Review comment:
       Oh, I forgot to revert this change. I think it would be better to add this test to the existing `kinesis-index` group since we run them manually and probably want to run them all at once. What do you think?






[GitHub] [druid] jihoonson commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417538264



##########
File path: integration-tests/src/test/resources/testng.xml
##########
@@ -20,7 +20,7 @@
 <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
 
 
-<suite name="IntegrationTestSuite">
+<suite name="IntegrationTestSuite" data-provider-thread-count="2">

Review comment:
       It seems Travis didn't pick up my last commit due to an outage yesterday. I added some notes to the README.
   
   As far as I can tell, `thread-count` and `data-provider-thread-count` are currently fixed for `IntegrationTestSuite` ([command line parameters will be overridden by suite parameters](https://testng.org/doc/documentation-main.html#running-testng)). I think it would be better to set them via command line parameters so that we have more control over them when running tests locally.






[GitHub] [druid] maytasm commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r416997861



##########
File path: integration-tests/src/test/resources/testng.xml
##########
@@ -20,7 +20,7 @@
 <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
 
 
-<suite name="IntegrationTestSuite">
+<suite name="IntegrationTestSuite" data-provider-thread-count="2">

Review comment:
       nit: it might be helpful to describe what this does and how to use it in README.md






[GitHub] [druid] jihoonson commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417545565



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
##########
@@ -67,92 +92,147 @@
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)

Review comment:
       Hmm, looking at it again, I'm not sure how this can be implemented. Even though only `KafkaEventWriter` needs `transactionEnabled`, `createStreamEventWriter()` is called in `AbstractStreamIndexingTest`, which is a superclass of both `AbstractKafkaIndexingServiceTest` and `AbstractKinesisIndexingServiceTest`. That means `AbstractStreamIndexingTest` needs to know about the `transactionEnabled` parameter when it calls `createStreamEventWriter()`. Before this PR, you added `isKafkaWriterTransactionalEnabled()` to `AbstractKafkaIndexingServiceTest`, but this seems to work only when every child class of `AbstractKafkaIndexingServiceTest` runs with transactions either always enabled or always disabled, which is no longer true for `ITKafkaIndexingServiceDataFormatTest`. I prefer to parameterize this class to run with different transaction modes since it's better for code maintenance (we don't have to keep two separate test classes, one per mode, in sync).
   
   A simple workaround I can come up with is adding a Javadoc to this method explaining that the `transactionEnabled` parameter applies only to Kafka. I assume this is OK since the `StreamEventWriter` interface has the methods `isTransactionEnabled`, `initTransaction`, and `commitTransaction`, which are properly implemented only for Kafka.
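
To make the workaround concrete, here is a simplified sketch of the shape described above. Every name with a `Sketch` prefix is a hypothetical stand-in, not one of the actual test classes. The point is only that the base class passes `transactionEnabled` through, the Kafka side uses it, and the Kinesis side ignores it, with the Javadoc saying so.

```java
// Hypothetical, simplified stand-ins for the classes discussed above.
interface SketchEventWriter
{
  boolean isTransactionEnabled();
}

abstract class SketchStreamIndexingTest
{
  /**
   * Creates a writer for the stream under test.
   *
   * @param transactionEnabled honored only by the Kafka implementation; other streams ignore it
   */
  abstract SketchEventWriter createStreamEventWriter(boolean transactionEnabled);

  // The base class drives the test, so it has to pass the flag along regardless of the stream type.
  boolean runWith(boolean transactionEnabled)
  {
    return createStreamEventWriter(transactionEnabled).isTransactionEnabled();
  }
}

class SketchKafkaTest extends SketchStreamIndexingTest
{
  @Override
  SketchEventWriter createStreamEventWriter(boolean transactionEnabled)
  {
    // The Kafka-style writer reflects the requested mode.
    return () -> transactionEnabled;
  }
}

class SketchKinesisTest extends SketchStreamIndexingTest
{
  @Override
  SketchEventWriter createStreamEventWriter(boolean transactionEnabled)
  {
    // The flag has no meaning in this sketch's Kinesis case, so the writer always reports false.
    return () -> false;
  }
}
```

With this shape a single parameterized Kafka test can call `runWith(true)` and `runWith(false)`, which is the maintenance benefit argued for above.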






[GitHub] [druid] maytasm commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417613167



##########
File path: integration-tests/src/test/resources/testng.xml
##########
@@ -20,7 +20,7 @@
 <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
 
 
-<suite name="IntegrationTestSuite">
+<suite name="IntegrationTestSuite" data-provider-thread-count="2">

Review comment:
       My thought behind this comment was to mention `@DataProvider(parallel = true)` and how it is used and works with `data-provider-thread-count`. Regarding the fixed value, we can also move it to the command line.
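
For readers unfamiliar with the two settings: a data provider marked `@DataProvider(parallel = true)` lets TestNG run the test method for different parameter rows concurrently, and `data-provider-thread-count` caps how many threads are used for that. The plain-Java sketch below is only an analogy for that behavior, not TestNG's implementation. `ParallelRowsSketch`, the row labels, and the pool-based approach are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelRowsSketch
{
  // Runs one "test invocation" per row on a pool capped at threadCount threads
  // and returns the names of the worker threads that were actually used.
  static Set<String> runRows(List<String> rows, int threadCount)
  {
    Set<String> workers = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (String row : rows) {
        futures.add(pool.submit(() -> {
          // A real test body would ingest and verify data for this row.
          workers.add(Thread.currentThread().getName());
        }));
      }
      for (Future<?> future : futures) {
        try {
          future.get(); // wait for the row and surface any failure
        }
        catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
      }
    }
    finally {
      pool.shutdown();
    }
    return workers;
  }
}
```

However many rows are submitted, at most `threadCount` of them run at the same time, which is the effect the suite attribute has on a parallel data provider.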








[GitHub] [druid] jihoonson commented on pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #9783:
URL: https://github.com/apache/druid/pull/9783#issuecomment-620854252


   Manually ran the new Kinesis test on my laptop.




[GitHub] [druid] maytasm commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417608147



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
##########
@@ -67,92 +92,147 @@
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)

Review comment:
       Hmm, I see. We can keep it this way then. My concern is that in the Kinesis test classes we are passing an arbitrary, meaningless true/false that does nothing and might be misleading. I think Javadoc on the Kinesis test classes (`ITKinesisIndexingServiceSerializedTest`, etc.) will definitely help. Another idea is to change `boolean` to `Boolean` and have the Kinesis test classes pass null instead of true/false, making it explicit that the `transactionEnabled` flag does not apply to Kinesis.
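
The `Boolean` idea could look roughly like the sketch below, where `null` means the flag does not apply. `NullableFlagSketch` and its method names are hypothetical and only illustrate the suggestion; they do not reflect code in this PR.

```java
import java.util.Objects;

class NullableFlagSketch
{
  // Kafka-style factory in this sketch: the flag is required.
  static String describeKafkaWriter(Boolean transactionEnabled)
  {
    Objects.requireNonNull(transactionEnabled, "transactionEnabled is required for Kafka");
    return transactionEnabled ? "kafka(transactional)" : "kafka(non-transactional)";
  }

  // Kinesis-style factory in this sketch: callers pass null to make it explicit
  // that the flag does not apply, and any other value is rejected.
  static String describeKinesisWriter(Boolean transactionEnabled)
  {
    if (transactionEnabled != null) {
      throw new IllegalArgumentException("transactionEnabled does not apply to Kinesis");
    }
    return "kinesis";
  }
}
```

The trade-off is that a boxed `Boolean` pushes null handling onto every caller, which is one reason a Javadoc note on the primitive parameter may be the lighter option.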






[GitHub] [druid] jihoonson commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417000607



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
##########
@@ -67,92 +92,147 @@
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+      throws Exception;
+
+  abstract Function<String, String> generateStreamIngestionPropsTransform(
+      String streamName,
+      String fullDatasourceName,
+      String parserType,
+      String parserOrInputFormat,
+      IntegrationTestingConfig config
+  );
+
   abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
+
   public abstract String getTestNamePrefix();
 
   protected void doBeforeClass() throws Exception
   {
     streamAdminClient = createStreamAdminClient(config);
-    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
   }
 
-  protected void doClassTeardown()
+  private static String getOnlyResourcePath(String resourceRoot) throws IOException
   {
-    wikipediaStreamEventGenerator.shutdown();
+    return String.join("/", resourceRoot, Iterables.getOnlyElement(listResources(resourceRoot)));
   }
 
-  protected void doTestIndexDataWithLegacyParserStableState() throws Exception
+  protected static List<String> listDataFormatResources() throws IOException
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
-    try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
-    ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
-      LOG.info("supervisorSpec: [%s]\n", taskSpec);
-      // Start supervisor
-      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
-      LOG.info("Submitted supervisor");
-      // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
-      verifyIngestedData(generatedTestConfig);
+    return listResources(DATA_RESOURCE_ROOT)
+        .stream()
+        .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns a map of key to path to spec. The returned map contains at least 2 specs and one of them
+   * should be a {@link #SERIALIZER} spec.
+   */
+  protected static Map<String, String> findTestSpecs(String resourceRoot) throws IOException
+  {
+    final List<String> specDirs = listResources(resourceRoot);
+    final Map<String, String> map = new HashMap<>();
+    for (String eachSpec : specDirs) {
+      if (SERIALIZER_SPEC_DIR.equals(eachSpec)) {
+        map.put(SERIALIZER, getOnlyResourcePath(String.join("/", resourceRoot, SERIALIZER_SPEC_DIR)));
+      } else if (INPUT_ROW_PARSER_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_ROW_PARSER, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_ROW_PARSER_SPEC_DIR)));
+      } else if (INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_FORMAT, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_FORMAT_SPEC_DIR)));
+      }
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    if (!map.containsKey(SERIALIZER_SPEC_DIR)) {
+      throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", resourceRoot, map);
     }
+    if (map.size() == 1) {
+      throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", resourceRoot, map);
+    }
+    return map;
+  }
+
+  private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
+  {
+    return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
   }
 
-  protected void doTestIndexDataWithInputFormatStableState() throws Exception
+  protected void doTestIndexDataStableState(
+      boolean transactionEnabled,
+      String serializerPath,
+      String parserType,
+      String specPath
+  ) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final EventSerializer serializer = jsonMapper.readValue(getResourceAsStream(serializerPath), EventSerializer.class);
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+        serializer,
+        EVENTS_PER_SECOND,
+        CYCLE_PADDING_MS
+    );
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, getResourceAsString(specPath));
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
       LOG.info("Submitted supervisor");
       // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
-  void doTestIndexDataWithLosingCoordinator() throws Exception
+  void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception

Review comment:
       Good point. Can I do it in a follow-up PR, together with the README.md change you commented on below?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] ccaominh commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r416931177



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "kinesis-data-format")

Review comment:
       Consider adding a new named constant to `TestNGGroup` for "kinesis-data-format".
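
To illustrate the suggestion, here is a minimal, self-contained sketch. It assumes that `TestNGGroup` holds group names as `public static final String` constants; the `Groups` holder, the constant name `KINESIS_DATA_FORMAT`, and the local `@Test` annotation are hypothetical stand-ins so the example compiles without TestNG on the classpath. It is not the PR's actual code.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class TestGroupConstantSketch
{
  /** Hypothetical stand-in for TestNGGroup: one named constant per test group. */
  public static final class Groups
  {
    // Initialized with a literal, so it is a compile-time constant
    // and can be used as an annotation attribute value.
    public static final String KINESIS_DATA_FORMAT = "kinesis-data-format";

    private Groups()
    {
    }
  }

  /** Local stand-in for org.testng.annotations.Test (assumption: only `groups` is modeled). */
  @Retention(RetentionPolicy.RUNTIME)
  public @interface Test
  {
    String[] groups() default {};
  }

  /** The test class refers to the constant instead of repeating the string literal. */
  @Test(groups = Groups.KINESIS_DATA_FORMAT)
  public static class DataFormatTest
  {
  }

  /** Reads the first group name back from the annotation via reflection. */
  public static String firstGroupOf(Class<?> clazz)
  {
    return clazz.getAnnotation(Test.class).groups()[0];
  }

  public static void main(String[] args)
  {
    System.out.println(firstGroupOf(DataFormatTest.class));
  }
}
```

With a shared constant, the group name used by the test annotation and by any build or CI configuration that selects the group can be checked against a single definition, rather than relying on matching string literals.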






[GitHub] [druid] maytasm commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417007253



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
##########
@@ -67,92 +92,147 @@
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+      throws Exception;
+
+  abstract Function<String, String> generateStreamIngestionPropsTransform(
+      String streamName,
+      String fullDatasourceName,
+      String parserType,
+      String parserOrInputFormat,
+      IntegrationTestingConfig config
+  );
+
   abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
+
   public abstract String getTestNamePrefix();
 
   protected void doBeforeClass() throws Exception
   {
     streamAdminClient = createStreamAdminClient(config);
-    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
   }
 
-  protected void doClassTeardown()
+  private static String getOnlyResourcePath(String resourceRoot) throws IOException
   {
-    wikipediaStreamEventGenerator.shutdown();
+    return String.join("/", resourceRoot, Iterables.getOnlyElement(listResources(resourceRoot)));
   }
 
-  protected void doTestIndexDataWithLegacyParserStableState() throws Exception
+  protected static List<String> listDataFormatResources() throws IOException
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
-    try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
-    ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
-      LOG.info("supervisorSpec: [%s]\n", taskSpec);
-      // Start supervisor
-      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
-      LOG.info("Submitted supervisor");
-      // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
-      verifyIngestedData(generatedTestConfig);
+    return listResources(DATA_RESOURCE_ROOT)
+        .stream()
+        .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns a map of key to path to spec. The returned map contains at least 2 specs and one of them
+   * should be a {@link #SERIALIZER} spec.
+   */
+  protected static Map<String, String> findTestSpecs(String resourceRoot) throws IOException
+  {
+    final List<String> specDirs = listResources(resourceRoot);
+    final Map<String, String> map = new HashMap<>();
+    for (String eachSpec : specDirs) {
+      if (SERIALIZER_SPEC_DIR.equals(eachSpec)) {
+        map.put(SERIALIZER, getOnlyResourcePath(String.join("/", resourceRoot, SERIALIZER_SPEC_DIR)));
+      } else if (INPUT_ROW_PARSER_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_ROW_PARSER, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_ROW_PARSER_SPEC_DIR)));
+      } else if (INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_FORMAT, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_FORMAT_SPEC_DIR)));
+      }
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    if (!map.containsKey(SERIALIZER_SPEC_DIR)) {
+      throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", resourceRoot, map);
     }
+    if (map.size() == 1) {
+      throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", resourceRoot, map);
+    }
+    return map;
+  }
+
+  private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
+  {
+    return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
   }
 
-  protected void doTestIndexDataWithInputFormatStableState() throws Exception
+  protected void doTestIndexDataStableState(
+      boolean transactionEnabled,
+      String serializerPath,
+      String parserType,
+      String specPath
+  ) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final EventSerializer serializer = jsonMapper.readValue(getResourceAsStream(serializerPath), EventSerializer.class);
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+        serializer,
+        EVENTS_PER_SECOND,
+        CYCLE_PADDING_MS
+    );
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, getResourceAsString(specPath));
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
       LOG.info("Submitted supervisor");
       // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
-  void doTestIndexDataWithLosingCoordinator() throws Exception
+  void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception

Review comment:
       Sure






[GitHub] [druid] maytasm commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r416995812



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
##########
@@ -67,92 +92,147 @@
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+      throws Exception;
+
+  abstract Function<String, String> generateStreamIngestionPropsTransform(
+      String streamName,
+      String fullDatasourceName,
+      String parserType,
+      String parserOrInputFormat,
+      IntegrationTestingConfig config
+  );
+
   abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
+
   public abstract String getTestNamePrefix();
 
   protected void doBeforeClass() throws Exception
   {
     streamAdminClient = createStreamAdminClient(config);
-    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
   }
 
-  protected void doClassTeardown()
+  private static String getOnlyResourcePath(String resourceRoot) throws IOException
   {
-    wikipediaStreamEventGenerator.shutdown();
+    return String.join("/", resourceRoot, Iterables.getOnlyElement(listResources(resourceRoot)));
   }
 
-  protected void doTestIndexDataWithLegacyParserStableState() throws Exception
+  protected static List<String> listDataFormatResources() throws IOException
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
-    try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
-    ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
-      LOG.info("supervisorSpec: [%s]\n", taskSpec);
-      // Start supervisor
-      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
-      LOG.info("Submitted supervisor");
-      // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
-      verifyIngestedData(generatedTestConfig);
+    return listResources(DATA_RESOURCE_ROOT)
+        .stream()
+        .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns a map of key to path to spec. The returned map contains at least 2 specs and one of them
+   * should be a {@link #SERIALIZER} spec.
+   */
+  protected static Map<String, String> findTestSpecs(String resourceRoot) throws IOException
+  {
+    final List<String> specDirs = listResources(resourceRoot);
+    final Map<String, String> map = new HashMap<>();
+    for (String eachSpec : specDirs) {
+      if (SERIALIZER_SPEC_DIR.equals(eachSpec)) {
+        map.put(SERIALIZER, getOnlyResourcePath(String.join("/", resourceRoot, SERIALIZER_SPEC_DIR)));
+      } else if (INPUT_ROW_PARSER_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_ROW_PARSER, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_ROW_PARSER_SPEC_DIR)));
+      } else if (INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_FORMAT, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_FORMAT_SPEC_DIR)));
+      }
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    if (!map.containsKey(SERIALIZER_SPEC_DIR)) {
+      throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", resourceRoot, map);
     }
+    if (map.size() == 1) {
+      throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", resourceRoot, map);
+    }
+    return map;
+  }
+
+  private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
+  {
+    return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
   }
 
-  protected void doTestIndexDataWithInputFormatStableState() throws Exception
+  protected void doTestIndexDataStableState(
+      boolean transactionEnabled,
+      String serializerPath,
+      String parserType,
+      String specPath
+  ) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final EventSerializer serializer = jsonMapper.readValue(getResourceAsStream(serializerPath), EventSerializer.class);
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+        serializer,
+        EVENTS_PER_SECOND,
+        CYCLE_PADDING_MS
+    );
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, getResourceAsString(specPath));
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
       LOG.info("Submitted supervisor");
       // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
-  void doTestIndexDataWithLosingCoordinator() throws Exception
+  void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception

Review comment:
       Kinesis has no notion of transactions being enabled or disabled. Would it make sense to handle this flag at the Kafka layer instead of here?
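
One possible reading of this suggestion, sketched under heavy simplification: the shared base class exposes a writer factory with no transaction parameter, and only the Kafka subclass carries the flag. Every type below (`StreamEventWriter` with a single `describe()` method, `AbstractStreamTest`, `KafkaTest`, `KinesisTest`) is a hypothetical stand-in, not the real Druid class, and the real tests may have reasons to keep the parameter in the shared signature.

```java
public class TransactionFlagPlacementSketch
{
  /** Hypothetical, simplified stand-in for the writer type used by the tests. */
  public interface StreamEventWriter
  {
    String describe();
  }

  /** Shared base: its contract knows nothing about transactions. */
  public abstract static class AbstractStreamTest
  {
    abstract StreamEventWriter createStreamEventWriter();

    public String run()
    {
      return "wrote events via " + createStreamEventWriter().describe();
    }
  }

  /** Kafka layer: the only place where the transaction flag lives. */
  public static class KafkaTest extends AbstractStreamTest
  {
    private final boolean transactionEnabled;

    public KafkaTest(boolean transactionEnabled)
    {
      this.transactionEnabled = transactionEnabled;
    }

    @Override
    StreamEventWriter createStreamEventWriter()
    {
      return () -> "kafka(transactionEnabled=" + transactionEnabled + ")";
    }
  }

  /** Kinesis layer: no flag to accept or ignore. */
  public static class KinesisTest extends AbstractStreamTest
  {
    @Override
    StreamEventWriter createStreamEventWriter()
    {
      return () -> "kinesis";
    }
  }

  public static void main(String[] args)
  {
    System.out.println(new KafkaTest(true).run());
    System.out.println(new KinesisTest().run());
  }
}
```

The trade-off in this sketch is that the Kinesis implementation no longer has to accept a parameter it cannot honor, at the cost of the shared test methods being unable to toggle transactions directly.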






[GitHub] [druid] maytasm commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r417007253



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
##########
@@ -67,92 +92,147 @@
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+      throws Exception;
+
+  abstract Function<String, String> generateStreamIngestionPropsTransform(
+      String streamName,
+      String fullDatasourceName,
+      String parserType,
+      String parserOrInputFormat,
+      IntegrationTestingConfig config
+  );
+
   abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
+
   public abstract String getTestNamePrefix();
 
   protected void doBeforeClass() throws Exception
   {
     streamAdminClient = createStreamAdminClient(config);
-    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
   }
 
-  protected void doClassTeardown()
+  private static String getOnlyResourcePath(String resourceRoot) throws IOException
   {
-    wikipediaStreamEventGenerator.shutdown();
+    return String.join("/", resourceRoot, Iterables.getOnlyElement(listResources(resourceRoot)));
   }
 
-  protected void doTestIndexDataWithLegacyParserStableState() throws Exception
+  protected static List<String> listDataFormatResources() throws IOException
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
-    try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
-    ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
-      LOG.info("supervisorSpec: [%s]\n", taskSpec);
-      // Start supervisor
-      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
-      LOG.info("Submitted supervisor");
-      // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
-      verifyIngestedData(generatedTestConfig);
+    return listResources(DATA_RESOURCE_ROOT)
+        .stream()
+        .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns a map of key to path to spec. The returned map contains at least 2 specs and one of them
+   * should be a {@link #SERIALIZER} spec.
+   */
+  protected static Map<String, String> findTestSpecs(String resourceRoot) throws IOException
+  {
+    final List<String> specDirs = listResources(resourceRoot);
+    final Map<String, String> map = new HashMap<>();
+    for (String eachSpec : specDirs) {
+      if (SERIALIZER_SPEC_DIR.equals(eachSpec)) {
+        map.put(SERIALIZER, getOnlyResourcePath(String.join("/", resourceRoot, SERIALIZER_SPEC_DIR)));
+      } else if (INPUT_ROW_PARSER_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_ROW_PARSER, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_ROW_PARSER_SPEC_DIR)));
+      } else if (INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_FORMAT, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_FORMAT_SPEC_DIR)));
+      }
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    if (!map.containsKey(SERIALIZER_SPEC_DIR)) {
+      throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", resourceRoot, map);
     }
+    if (map.size() == 1) {
+      throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", resourceRoot, map);
+    }
+    return map;
+  }
+
+  private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
+  {
+    return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
   }
 
-  protected void doTestIndexDataWithInputFormatStableState() throws Exception
+  protected void doTestIndexDataStableState(
+      boolean transactionEnabled,
+      String serializerPath,
+      String parserType,
+      String specPath
+  ) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final EventSerializer serializer = jsonMapper.readValue(getResourceAsStream(serializerPath), EventSerializer.class);
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+        serializer,
+        EVENTS_PER_SECOND,
+        CYCLE_PADDING_MS
+    );
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, getResourceAsString(specPath));
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
       LOG.info("Submitted supervisor");
       // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
-  void doTestIndexDataWithLosingCoordinator() throws Exception
+  void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception

Review comment:
       Sure. If no further changes are needed in this PR, feel free to defer this to a separate PR.



