Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/03/28 03:28:12 UTC

[GitHub] [druid] maytasm3 opened a new pull request #9576: Add Integration Test for functionality of kinesis ingestion

maytasm3 opened a new pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576
 
 
   Add Integration Test for functionality of kinesis ingestion
   
   ### Description
   
   Added the following integration tests for Kinesis ingestion:
   
   1. Functional tests when Druid and Kinesis are in a stable state
   - legacy parser
   - inputFormat
   - taskCount greater than 1
   
   2. Functional tests when Druid is in an unstable state
   - losing nodes
   - stopping/starting the supervisor
   
   3. Functional tests when Kinesis is in an unstable state
   - resharding (shard split and merge)
   
   To verify ingestion:
   - Kinesis lag should be minimal: the consumer should be able to pull records off the stream at a rate comparable to the producer's.
   - Realtime queries work against the indexing tasks.
   - Queries work against historical segments (after handoff).
   - Queries return the expected count/value/etc.
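
The last check can be made concrete. The sketch below is an illustration under stated assumptions, not the PR's code: it derives the expected event count and the expected sum of a metric from the generator settings, assuming a fixed number of events per second and assuming the k-th event in each second carries the value k. The class `ExpectedQueryValues` and its method names are hypothetical.

```java
/** Hypothetical helper; the names and the 1..n value pattern are assumptions for illustration. */
final class ExpectedQueryValues
{
  private ExpectedQueryValues() {}

  /** Total events produced: a fixed number per second, for a fixed number of seconds. */
  static int expectedEventCount(int eventsPerSecond, int totalSeconds)
  {
    return eventsPerSecond * totalSeconds;
  }

  /** Sum 1 + 2 + ... + n: the per-second total if the k-th event carries the value k (assumed). */
  static long sumOfSequence(int n)
  {
    return (long) n * (n + 1) / 2;
  }

  /** Expected sum of the metric over the whole run. */
  static long expectedAddedSum(int eventsPerSecond, int totalSeconds)
  {
    return sumOfSequence(eventsPerSecond) * totalSeconds;
  }

  public static void main(String[] args)
  {
    // 6 events per second for 10 seconds, matching the constants used in the test class.
    System.out.println(expectedEventCount(6, 10)); // prints 60
    System.out.println(expectedAddedSum(6, 10));   // prints 210
  }
}
```

Deriving the expected values from the same constants that drive the generator keeps the query assertions in step with the data volume if either constant changes.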
   
   This PR has:
   - [x] been self-reviewed.
   - [ ] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths.
   - [x] added integration tests.
   - [ ] been tested in a test Druid cluster.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401339982
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
 
 Review comment:
   Can you add a comment here explaining what it's used for?
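
One possible shape for such a comment is sketched below. The purpose described in the Javadoc is an assumption inferred from how the tag value is computed in `before()` (a timestamp 30 minutes in the future); the thread does not state it. The class `StreamExpireTagSketch` and the `isExpired` check are hypothetical and only illustrate how a cleanup job could read the tag.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;

/** Hypothetical illustration; not part of the PR. */
final class StreamExpireTagSketch
{
  /**
   * Tag key attached to every Kinesis stream the test creates. The value is a deadline in
   * epoch milliseconds. Assumption: an external cleanup job may delete streams whose
   * deadline has passed, so streams leaked by a failed run do not accumulate.
   */
  static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";

  private StreamExpireTagSketch() {}

  /** Builds the tag map for a stream created at {@code now} that may be removed after {@code ttl}. */
  static Map<String, String> expireTag(Instant now, Duration ttl)
  {
    return Collections.singletonMap(STREAM_EXPIRE_TAG, Long.toString(now.plus(ttl).toEpochMilli()));
  }

  /** Returns true if the tag is present and its deadline is strictly before {@code now}. */
  static boolean isExpired(Map<String, String> tags, Instant now)
  {
    String deadline = tags.get(STREAM_EXPIRE_TAG);
    return deadline != null && now.toEpochMilli() > Long.parseLong(deadline);
  }
}
```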



[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401983350
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithInputFormatStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the stream by splitting, from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2 shards
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the stream by merging, from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2 shards
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating the remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Reshard the stream from KINESIS_SHARD_COUNT to newShardCount shards (a split or a merge)
+      kinesisAdminClient.updateShardCount(streamName, newShardCount);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for kinesis stream to finish resharding
 
 Review comment:
   Changed the logic to:
   - issue the reshard call
   - poll DescribeStream until the stream reports either an updating status, or an active status with the final expected number of shards
   - begin the second phase once that condition holds
   - wait until the stream is in active status with the final expected number of shards
   - begin the third phase once that condition holds
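
The two wait conditions in the list above can be sketched as plain predicates. This is a minimal illustration, not the code in the PR: `ReshardWaitSketch`, `StreamState`, and `pollUntil` are hypothetical names, and the `Supplier` stands in for whatever call the test uses to describe the stream.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch of the two wait conditions; not the PR's implementation. */
final class ReshardWaitSketch
{
  /** Stream states, named after the statuses Kinesis reports. */
  enum Status { CREATING, UPDATING, ACTIVE, DELETING }

  /** Hypothetical snapshot of one DescribeStream-style response. */
  static final class StreamState
  {
    final Status status;
    final int openShardCount;

    StreamState(Status status, int openShardCount)
    {
      this.status = status;
      this.openShardCount = openShardCount;
    }
  }

  private ReshardWaitSketch() {}

  /** True once the stream is in its final shape: active with the expected shard count. */
  static boolean reshardFinished(StreamState state, int expectedShards)
  {
    return state.status == Status.ACTIVE && state.openShardCount == expectedShards;
  }

  /** True once resharding is visibly under way, or has already finished. */
  static boolean reshardStarted(StreamState state, int expectedShards)
  {
    return state.status == Status.UPDATING || reshardFinished(state, expectedShards);
  }

  /**
   * Calls describe until check passes and returns how many calls that took.
   * A real test would sleep between polls; that is omitted here.
   */
  static int pollUntil(Supplier<StreamState> describe, Predicate<StreamState> check, int maxPolls)
  {
    for (int polls = 1; polls <= maxPolls; polls++) {
      if (check.test(describe.get())) {
        return polls;
      }
    }
    throw new IllegalStateException("Condition not met after " + maxPolls + " polls");
  }
}
```

In this sketch the second phase would start when `pollUntil` returns for `reshardStarted`, and the third when it returns for `reshardFinished`. Accepting the finished state in the first condition covers the case where resharding completes before the first poll observes an updating status.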



[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401339047
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+  @Test
+  public void testKineseIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the stream by splitting from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2 shards
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the stream by merging from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2 shards
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating the remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Reshard the stream from KINESIS_SHARD_COUNT to newShardCount shards (split or merge)
+      kinesisAdminClient.updateShardCount(streamName, newShardCount);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for kinesis stream to finish resharding
+      ITRetryUtil.retryUntil(
+          () -> kinesisAdminClient.isStreamActive(streamName),
+          true,
+          10000,
+          30,
+          "Waiting for Kinesis stream to finish resharding"
+      );
+      // Start generating the remaining data (after resharding)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy after resharding
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void verifyIngestedData(String supervisorId) throws Exception
+  {
+    // Wait for supervisor to consume events
+    LOG.info("Waiting for [%s] millis for Kinesis indexing tasks to consume events", WAIT_TIME_MILLIS);
+    Thread.sleep(WAIT_TIME_MILLIS);
+    // Query data
+    final String querySpec = kinesisQueryPropsTransform.apply(getResourceAsString(QUERIES_FILE));
+    // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
+    this.queryHelper.testQueriesFromString(querySpec, 2);
+    LOG.info("Shutting down supervisor");
+    indexer.shutdownSupervisor(supervisorId);
+    // wait for all kafka indexing tasks to finish
 
 Review comment:
   kafka -> kinesis
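
For readers following the round arithmetic in the helpers quoted above: the tests divide TOTAL_NUMBER_OF_SECOND with integer division and let the final round take whatever is left in secondsToGenerateRemaining. The sketch below is only an illustration of that bookkeeping, not code from the PR; the class RoundSplitSketch and the method splitIntoRounds are hypothetical names introduced here.

```java
import java.util.Arrays;

// Hypothetical illustration; not part of the Druid code base.
final class RoundSplitSketch
{
  // Splits totalSeconds into `rounds` chunks (rounds must be >= 1).
  // Every round except the last gets totalSeconds / rounds (integer division);
  // the last round absorbs the remainder, mirroring secondsToGenerateRemaining.
  static int[] splitIntoRounds(int totalSeconds, int rounds)
  {
    int[] result = new int[rounds];
    int remaining = totalSeconds;
    for (int i = 0; i < rounds - 1; i++) {
      result[i] = totalSeconds / rounds;
      remaining -= result[i];
    }
    result[rounds - 1] = remaining;
    return result;
  }

  public static void main(String[] args)
  {
    // With TOTAL_NUMBER_OF_SECOND = 10 as in the test class:
    System.out.println(Arrays.toString(splitIntoRounds(10, 3))); // [3, 3, 4]
    System.out.println(Arrays.toString(splitIntoRounds(10, 2))); // [5, 5]
  }
}
```

With the constants in the diff, the restart and reshard helpers would therefore generate 3, 3 and 4 seconds of events, and the suspend/resume test 5 and 5, so the total always adds up to the 10 seconds that the query spec expects.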

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401992367
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithInputFormatStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the stream by splitting from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2 shards
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the stream by merging from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2 shards
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating the remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Reshard the stream from KINESIS_SHARD_COUNT to newShardCount shards (split or merge)
+      kinesisAdminClient.updateShardCount(streamName, newShardCount);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for kinesis stream to finish resharding
+      ITRetryUtil.retryUntil(
+          () -> kinesisAdminClient.isStreamActive(streamName),
+          true,
+          10000,
+          30,
+          "Waiting for Kinesis stream to finish resharding"
+      );
+      // Start generating the remaining data (after resharding)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy after suspension
 
 Review comment:
   Done
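
The health checks quoted above all follow the same poll-until-expected pattern (a condition, an expected value, a delay of 10000 ms and up to 30 attempts). A minimal sketch of that pattern is given below, assuming nothing about how Druid's ITRetryUtil is actually implemented; RetryUntilSketch and its retryUntil method are hypothetical stand-ins written for illustration only.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a poll-until helper; not Druid's ITRetryUtil.
final class RetryUntilSketch
{
  // Polls `condition` until it returns `expected`, sleeping delayMillis between attempts.
  // Returns the number of attempts used, or throws once maxRetries attempts have failed.
  static int retryUntil(
      Callable<Boolean> condition,
      boolean expected,
      long delayMillis,
      int maxRetries,
      String message
  ) throws Exception
  {
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
      if (condition.call() == expected) {
        return attempt;
      }
      Thread.sleep(delayMillis);
    }
    throw new IllegalStateException("Gave up after " + maxRetries + " attempts: " + message);
  }

  public static void main(String[] args) throws Exception
  {
    // Simulated supervisor that reports healthy on the third poll.
    int[] polls = {0};
    int attempts = retryUntil(() -> ++polls[0] >= 3, true, 10L, 30, "Waiting for supervisor to be healthy");
    System.out.println("healthy after " + attempts + " attempts"); // healthy after 3 attempts
  }
}
```

A bounded retry count with a fixed delay keeps the worst-case wait predictable (30 attempts at 10 seconds is about five minutes in the tests above), which matters when the check runs after a container restart or a reshard.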



[GitHub] [druid] jon-wei merged pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei merged pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576
 
 
   



[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401992743
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401980200
 
 

 ##########
 File path: integration-tests/docker/tls/generate-server-certs-and-keystores.sh
 ##########
 @@ -17,6 +17,12 @@
 
 cd /tls
 
+FILE_CHECK_IF_RAN=/tls/server.key
+if [ -f "$FILE_CHECK_IF_RAN" ]; then
+  echo "Script has already been run. Skipping."
 
 Review comment:
   Done



[GitHub] [druid] suneet-s commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401061901
 
 

 ##########
 File path: integration-tests/README.md
 ##########
 @@ -68,6 +68,21 @@ can either be 8 or 11.
 Druid's configuration (using Docker) can be overridden by providing -Doverride.config.path=<PATH_TO_FILE>.
 The file must contain one property per line; each key must start with `druid_` and be written in snake case.
 
+## Debugging Druid while running tests
 
 Review comment:
   🎉 



[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401335755
 
 

 ##########
 File path: integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
 ##########
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WikipediaStreamEventStreamGenerator extends SyntheticStreamGenerator
+{
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
+  public WikipediaStreamEventStreamGenerator(int eventsPerSeconds, long cyclePaddingMs)
+  {
+    super(eventsPerSeconds, cyclePaddingMs);
+  }
+
+  @Override
+  Object getEvent(int i, DateTime timestamp)
+  {
+    Map<String, Object> event = new HashMap<>();
+    event.put("page", "Gypsy Danger");
+    event.put("language", "en");
+    event.put("user", "nuclear");
+    event.put("unpatrolled", "true");
+    event.put("newPage", "true");
+    event.put("robot", "false");
+    event.put("anonymous", "false");
+    event.put("namespace", "article");
+    event.put("continent", "North Americ");
 
 Review comment:
   North Americ -> North America



[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401981221
 
 

 ##########
 File path: integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
 ##########
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WikipediaStreamEventStreamGenerator extends SyntheticStreamGenerator
+{
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
+  public WikipediaStreamEventStreamGenerator(int eventsPerSeconds, long cyclePaddingMs)
+  {
+    super(eventsPerSeconds, cyclePaddingMs);
+  }
+
+  @Override
+  Object getEvent(int i, DateTime timestamp)
+  {
+    Map<String, Object> event = new HashMap<>();
+    event.put("page", "Gypsy Danger");
+    event.put("language", "en");
+    event.put("user", "nuclear");
+    event.put("unpatrolled", "true");
+    event.put("newPage", "true");
+    event.put("robot", "false");
+    event.put("anonymous", "false");
+    event.put("namespace", "article");
+    event.put("continent", "North Americ");
 
 Review comment:
   Done



[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401339845
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
 
 Review comment:
   Ah, never mind, just saw that part of the description



[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401338975
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithInputFormatStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the stream by splitting from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the stream by merging from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating the remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Reshard the stream from KINESIS_SHARD_COUNT to newShardCount
+      kinesisAdminClient.updateShardCount(streamName, newShardCount);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for kinesis stream to finish resharding
+      ITRetryUtil.retryUntil(
+          () -> kinesisAdminClient.isStreamActive(streamName),
+          true,
+          10000,
+          30,
+          "Waiting for Kinesis stream to finish resharding"
+      );
+      // Start generating the remaining data (after resharding)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy after resharding
 
 Review comment:
   Suggest adding a supervisor health check before the resharding as well, so that the resharding occurs while the supervisor is running.
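
   A minimal sketch of the suggested ordering, assuming a generic polling helper. `SupervisorHealthGate` and `waitUntil` are hypothetical names that only mirror the shape of the `ITRetryUtil.retryUntil` call in the diff, and the supervisor status is simulated rather than read from a real cluster:

```java
import java.util.function.BooleanSupplier;

public class SupervisorHealthGate
{
  /**
   * Polls the condition until it holds or the attempts run out.
   * Hypothetical helper, not part of Druid's test utilities.
   */
  public static boolean waitUntil(BooleanSupplier condition, long delayMillis, int maxAttempts)
  {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (condition.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(delayMillis);
      }
      catch (InterruptedException e) {
        // Restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    // Stand-in for a check such as "supervisor state is RUNNING"; reports healthy on the third poll.
    int[] polls = {0};
    BooleanSupplier supervisorRunning = () -> ++polls[0] >= 3;

    if (!waitUntil(supervisorRunning, 10, 30)) {
      throw new IllegalStateException("Supervisor never became healthy; not resharding");
    }
    // Only at this point would the test trigger the reshard (updateShardCount in the diff).
    System.out.println("Supervisor healthy after " + polls[0] + " polls; safe to reshard");
  }
}
```

   In the helper under review, the equivalent check would sit between the first round of event generation and the `updateShardCount` call.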

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401336626
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
 
 Review comment:
   How is the expire tag used?



[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401337807
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithInputFormatStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the stream by splitting from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the stream by merging from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating the remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Reshard the stream from KINESIS_SHARD_COUNT to newShardCount
+      kinesisAdminClient.updateShardCount(streamName, newShardCount);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for kinesis stream to finish resharding
 
 Review comment:
   For the resharding test, I think you'll want longer timers for the event generation. With only 3s here, it's possible that AWS doesn't actually begin the resharding until this second phase has already finished. Maybe 30s is better.
   
   Alternatively, the test could wait for the stream status to become UPDATING and start the second phase then.
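
   A rough sketch of the second option, assuming the stream status can be polled as a string. `ReshardPhaseGate` and `waitForStatus` are hypothetical names, and the status sequence below is simulated instead of coming from the Kinesis API; `ACTIVE` and `UPDATING` are the stream status values Kinesis reports:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

public class ReshardPhaseGate
{
  /**
   * Polls the status supplier until it returns the expected value or the attempts run out.
   * Hypothetical helper, not part of KinesisAdminClient.
   */
  public static boolean waitForStatus(
      Supplier<String> statusSupplier,
      String expected,
      long delayMillis,
      int maxAttempts
  )
  {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (expected.equals(statusSupplier.get())) {
        return true;
      }
      try {
        Thread.sleep(delayMillis);
      }
      catch (InterruptedException e) {
        // Restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    // Simulated statuses: the reshard is requested, begins two polls later, then completes.
    Iterator<String> statuses =
        Arrays.asList("ACTIVE", "ACTIVE", "UPDATING", "UPDATING", "ACTIVE").iterator();
    Supplier<String> streamStatus = statuses::next;

    // Phase gate: do not start the "while resharding" round until the reshard has begun.
    boolean reshardStarted = waitForStatus(streamStatus, "UPDATING", 10, 10);
    System.out.println("Reshard started: " + reshardStarted);

    // ... the second round of events would be generated here ...

    // Existing behaviour in the diff: wait for the stream to become ACTIVE again.
    boolean reshardFinished = waitForStatus(streamStatus, "ACTIVE", 10, 10);
    System.out.println("Reshard finished: " + reshardFinished);
  }
}
```

   One caveat with this approach: a fast reshard could pass through UPDATING between two polls, so the wait needs a bounded number of attempts and the test should decide what to do when the status is never observed.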



[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401981120
 
 

 ##########
 File path: integration-tests/README.md
 ##########
 @@ -107,6 +122,7 @@ Then run the tests using a command similar to:
   # Run all integration tests that have been verified to work against a quickstart cluster.
   mvn verify -P int-tests-config-file -Dgroups=quickstart-compatible
 ```
+>>>>>>> upstream/master
 
 Review comment:
   Oops, good catch. Removed.



[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401983350
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithInputFormatStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the stream by splitting from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the stream by merging from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating the remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Reshard the stream from KINESIS_SHARD_COUNT to newShardCount (split or merge)
+      kinesisAdminClient.updateShardCount(streamName, newShardCount);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for kinesis stream to finish resharding
 
 Review comment:
   Changed the logic to:
   - After issuing the reshard call, poll DescribeStream until the stream reports either an updating status or an active status with the final expected number of shards.
   - Begin the second phase once that condition is true.
   - Poll until the stream is in active status. There is no need to check the number of shards again: the earlier check already saw "updating status or an active status with the final expected number of shards", so if the stream is active now, it must be the active state after resharding.
   - Begin the third phase once that condition is true.
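
The gating logic described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in the PR: `StreamStatus`, `StreamState`, `reshardStarted`, `reshardFinished`, and `pollUntil` are hypothetical names, and the simulated poll results stand in for what a real test would read from DescribeStream.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Predicate;

public class ReshardGateSketch
{
  // Hypothetical stand-in for the stream status reported by DescribeStream.
  public enum StreamStatus
  {
    UPDATING, ACTIVE
  }

  // Hypothetical snapshot of one DescribeStream poll: status plus shard count.
  public static final class StreamState
  {
    final StreamStatus status;
    final int shardCount;

    public StreamState(StreamStatus status, int shardCount)
    {
      this.status = status;
      this.shardCount = shardCount;
    }
  }

  // Gate for the second phase: the reshard has visibly started (UPDATING),
  // or it has already finished (ACTIVE with the final expected shard count).
  public static boolean reshardStarted(StreamState state, int expectedShardCount)
  {
    return state.status == StreamStatus.UPDATING
           || (state.status == StreamStatus.ACTIVE && state.shardCount == expectedShardCount);
  }

  // Gate for the third phase: once the first gate has passed, ACTIVE alone
  // implies the post-reshard state, so the shard count is not rechecked.
  public static boolean reshardFinished(StreamState state)
  {
    return state.status == StreamStatus.ACTIVE;
  }

  // Consumes polls until the gate passes; returns how many polls were used, or -1 if none passed.
  public static int pollUntil(Iterator<StreamState> polls, Predicate<StreamState> gate)
  {
    int used = 0;
    while (polls.hasNext()) {
      used++;
      if (gate.test(polls.next())) {
        return used;
      }
    }
    return -1;
  }

  public static void main(String[] args)
  {
    final int expectedShardCount = 4;
    // Simulated polls: ACTIVE with the old count, then UPDATING, then ACTIVE with the new count.
    Iterator<StreamState> polls = Arrays.asList(
        new StreamState(StreamStatus.ACTIVE, 2),
        new StreamState(StreamStatus.UPDATING, 2),
        new StreamState(StreamStatus.UPDATING, 4),
        new StreamState(StreamStatus.ACTIVE, 4)
    ).iterator();

    int first = pollUntil(polls, s -> reshardStarted(s, expectedShardCount));
    System.out.println("second phase may begin after poll " + first);
    int second = pollUntil(polls, ReshardGateSketch::reshardFinished);
    System.out.println("third phase may begin after " + second + " further poll(s)");
  }
}
```

The first gate deliberately rejects an active stream that still has the old shard count, which is what prevents the second phase from starting before the reshard call has taken effect.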

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r402009347
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithInputFormatStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the stream by splitting from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the stream by merging from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating the remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Reshard the stream from KINESIS_SHARD_COUNT to newShardCount (split or merge)
+      kinesisAdminClient.updateShardCount(streamName, newShardCount);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for kinesis stream to finish resharding
 
 Review comment:
   From running locally, I can see that resharding takes around 30000-40000ms (3-4 mins). This means that when we check for "updating status or an active status with the final expected number of shards" immediately after issuing the reshard call, it will most likely be "updating status" that returns true (rather than "active status with the final expected number of shards"). I am only including the "active status with the final expected number of shards" check in case the reshard finishes by the time we do the check (which most likely won't happen).
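
As a rough check on the timing above, the sketch below computes the longest a fixed-delay polling loop would wait. It assumes, hypothetically, that the reshard wait reuses the `10000` ms delay and `30` retries passed to `ITRetryUtil.retryUntil` elsewhere in this file, and that those arguments mean "delay between polls" and "maximum number of polls"; `PollingBudgetSketch` and `maxWaitMillis` are illustrative names only.

```java
public class PollingBudgetSketch
{
  // Upper bound on total waiting, assuming one fixed delay per poll for up to retryCount polls.
  public static long maxWaitMillis(long delayMillis, int retryCount)
  {
    return delayMillis * retryCount;
  }

  public static void main(String[] args)
  {
    long budgetMillis = maxWaitMillis(10_000L, 30);
    // Upper end of the "3-4 mins" reading of the reshard duration quoted above.
    long reshardMillis = 4 * 60 * 1000L;
    System.out.println("polling budget (ms): " + budgetMillis);
    System.out.println("covers a 4-minute reshard: " + (budgetMillis >= reshardMillis));
  }
}
```

Under those assumptions the budget is 300000 ms (5 minutes), which leaves some margin over a 4-minute reshard.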



[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401992516
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithInputFormatStableState() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start Kinesis data generator
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKineseIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the supervisor by split from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKineseIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the supervisor by merge from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Reshard the supervisor by split from KINESIS_SHARD_COUNT to newShardCount
+      kinesisAdminClient.updateShardCount(streamName, newShardCount);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for kinesis stream to finish resharding
+      ITRetryUtil.retryUntil(
+          () -> kinesisAdminClient.isStreamActive(streamName),
+          true,
+          10000,
+          30,
+          "Waiting for Kinesis stream to finish resharding"
+      );
+      // Start generating remaining data (after resharding)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy after resharding
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void verifyIngestedData(String supervisorId) throws Exception
+  {
+    // Wait for supervisor to consume events
+    LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
+    Thread.sleep(WAIT_TIME_MILLIS);
+    // Query data
+    final String querySpec = kinesisQueryPropsTransform.apply(getResourceAsString(QUERIES_FILE));
+    // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
+    this.queryHelper.testQueriesFromString(querySpec, 2);
+    LOG.info("Shutting down supervisor");
+    indexer.shutdownSupervisor(supervisorId);
+    // wait for all kafka indexing tasks to finish
 
 Review comment:
   Done



[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401339376
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+    kinesisQueryPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @AfterMethod
+  public void teardown()
+  {
+    try {
+      kinesisEventWriter.flush();
+      indexer.shutdownSupervisor(supervisorId);
+      unloader(fullDatasourceName);
+      kinesisAdminClient.deleteStream(streamName);
+    }
+    catch (Exception e) {
+      // Best effort cleanup
+    }
+  }
+
+  @Test
+  public void testKineseIndexDataWithLegacyParserStableState() throws Exception
 
 Review comment:
   In the test names, `Kinese` -> `Kinesis` here and elsewhere



[GitHub] [druid] maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
maytasm3 commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401982888
 
 

 ##########
 File path: integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
 ##########
 @@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
 
 Review comment:
   It's to help people clean up the test streams if the IT cleanup method fails or doesn't run (this shouldn't happen normally, but it can, for example if the test unexpectedly terminates midway). Added the comment.
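
   As a rough illustration of how such a tag could be used, the sketch below decides whether a leftover stream is past its expiry. Only the tag name and its value format (epoch milliseconds, written by the test as "now plus 30 minutes") come from the diff; `ExpiredStreamSketch`, `isExpired`, and the idea of a separate cleanup job calling it are assumptions made for the example.

   ```java
   import java.util.Collections;
   import java.util.Map;

   // Hypothetical helper for a cleanup job; not part of the PR.
   final class ExpiredStreamSketch
   {
     // Tag name taken from the test class in the diff.
     static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";

     /**
      * Returns true only if the stream carries the expiry tag and the tagged
      * epoch-millisecond timestamp is already in the past.
      */
     static boolean isExpired(Map<String, String> tags, long nowMillis)
     {
       String expireAfter = tags.get(STREAM_EXPIRE_TAG);
       if (expireAfter == null) {
         return false; // untagged stream: not created by the test, leave it alone
       }
       try {
         return Long.parseLong(expireAfter) < nowMillis;
       }
       catch (NumberFormatException e) {
         return false; // malformed tag value: be conservative and keep the stream
       }
     }

     public static void main(String[] args)
     {
       long now = 1_000_000L;
       System.out.println(isExpired(Collections.singletonMap(STREAM_EXPIRE_TAG, "999999"), now));  // true
       System.out.println(isExpired(Collections.singletonMap(STREAM_EXPIRE_TAG, "1000001"), now)); // false
       System.out.println(isExpired(Collections.<String, String>emptyMap(), now));                 // false
     }
   }
   ```

   Treating untagged or malformed streams as not expired keeps such a job from deleting anything the tests did not create.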



[GitHub] [druid] suneet-s commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401064073
 
 

 ##########
 File path: integration-tests/docker/tls/generate-server-certs-and-keystores.sh
 ##########
 @@ -17,6 +17,12 @@
 
 cd /tls
 
+FILE_CHECK_IF_RAN=/tls/server.key
+if [ -f "$FILE_CHECK_IF_RAN" ]; then
+  echo "Script was ran already. Skip running again."
 
 Review comment:
   nit: No need to change if everything else looks good. As written, the log line is a little ambiguous: which script, and what is the impact of skipping it?
   ```suggestion
     echo "Using existing tls keys since /tls/server.key exists - skipping generation of all certs. To generate certs, delete this file"
   ```
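
   As a rough sketch of the guard with the clearer message in place, assuming the check is wrapped in a function: `generate_certs_once` and its argument are hypothetical names used only for this illustration, and the actual generation steps are replaced by a placeholder.

   ```shell
   # Sketch only: generate_certs_once is a hypothetical wrapper, not part of
   # generate-server-certs-and-keystores.sh.
   generate_certs_once() {
     marker_file="$1"   # file whose presence means certs were already generated
     if [ -f "$marker_file" ]; then
       echo "Using existing TLS keys since $marker_file exists - skipping generation of all certs. To regenerate certs, delete this file."
       return 0
     fi
     echo "Generating certs because $marker_file does not exist."
     # Placeholder for the real key and certificate generation steps.
     : > "$marker_file"
   }
   ```

   Calling `generate_certs_once /tls/server.key` twice would generate on the first call and print the skip message on the second, which tells the reader both which file triggered the skip and how to force regeneration.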



[GitHub] [druid] jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #9576: Add Integration Test for functionality of kinesis ingestion
URL: https://github.com/apache/druid/pull/9576#discussion_r401327607
 
 

 ##########
 File path: integration-tests/README.md
 ##########
 @@ -107,6 +122,7 @@ Then run the tests using a command similar to:
   # Run all integration tests that have been verified to work against a quickstart cluster.
   mvn verify -P int-tests-config-file -Dgroups=quickstart-compatible
 ```
+>>>>>>> upstream/master
 
 Review comment:
   Looks like a leftover marker from a merge conflict.
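
   One way to catch these before pushing is a quick scan for marker lines. This is a minimal sketch; `has_conflict_markers` is a hypothetical helper, not something that exists in the repo.

   ```shell
   # Sketch only: has_conflict_markers is a hypothetical helper.
   # Prints matching lines with their line numbers and succeeds if any are found.
   has_conflict_markers() {
     # Matches the 7-character markers git writes at the start of a line:
     # "<<<<<<< ", "=======" alone on the line, or ">>>>>>> ".
     grep -nE '^(<{7} |={7}$|>{7} )' "$1"
   }
   ```

   Running `has_conflict_markers integration-tests/README.md` would flag the `>>>>>>> upstream/master` line. A line of exactly seven `=` characters in ordinary Markdown would also match, so hits on that pattern alone need a manual look.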
