You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/09 18:11:41 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #12137: DRAFT: Consolidate StreamsException and TaskCorruptedException

mjsax commented on code in PR #12137:
URL: https://github.com/apache/kafka/pull/12137#discussion_r868282499


##########
streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java:
##########
@@ -31,24 +37,34 @@ public class StreamsException extends KafkaException {
 
     private final static long serialVersionUID = 1L;
 
-    private TaskId taskId;
+    private final Set<TaskId> taskIds = new HashSet<>();
 
     public StreamsException(final String message) {
         this(message, (TaskId) null);
     }
 
     public StreamsException(final String message, final TaskId taskId) {
         super(message);
-        this.taskId = taskId;
+        taskIds.add(taskId);
+    }
+
+    public StreamsException(final String message, final Set<TaskId> taskIds) {

Review Comment:
   Should this be `protected` or package-private instead of `public` ?



##########
streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java:
##########
@@ -57,19 +73,29 @@ public StreamsException(final Throwable throwable) {
 
     public StreamsException(final Throwable throwable, final TaskId taskId) {
         super(throwable);
-        this.taskId = taskId;
+        this.taskIds.add(taskId);
     }
 
     /**
      * @return  the {@link TaskId} that this exception originated from, or {@link Optional#empty()} if the exception
      *          cannot be traced back to a particular task. Note that the {@code TaskId} being empty does not
      *          guarantee that the exception wasn't directly related to a specific task.
      */
+    @Deprecated

Review Comment:
   Why do we need to deprecate this method? (Same question below).



##########
streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java:
##########
@@ -57,19 +73,29 @@ public StreamsException(final Throwable throwable) {
 
     public StreamsException(final Throwable throwable, final TaskId taskId) {
         super(throwable);
-        this.taskId = taskId;
+        this.taskIds.add(taskId);
     }
 
     /**
      * @return  the {@link TaskId} that this exception originated from, or {@link Optional#empty()} if the exception
      *          cannot be traced back to a particular task. Note that the {@code TaskId} being empty does not
      *          guarantee that the exception wasn't directly related to a specific task.
      */
+    @Deprecated
     public Optional<TaskId> taskId() {
-        return Optional.ofNullable(taskId);
+        return taskIds.isEmpty() ? Optional.empty() : Optional.of(new ArrayList<>(taskIds).get(0));
+    }
+
+    public Set<TaskId> taskIds() {

Review Comment:
   Why do we need to add this method? (Same question below).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org