Posted to commits@druid.apache.org by "amaechler (via GitHub)" <gi...@apache.org> on 2023/05/30 23:50:26 UTC

[GitHub] [druid] amaechler opened a new pull request, #14355: Handle StreamExceptions when initializing input source

amaechler opened a new pull request, #14355:
URL: https://github.com/apache/druid/pull/14355

   ### Description
   
   When the Sampler API (`POST /druid/indexer/v1/sampler`) is called, the sampler needs an input source to sample records from. When the input source has to be created (i.e. a parser hasn't been provided, usually for Kafka or Kinesis), we create one by instantiating a new `RecordSupplierInputSource`.
   
   The constructor of `RecordSupplierInputSource` seeks to the beginning (or end) of the Kafka partitions / Kinesis shards. While doing so, things can go wrong: network issues, authentication problems, etc. Any exceptions that occur are wrapped in a `StreamException` (via the `wrapExceptions` helper function).
   
   The Sampler, however, deals with `SamplerException`s only. To that end, a `SamplerExceptionMapper` has been registered which maps such exceptions to a `400 Bad Request` response.
   
   The above-mentioned `RecordSupplierInputSource` already catches an `InterruptedException` and maps it to a `SamplerException`, but ignores any `StreamException`. This causes the Sampler API to return a `500 Internal Server Error` (with the message "The RuntimeException could not be mapped to a response, re-throwing to the HTTP container").
   
   This PR addresses this issue by handling `StreamException`s accordingly.
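
A minimal sketch of the conversion described above, assuming simplified stand-ins for `StreamException` and `SamplerException` (the real Druid classes differ) and a hypothetical `seek` method in place of the constructor's seeking logic. It illustrates the idea only and is not the code in this PR:

```java
final class SamplerWrapSketch {
  // Simplified stand-in: an unchecked wrapper around a lower-level failure.
  static class StreamException extends RuntimeException {
    StreamException(Throwable cause) { super(cause); }
  }

  // Simplified stand-in for the exception type the sampler maps to a 400 response.
  static class SamplerException extends RuntimeException {
    SamplerException(Throwable cause, String message) { super(message, cause); }
  }

  // Hypothetical stand-in for seeking to the start or end of a stream.
  static void seek(boolean fail) {
    if (fail) {
      throw new StreamException(new java.io.IOException("broker unreachable"));
    }
  }

  // Walks the cause chain down to the innermost exception.
  static Throwable rootCause(Throwable t) {
    Throwable current = t;
    while (current.getCause() != null && current.getCause() != current) {
      current = current.getCause();
    }
    return current;
  }

  // Converts a StreamException into a SamplerException that carries the root-cause message.
  static void seekForSampler(boolean fail) {
    try {
      seek(fail);
    } catch (StreamException e) {
      throw new SamplerException(e, "Exception while seeking to partitions: " + rootCause(e).getMessage());
    }
  }

  public static void main(String[] args) {
    try {
      seekForSampler(true);
    } catch (SamplerException e) {
      // prints "Exception while seeking to partitions: broker unreachable"
      System.out.println(e.getMessage());
    }
  }
}
```

Without the `catch` block in `seekForSampler`, the `StreamException` would reach the caller unchanged, which corresponds to the unmapped `RuntimeException` behind the 500 response.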
   
   ### Alternatives
   
   We could also catch any exception (instead of exceptions of a specific type).
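
To make the trade-off concrete, here is a small sketch comparing the two options. The exception classes are simplified stand-ins and `narrow`, `broad`, and `escaping` are hypothetical helpers introduced only for this illustration:

```java
final class CatchAllSketch {
  static class StreamException extends RuntimeException {
    StreamException(String message) { super(message); }
  }

  static class SamplerException extends RuntimeException {
    SamplerException(Throwable cause, String message) { super(message, cause); }
  }

  // Narrow variant: only StreamException is converted.
  static void narrow(Runnable seek) {
    try {
      seek.run();
    } catch (StreamException e) {
      throw new SamplerException(e, "seek failed");
    }
  }

  // Broad variant: every Exception is converted.
  static void broad(Runnable seek) {
    try {
      seek.run();
    } catch (Exception e) {
      throw new SamplerException(e, "seek failed");
    }
  }

  // Returns the simple class name of whatever escapes the call, or "none".
  static String escaping(Runnable call) {
    try {
      call.run();
      return "none";
    } catch (RuntimeException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    Runnable unexpected = () -> { throw new IllegalStateException("not a stream error"); };
    System.out.println(escaping(() -> narrow(unexpected))); // prints "IllegalStateException"
    System.out.println(escaping(() -> broad(unexpected)));  // prints "SamplerException"
  }
}
```

With the narrow variant, a failure of any other type would still surface as an unmapped exception.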
   
   #### Release note
   
   Handle exceptions while sampling from streaming sources. We could also omit this since the Sampler API isn't currently documented (#14125).
   
   <hr>
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist below are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   This PR has:
   
   - [ ] been self-reviewed.
   - [ ] a release note entry in the PR description.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekrb19 commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "abhishekrb19 (via GitHub)" <gi...@apache.org>.
abhishekrb19 commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213490859


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,29 +69,35 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
-    try {
-      assignAndSeek(recordSupplier);
-    }
-    catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
-    }
+
+    assignAndSeek(recordSupplier);
   }
 
   private void assignAndSeek(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier)
-      throws InterruptedException
   {
-    final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
-        .getPartitionIds(topic)
-        .stream()
-        .map(partitionId -> StreamPartition.of(topic, partitionId))
-        .collect(Collectors.toSet());
-
-    recordSupplier.assign(partitions);
-
-    if (useEarliestOffset) {
-      recordSupplier.seekToEarliest(partitions);
-    } else {
-      recordSupplier.seekToLatest(partitions);
+    try {
+      final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
+          .getPartitionIds(topic)
+          .stream()
+          .map(partitionId -> StreamPartition.of(topic, partitionId))
+          .collect(Collectors.toSet());
+
+      recordSupplier.assign(partitions);
+
+      if (useEarliestOffset) {
+        recordSupplier.seekToEarliest(partitions);
+      } else {
+        recordSupplier.seekToLatest(partitions);
+      }
+    }
+    catch (Exception e) {
+      throw new SamplerException(
+          e,
+          "Exception while seeking to the [%s] offset of partitions in topic [%s]: %s",
+          useEarliestOffset ? "earliest" : "latest",

Review Comment:
   Just noticed something: this class is shared by both Kafka and Kinesis streaming. In Kinesis vernacular, it's sequence number instead of offset. The Druid supervisor settings [are also different](https://druid.apache.org/docs/latest/development/extensions-core/kinesis-ingestion.html) for Kafka vs Kinesis: `useEarliestOffset` vs `useEarliestSequenceNumber`.
   
   If it's easy to identify that here, we could perhaps change the messaging a tiny bit, especially if we're surfacing this exception all the way to the user. What do you think, @amaechler?
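
One way the suggestion could look, sketched under the assumption that the shared class knew which kind of stream it was reading. The `StreamKind` enum and `seekFailureMessage` helper are hypothetical and do not exist in Druid; only the message template is taken from the diff above:

```java
final class SeekMessageSketch {
  // Hypothetical: pairs each stream type with its own term for a position in the stream.
  enum StreamKind {
    KAFKA("offset"),
    KINESIS("sequence number");

    final String positionTerm;

    StreamKind(String positionTerm) {
      this.positionTerm = positionTerm;
    }
  }

  // Builds the user-facing message with the term that matches the stream type.
  static String seekFailureMessage(StreamKind kind, boolean useEarliest, String stream, String cause) {
    return String.format(
        "Exception while seeking to the [%s] %s of partitions in topic [%s]: %s",
        useEarliest ? "earliest" : "latest",
        kind.positionTerm,
        stream,
        cause
    );
  }

  public static void main(String[] args) {
    System.out.println(seekFailureMessage(StreamKind.KAFKA, true, "test-stream", "timeout"));
    System.out.println(seekFailureMessage(StreamKind.KINESIS, false, "test-stream", "timeout"));
  }
}
```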





[GitHub] [druid] amaechler commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "amaechler (via GitHub)" <gi...@apache.org>.
amaechler commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213878125


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java:
##########
@@ -135,6 +137,24 @@ public void testReadTimeout() throws IOException
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad happened")));
+
+    //noinspection ResultOfObjectAllocationIgnored

Review Comment:
   This is technically complaining that the newly allocated object inside the `assertThrows` closure isn't assigned to anything :) which, in this case, we don't really care about.





[GitHub] [druid] kfaraz commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213429172


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");

Review Comment:
   Yeah, might have to revisit this at some point.






[GitHub] [druid] amaechler commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "amaechler (via GitHub)" <gi...@apache.org>.
amaechler commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213299903


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
+      throw new SamplerException(e, "Thread interrupted while seeking to partitions");
+    }
+    catch (StreamException e) {
+      throw new SamplerException(e, "Exception creating RecordSupplierInputSource while seeking to partitions: %s", Throwables.getRootCause(e).getMessage());

Review Comment:
   Updated and moved the try / catch. I played for a while with different exception messages based on the stage (which of the three functions is called), but in the end, I didn't think the added boilerplate was worth the value.





[GitHub] [druid] kfaraz commented on pull request #14355: Handle StreamExceptions when initializing input source

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #14355:
URL: https://github.com/apache/druid/pull/14355#issuecomment-1573053530

   > Handle exceptions while sampling from streaming sources. We could also omit this since the Sampler API isn't currently documented (https://github.com/apache/druid/issues/14125).
   
   Yeah, we don't need a release note for this.




[GitHub] [druid] amaechler commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "amaechler (via GitHub)" <gi...@apache.org>.
amaechler commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213504282


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java:
##########
@@ -135,6 +137,24 @@ public void testReadTimeout() throws IOException
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad happened")));
+
+    //noinspection ResultOfObjectAllocationIgnored

Review Comment:
   I added this to suppress a warning in IDEA. You're probably right that it isn't enforced by the static tool checks.





[GitHub] [druid] kfaraz merged pull request #14355: Handle StreamExceptions when initializing input source

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz merged PR #14355:
URL: https://github.com/apache/druid/pull/14355




[GitHub] [druid] kfaraz commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213856547


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java:
##########
@@ -135,6 +137,24 @@ public void testReadTimeout() throws IOException
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad happened")));
+
+    //noinspection ResultOfObjectAllocationIgnored

Review Comment:
   Plus, we are using the result now, which was the `SamplerException`, unless I am mistaken and this `noinspection` was for something else. Either way, we can fix this later.





[GitHub] [druid] amaechler commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "amaechler (via GitHub)" <gi...@apache.org>.
amaechler commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213244305


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");

Review Comment:
   I agree, I was surprised by that too. I think this code could probably use some more refactoring.





[GitHub] [druid] kfaraz commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213162306


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -130,7 +130,7 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
    *
    * @return set of partitions
    */
-  Set<PartitionIdType> getPartitionIds(String stream);

Review Comment:
   Do we need to add `StreamException` to the method declaration? It is an unchecked exception anyway. Plus, we might have to declare it for most of the other methods of this class too.
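
For illustration, a short sketch of why the declaration is optional: Java does not require a `throws` clause for unchecked exceptions. `StreamException` is a simplified stand-in here, and `PartitionLister` is a hypothetical interface used in place of `RecordSupplier`:

```java
final class UncheckedSketch {
  // Simplified stand-in: extends RuntimeException, so it is unchecked.
  static class StreamException extends RuntimeException {
    StreamException(String message) { super(message); }
  }

  // Hypothetical interface: note that the method has no throws clause.
  interface PartitionLister {
    java.util.Set<String> getPartitionIds(String stream);
  }

  // An implementation may still throw the unchecked exception.
  static final PartitionLister FAILING = stream -> {
    throw new StreamException("cannot list partitions of " + stream);
  };

  // Callers can catch the exception even though it is not declared.
  static String tryList(PartitionLister lister, String stream) {
    try {
      return lister.getPartitionIds(stream).toString();
    } catch (StreamException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(tryList(FAILING, "test-stream")); // prints "cannot list partitions of test-stream"
  }
}
```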



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
+      throw new SamplerException(e, "Thread interrupted while seeking to partitions");
+    }
+    catch (StreamException e) {
+      throw new SamplerException(e, "Exception creating RecordSupplierInputSource while seeking to partitions: %s", Throwables.getRootCause(e).getMessage());

Review Comment:
   Since this would be a user facing message, I am not sure we need to include the part about `creating RecordSupplierInputSource`.
   
   We could have something similar to what we had earlier with a bit more info.
   ```
   Exception while seeking to <earliest/latest> offset of partitions in the stream [<topic>]
   ```
   
   I would even suggest moving the try-catch to the `assignAndSeek()` method itself and then using an appropriate message based on the stage the exception occurred in: get partitions, assign partitions, seek to offset.
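
A rough sketch of what stage-specific messages could look like, assuming a simplified `SamplerException` stand-in and three hypothetical `Runnable` steps in place of the real `RecordSupplier` calls:

```java
final class StageMessageSketch {
  static class SamplerException extends RuntimeException {
    SamplerException(Throwable cause, String message) { super(message, cause); }
  }

  // Runs the three stages in order and reports the stage that failed.
  static void assignAndSeek(Runnable getPartitions, Runnable assign, Runnable seek) {
    String stage = "getting partitions";
    try {
      getPartitions.run();
      stage = "assigning partitions";
      assign.run();
      stage = "seeking to offset";
      seek.run();
    } catch (Exception e) {
      throw new SamplerException(e, "Exception while " + stage + ": " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    Runnable ok = () -> { };
    Runnable failing = () -> { throw new IllegalStateException("boom"); };
    try {
      assignAndSeek(ok, failing, ok);
    } catch (SamplerException e) {
      System.out.println(e.getMessage()); // prints "Exception while assigning partitions: boom"
    }
  }
}
```

Tracking the stage in a local variable keeps a single `try`/`catch`, at the cost of a mutable variable updated between the calls.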



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java:
##########
@@ -135,6 +137,20 @@ public void testReadTimeout() throws IOException
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad happened")));
+
+    //noinspection ResultOfObjectAllocationIgnored
+    Assert.assertThrows(

Review Comment:
   Maybe also verify (part of) the exception message.
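
A sketch of the idea: JUnit 4.13's `Assert.assertThrows` returns the thrown exception, so its message can be checked afterwards. To stay runnable without JUnit, the example below uses a small stand-in helper, `expectThrows`, and a simplified `SamplerException`. Both are assumptions made for illustration:

```java
final class AssertMessageSketch {
  static class SamplerException extends RuntimeException {
    SamplerException(String message) { super(message); }
  }

  // Stand-in for assertThrows: runs the code and returns the exception of the expected type.
  static <T extends Throwable> T expectThrows(Class<T> type, Runnable code) {
    try {
      code.run();
    } catch (Throwable t) {
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      throw new AssertionError("unexpected exception type: " + t.getClass(), t);
    }
    throw new AssertionError("expected " + type.getSimpleName() + " to be thrown");
  }

  public static void main(String[] args) {
    SamplerException e = expectThrows(
        SamplerException.class,
        () -> { throw new SamplerException("Exception while seeking: Something bad happened"); }
    );
    // Verify part of the message rather than only the exception type.
    if (!e.getMessage().contains("Something bad happened")) {
      throw new AssertionError(e.getMessage());
    }
    System.out.println("message verified");
  }
}
```

Because the returned exception is assigned and used, an "object allocation ignored" style inspection has less to complain about in a test written this way.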



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");

Review Comment:
   It seems weird to throw a `SamplerException` from an `InputSource` implementation but apparently this impl is used only with the sampler. So I guess that is okay.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
+      throw new SamplerException(e, "Thread interrupted while seeking to partitions");
+    }
+    catch (StreamException e) {

Review Comment:
   I think I might prefer your alternative approach of catching all `Exception`s here and wrapping them up in a `SamplerException`, as we are not doing anything special with a `StreamException`.





[GitHub] [druid] amaechler commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "amaechler (via GitHub)" <gi...@apache.org>.
amaechler commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213302329


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java:
##########
@@ -135,6 +137,20 @@ public void testReadTimeout() throws IOException
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad happened")));
+
+    //noinspection ResultOfObjectAllocationIgnored
+    Assert.assertThrows(

Review Comment:
   Done.





[GitHub] [druid] kfaraz commented on pull request #14355: Handle StreamExceptions when initializing input source

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #14355:
URL: https://github.com/apache/druid/pull/14355#issuecomment-1573054216

   @amaechler, there seem to be many unrelated UT failures. Could you please try merging the latest master into your branch?




[GitHub] [druid] amaechler commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "amaechler (via GitHub)" <gi...@apache.org>.
amaechler commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213294832


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -130,7 +130,7 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
    *
    * @return set of partitions
    */
-  Set<PartitionIdType> getPartitionIds(String stream);

Review Comment:
   Removed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,16 +70,20 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
+
     try {
       assignAndSeek(recordSupplier);
     }
     catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
+      throw new SamplerException(e, "Thread interrupted while seeking to partitions");
+    }
+    catch (StreamException e) {

Review Comment:
   Done.





[GitHub] [druid] abhishekrb19 commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "abhishekrb19 (via GitHub)" <gi...@apache.org>.
abhishekrb19 commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213476581


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java:
##########
@@ -68,29 +69,35 @@ public RecordSupplierInputSource(
     this.recordSupplier = recordSupplier;
     this.useEarliestOffset = useEarliestOffset;
     this.iteratorTimeoutMs = iteratorTimeoutMs;
-    try {
-      assignAndSeek(recordSupplier);
-    }
-    catch (InterruptedException e) {
-      throw new SamplerException(e, "Exception while seeking to partitions");
-    }
+
+    assignAndSeek(recordSupplier);
   }
 
   private void assignAndSeek(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier)
-      throws InterruptedException
   {
-    final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
-        .getPartitionIds(topic)
-        .stream()
-        .map(partitionId -> StreamPartition.of(topic, partitionId))
-        .collect(Collectors.toSet());
-
-    recordSupplier.assign(partitions);
-
-    if (useEarliestOffset) {
-      recordSupplier.seekToEarliest(partitions);
-    } else {
-      recordSupplier.seekToLatest(partitions);
+    try {
+      final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
+          .getPartitionIds(topic)
+          .stream()
+          .map(partitionId -> StreamPartition.of(topic, partitionId))
+          .collect(Collectors.toSet());
+
+      recordSupplier.assign(partitions);
+
+      if (useEarliestOffset) {
+        recordSupplier.seekToEarliest(partitions);
+      } else {
+        recordSupplier.seekToLatest(partitions);
+      }
+    }
+    catch (Exception e) {
+      throw new SamplerException(
+          e,
+          "Exception while seeking to the [%s] offset of partitions in topic [%s]: %s",
+          useEarliestOffset ? "earliest" : "latest",

Review Comment:
   Neat!
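
For readers without the Druid sources at hand, the catch-all wrapping in the hunk above can be sketched in isolation. This is a minimal sketch, not the PR's code: `SeekWrappingSketch`, `PartitionSource`, and the nested `SamplerException` are hypothetical stand-ins for the real Druid types, and the last two format arguments (`topic`, `e.getMessage()`) are an assumption, since the quoted hunk is cut off after the first one.

```java
import java.util.Set;

public class SeekWrappingSketch
{
  // Hypothetical stand-in for Druid's SamplerException (a format string plus a cause).
  static class SamplerException extends RuntimeException
  {
    SamplerException(Throwable cause, String format, Object... args)
    {
      super(String.format(format, args), cause);
    }
  }

  // Hypothetical stand-in for the part of RecordSupplier used while seeking.
  interface PartitionSource
  {
    Set<Integer> getPartitionIds(String stream);
  }

  // Any failure while looking up partitions is rethrown as a SamplerException,
  // so the caller only ever has to handle that one exception type.
  static int assignAndSeek(PartitionSource source, String topic, boolean useEarliestOffset)
  {
    try {
      return source.getPartitionIds(topic).size();
    }
    catch (Exception e) {
      // Assumption: the second and third arguments are the topic and the cause's message.
      throw new SamplerException(
          e,
          "Exception while seeking to the [%s] offset of partitions in topic [%s]: %s",
          useEarliestOffset ? "earliest" : "latest",
          topic,
          e.getMessage()
      );
    }
  }

  public static void main(String[] args)
  {
    // A source that always fails, similar in spirit to the mocked supplier in the test.
    PartitionSource failing = stream -> {
      throw new IllegalStateException("Something bad happened");
    };

    try {
      assignAndSeek(failing, "test-stream", true);
    }
    catch (SamplerException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Catching `Exception` rather than a specific type means new failure modes in the supplier still surface as a `SamplerException`, at the cost of also wrapping programming errors such as a `NullPointerException`.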





[GitHub] [druid] abhishekrb19 commented on a diff in pull request #14355: Handle StreamExceptions when initializing input source

Posted by "abhishekrb19 (via GitHub)" <gi...@apache.org>.
abhishekrb19 commented on code in PR #14355:
URL: https://github.com/apache/druid/pull/14355#discussion_r1213478323


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java:
##########
@@ -135,6 +137,24 @@ public void testReadTimeout() throws IOException
     Assert.assertTrue(supplier.isClosed());
   }
 
+  @Test
+  public void testRecordSupplierInputSourceThrowsSamplerExceptionWhenExceptionDuringSeek()
+  {
+    final RecordSupplier<?, ?, ?> supplier = Mockito.mock(RecordSupplier.class);
+    Mockito.when(supplier.getPartitionIds("test-stream"))
+           .thenThrow(new StreamException(new Exception("Something bad happened")));
+
+    //noinspection ResultOfObjectAllocationIgnored

Review Comment:
   Do we need this `noinspection` comment? My understanding is that the static checks don't run on test code, but I might be wrong.


