Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/01/28 03:01:03 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #10761: Fix kinesis ingestion bugs

jihoonson commented on a change in pull request #10761:
URL: https://github.com/apache/druid/pull/10761#discussion_r565672493



##########
File path: docs/development/extensions-core/kinesis-ingestion.md
##########
@@ -193,6 +193,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 | `maxSavedParseExceptions` | Integer | When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#reports). Overridden if `reportParseExceptions` is set. | no, default == 0 |
 | `maxRecordsPerPoll` | Integer | The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))` | no, default == 100 |
 | `repartitionTransitionDuration` | ISO8601 Period | When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600. | no, (default == PT2M) |
+| `offsetFetchPeriod` | ISO8601 Period | How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. | no (default == PT30S, min == PT5S) |

Review comment:
       It would be better to clarify what happens when `offsetFetchPeriod` is smaller than `PT5S`, because people can still set a value smaller than the minimum. Maybe we can add something like "The minimum period is `PT5S`. An `offsetFetchPeriod` smaller than the minimum is ignored and the minimum period is used instead."
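
   To make the intended behavior concrete, here is a minimal sketch of the clamping described in the suggested wording. It uses `java.time.Duration` only to stay self-contained (Druid itself works with Joda-Time periods), and the class and method names (`OffsetFetchPeriodExample`, `effectiveFetchPeriod`) are hypothetical, not taken from the PR. The default and minimum values are the ones listed in the doc row above.

```java
import java.time.Duration;

public class OffsetFetchPeriodExample
{
  // Minimum and default as documented in the table row ("min == PT5S", "default == PT30S").
  static final Duration MIN_FETCH_PERIOD = Duration.parse("PT5S");
  static final Duration DEFAULT_FETCH_PERIOD = Duration.parse("PT30S");

  // Hypothetical helper: returns the period that would actually be used.
  static Duration effectiveFetchPeriod(Duration configured)
  {
    if (configured == null) {
      // Nothing configured, so fall back to the default.
      return DEFAULT_FETCH_PERIOD;
    }
    // A value below the minimum is ignored and the minimum is used instead.
    return configured.compareTo(MIN_FETCH_PERIOD) < 0 ? MIN_FETCH_PERIOD : configured;
  }

  public static void main(String[] args)
  {
    System.out.println(effectiveFetchPeriod(Duration.parse("PT1S")));  // prints PT5S
    System.out.println(effectiveFetchPeriod(Duration.parse("PT1M")));  // prints PT1M
    System.out.println(effectiveFetchPeriod(null));                    // prints PT30S
  }
}
```

   Whether the real code should silently clamp or log a warning is a separate question; either way the doc should say which one happens.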

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -106,12 +107,35 @@
   private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
   private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
 
-  private static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
+  /**
+   * Checks whether an exception can be retried or not. Implementation is copied
+   * from {@link com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition} except deprecated methods
+   * have been replaced with their recent versions.
+   */
+  @VisibleForTesting
+  static boolean isClientExceptionRecoverable(AmazonClientException exception)

Review comment:
       The original method was copied from `S3Utils.isServiceExceptionRecoverable()`. It would be better to consolidate these methods into one instead of fixing both separately. The problem is that `S3Utils` is in `s3-extensions` and Druid extensions cannot depend on each other (I think this is why the method was originally copied). I suggest creating a new util class in `aws-common`, which is in Druid core, and moving this logic to the new class.

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -106,12 +107,35 @@
   private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
   private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
 
-  private static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
+  /**
+   * Checks whether an exception can be retried or not. Implementation is copied
+   * from {@link com.amazonaws.retry.PredefinedRetryPolicies.SDKDefaultRetryCondition} except deprecated methods

Review comment:
       nit: out of curiosity, what is the reason for not using `SDKDefaultRetryCondition` directly? Is it to avoid handling the extra params of that method (`originalRequest` and `retriesAttempted`)? If that's the case, it seems reasonable to me to copy it since those params are not in use anyway.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
##########
@@ -578,6 +578,10 @@ public void run()
               log.debug("All partitions have been fully read.");
               publishOnStop.set(true);
               stopRequested.set(true);
+
+              // We let the fireDepartmentMetrics know that all messages have been read. This way, some metrics such as
+              // high message gap need not be reported
+              fireDepartmentMetrics.markProcessingDone();

Review comment:
       The `messageGap` metric is used by both push and pull based streaming ingestion. However, since the push-based streaming ingestion (Tranquility) is deprecated, it looks OK to me to not fix it.
   
   I'm more concerned about the test coverage of this change. Can you add a unit test in `KafkaIndexTaskTest` and `KinesisIndexTaskTest`? It seems possible to test this with the changes below:
   
   - You need to expose the `FireDepartmentMetrics` created in the taskRunner.
   - You need to know when the task enters the `PUBLISHING` status. One way to do this is to modify `SegmentHandoffNotifier.registerSegmentHandoffCallback()` to notify the test that the task is doing handoff. For example, `registerSegmentHandoffCallback` can count down a latch, and the test can do something like this:
   ```java
       latch.await();
       Assert.assertNotEquals(
           FireDepartmentMetrics.DEFAULT_COMPLETION_TIME,
           task.getRunner().getFireDepartmentMetrics().completionTime()
       );
   ```
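
   The latch idea can be tried in isolation with the self-contained sketch below. `Metrics` is a hypothetical stand-in for `FireDepartmentMetrics`, the `-1` default is an assumption, and the `Runnable` stands in for whatever `registerSegmentHandoffCallback` would invoke. None of this is the actual Druid test wiring.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class HandoffLatchExample
{
  // Hypothetical stand-in for FireDepartmentMetrics; the default value is an assumption.
  static class Metrics
  {
    static final long DEFAULT_COMPLETION_TIME = -1L;
    private final AtomicLong completionTime = new AtomicLong(DEFAULT_COMPLETION_TIME);

    void markProcessingDone()
    {
      // Only the first call records a timestamp.
      completionTime.compareAndSet(DEFAULT_COMPLETION_TIME, System.currentTimeMillis());
    }

    long completionTime()
    {
      return completionTime.get();
    }
  }

  // Runs a fake "task" thread and returns the completion time seen by the test thread.
  static long runAndObserve()
  {
    Metrics metrics = new Metrics();
    CountDownLatch handoffStarted = new CountDownLatch(1);
    // Stand-in for the handoff callback: it tells the test that handoff has begun.
    Runnable handoffCallback = handoffStarted::countDown;

    Thread task = new Thread(() -> {
      metrics.markProcessingDone();  // all records read
      handoffCallback.run();         // task moves on to publishing/handoff
    });
    task.start();

    try {
      // Bounded wait so a broken task fails the test instead of hanging it.
      if (!handoffStarted.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("task never reached handoff");
      }
      task.join();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return metrics.completionTime();
  }

  public static void main(String[] args)
  {
    System.out.println(runAndObserve() != Metrics.DEFAULT_COMPLETION_TIME);  // prints true
  }
}
```

   Because `countDown()` happens-before the return of `await()`, the test thread is guaranteed to see the timestamp written before the callback ran.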

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -366,8 +390,8 @@ private Runnable fetchRecords()
           log.error(e, "encounted AWS error while attempting to fetch records, will not retry");
           throw e;
         }
-        catch (AmazonServiceException e) {

Review comment:
       Good catch. I think the intention of `S3Utils.S3RETRY` is the same as that of `SDKDefaultRetryCondition`, i.e., retrying all `IOException`s (including the ones wrapped inside another exception) and certain `AmazonServiceException`s if they are transient. Not retrying on `IOException`s is probably a mistake.
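
   For reference, the "wrapped `IOException`" part of such a condition can be sketched without the AWS SDK as below. The class and method names are hypothetical, the depth limit is my own defensive assumption, and the service-error checks (throttling, transient status codes) that a real retry condition also performs are deliberately left out.

```java
import java.io.IOException;

public class RetryConditionExample
{
  // Assumed bound on the cause chain, to stay safe against cyclic causes.
  private static final int MAX_CAUSE_DEPTH = 32;

  // Returns true if the throwable itself or any of its causes is an IOException.
  static boolean hasIOExceptionInChain(Throwable t)
  {
    int depth = 0;
    for (Throwable cur = t; cur != null && depth < MAX_CAUSE_DEPTH; cur = cur.getCause()) {
      if (cur instanceof IOException) {
        return true;
      }
      depth++;
    }
    return false;
  }

  public static void main(String[] args)
  {
    Throwable wrapped = new RuntimeException("client error", new IOException("connection reset"));
    System.out.println(hasIOExceptionInChain(wrapped));                           // prints true
    System.out.println(hasIOExceptionInChain(new IllegalStateException("bad")));  // prints false
  }
}
```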

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
##########
@@ -45,6 +47,7 @@
   private final AtomicLong sinkCount = new AtomicLong(0);
   private final AtomicLong messageMaxTimestamp = new AtomicLong(0);
   private final AtomicLong messageGap = new AtomicLong(0);
+  private final AtomicLong completionTime = new AtomicLong(DEFAULT_COMPLETION_TIME);

Review comment:
       `completionTime` seems too broad. Suggest `messageProcessingCompletionTime`.

##########
File path: services/src/main/java/org/apache/druid/cli/PullDependencies.java
##########
@@ -83,6 +83,9 @@
                   .put("commons-beanutils", "commons-beanutils")
                   .put("org.apache.commons", "commons-compress")
                   .put("org.apache.zookeeper", "zookeeper")
+                  .put("com.fasterxml.jackson.core", "jackson-databind")
+                  .put("com.fasterxml.jackson.core", "jackson-core")
+                  .put("com.fasterxml.jackson.core", "jackson-annotations")

Review comment:
       @abhishekagarwal87 thanks for the context. This seems to work, but I'm worried about the test coverage. Did you do any testing to verify this change, especially with Hadoop?



