Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/04/29 00:03:17 UTC

[GitHub] [druid] maytasm commented on a change in pull request #9783: Integration tests for stream ingestion with various data formats

maytasm commented on a change in pull request #9783:
URL: https://github.com/apache/druid/pull/9783#discussion_r416995812



##########
File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
##########
@@ -67,92 +92,147 @@
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+      throws Exception;
+
+  abstract Function<String, String> generateStreamIngestionPropsTransform(
+      String streamName,
+      String fullDatasourceName,
+      String parserType,
+      String parserOrInputFormat,
+      IntegrationTestingConfig config
+  );
+
   abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
+
   public abstract String getTestNamePrefix();
 
   protected void doBeforeClass() throws Exception
   {
     streamAdminClient = createStreamAdminClient(config);
-    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
   }
 
-  protected void doClassTeardown()
+  private static String getOnlyResourcePath(String resourceRoot) throws IOException
   {
-    wikipediaStreamEventGenerator.shutdown();
+    return String.join("/", resourceRoot, Iterables.getOnlyElement(listResources(resourceRoot)));
   }
 
-  protected void doTestIndexDataWithLegacyParserStableState() throws Exception
+  protected static List<String> listDataFormatResources() throws IOException
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
-    try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
-    ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
-      LOG.info("supervisorSpec: [%s]\n", taskSpec);
-      // Start supervisor
-      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
-      LOG.info("Submitted supervisor");
-      // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
-      verifyIngestedData(generatedTestConfig);
+    return listResources(DATA_RESOURCE_ROOT)
+        .stream()
+        .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns a map from spec key to the path of the spec file. The returned map contains at least
+   * 2 specs, and one of them should be a {@link #SERIALIZER} spec.
+   */
+  protected static Map<String, String> findTestSpecs(String resourceRoot) throws IOException
+  {
+    final List<String> specDirs = listResources(resourceRoot);
+    final Map<String, String> map = new HashMap<>();
+    for (String eachSpec : specDirs) {
+      if (SERIALIZER_SPEC_DIR.equals(eachSpec)) {
+        map.put(SERIALIZER, getOnlyResourcePath(String.join("/", resourceRoot, SERIALIZER_SPEC_DIR)));
+      } else if (INPUT_ROW_PARSER_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_ROW_PARSER, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_ROW_PARSER_SPEC_DIR)));
+      } else if (INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_FORMAT, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_FORMAT_SPEC_DIR)));
+      }
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    if (!map.containsKey(SERIALIZER)) {
+      throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", resourceRoot, map);
     }
+    if (map.size() == 1) {
+      throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", resourceRoot, map);
+    }
+    return map;
+  }
+
+  private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
+  {
+    return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
   }
 
-  protected void doTestIndexDataWithInputFormatStableState() throws Exception
+  protected void doTestIndexDataStableState(
+      boolean transactionEnabled,
+      String serializerPath,
+      String parserType,
+      String specPath
+  ) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final EventSerializer serializer = jsonMapper.readValue(getResourceAsStream(serializerPath), EventSerializer.class);
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+        serializer,
+        EVENTS_PER_SECOND,
+        CYCLE_PADDING_MS
+    );
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, getResourceAsString(specPath));
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
       LOG.info("Submitted supervisor");
       // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
-  void doTestIndexDataWithLosingCoordinator() throws Exception
+  void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception

Review comment:
       Kinesis does not have a transaction enabled/disabled concept. Does it make sense to put this at the Kafka layer instead of here?
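
For illustration, one way to act on this suggestion is to keep the shared base class agnostic of transactions and introduce the flag only at the Kafka layer. The sketch below is hypothetical; the subclass name, isTransactionEnabled(), and the KafkaEventWriter constructor shown here are assumptions for illustration, not code from the PR:

abstract class AbstractStreamIndexingTest
{
  // Shared signature carries no transaction flag; Kinesis never needs one.
  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;

  void doTestIndexDataWithLosingCoordinator() throws Exception
  {
    // shared test body, calling createStreamEventWriter(config)
  }
}

abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
{
  // Kafka-only concern: whether the event writer uses a transactional producer.
  protected abstract boolean isTransactionEnabled();

  @Override
  StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception
  {
    // Assumes a KafkaEventWriter that accepts a transaction flag.
    return new KafkaEventWriter(config, isTransactionEnabled());
  }
}

Concrete Kafka test classes (a transactional and a non-transactional variant) would then each override isTransactionEnabled(), while the Kinesis tests inherit the flag-free base behavior unchanged.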



