You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/02/13 19:22:54 UTC

[incubator-druid] branch master updated: Improve error message for revoked locks (#7035)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1701fbca Improve error message for revoked locks (#7035)
1701fbca is described below

commit 1701fbcad3430a3e6e19134e870a6059864c8a80
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Feb 13 11:22:48 2019 -0800

    Improve error message for revoked locks (#7035)
    
    * Improve error message for revoked locks
    
    * fix test
    
    * fix test
    
    * fix test
    
    * fix toString
---
 .../actions/SegmentTransactionalInsertAction.java  |  7 +-
 .../SegmentTransactionalInsertActionTest.java      |  8 +--
 .../druid/indexing/common/task/IndexTaskTest.java  |  5 +-
 .../TestIndexerMetadataStorageCoordinator.java     |  2 +-
 .../indexing/overlord/SegmentPublishResult.java    | 31 +++++++--
 .../IndexerSQLMetadataStorageCoordinator.java      |  4 +-
 .../appenderator/BaseAppenderatorDriver.java       | 27 ++++++--
 .../overlord/SegmentPublishResultTest.java         | 79 ++++++++++++++++++++++
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 18 ++---
 .../appenderator/BatchAppenderatorDriverTest.java  |  2 +-
 .../StreamAppenderatorDriverFailTest.java          |  2 +-
 .../appenderator/StreamAppenderatorDriverTest.java |  7 +-
 12 files changed, 154 insertions(+), 38 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 8a3c713..0af850e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -118,7 +118,12 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
                       endMetadata
                   )
               )
-              .onInvalidLocks(SegmentPublishResult::fail)
+              .onInvalidLocks(
+                  () -> SegmentPublishResult.fail(
+                      "Invalid task locks. Maybe they are revoked by a higher priority task."
+                      + " Please check the overlord log for details."
+                  )
+              )
               .build()
       );
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index e152995..463916f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -101,7 +101,7 @@ public class SegmentTransactionalInsertActionTest
         task,
         actionTestKit.getTaskActionToolbox()
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT1), true), result1);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1);
 
     SegmentPublishResult result2 = new SegmentTransactionalInsertAction(
         ImmutableSet.of(SEGMENT2),
@@ -111,7 +111,7 @@ public class SegmentTransactionalInsertActionTest
         task,
         actionTestKit.getTaskActionToolbox()
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT2), true), result2);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2);
 
     Assert.assertEquals(
         ImmutableSet.of(SEGMENT1, SEGMENT2),
@@ -143,7 +143,7 @@ public class SegmentTransactionalInsertActionTest
         actionTestKit.getTaskActionToolbox()
     );
 
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result);
+    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result);
   }
 
   @Test
@@ -157,6 +157,6 @@ public class SegmentTransactionalInsertActionTest
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(CoreMatchers.containsString("are not covered by locks"));
     SegmentPublishResult result = action.perform(task, actionTestKit.getTaskActionToolbox());
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT3), true), result);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT3)), result);
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 746b0c9..ef7dbce 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -1574,10 +1574,7 @@ public class IndexTaskTest
         }
 
         if (taskAction instanceof SegmentTransactionalInsertAction) {
-          return (RetType) new SegmentPublishResult(
-              ((SegmentTransactionalInsertAction) taskAction).getSegments(),
-              true
-          );
+          return (RetType) SegmentPublishResult.ok(((SegmentTransactionalInsertAction) taskAction).getSegments());
         }
 
         if (taskAction instanceof SegmentAllocateAction) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 0424cbf..0eeecd5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -116,7 +116,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
   )
   {
     // Don't actually compare metadata, just do it!
-    return new SegmentPublishResult(announceHistoricalSegments(segments), true);
+    return SegmentPublishResult.ok(announceHistoricalSegments(segments));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
index dc86b72..a088c93 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 import java.util.Set;
 
@@ -42,20 +43,29 @@ public class SegmentPublishResult
 {
   private final Set<DataSegment> segments;
   private final boolean success;
+  @Nullable
+  private final String errorMsg;
 
-  public static SegmentPublishResult fail()
+  public static SegmentPublishResult ok(Set<DataSegment> segments)
   {
-    return new SegmentPublishResult(ImmutableSet.of(), false);
+    return new SegmentPublishResult(segments, true, null);
+  }
+
+  public static SegmentPublishResult fail(String errorMsg)
+  {
+    return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg);
   }
 
   @JsonCreator
-  public SegmentPublishResult(
+  private SegmentPublishResult(
       @JsonProperty("segments") Set<DataSegment> segments,
-      @JsonProperty("success") boolean success
+      @JsonProperty("success") boolean success,
+      @JsonProperty("errorMsg") @Nullable String errorMsg
   )
   {
     this.segments = Preconditions.checkNotNull(segments, "segments");
     this.success = success;
+    this.errorMsg = errorMsg;
 
     if (!success) {
       Preconditions.checkArgument(segments.isEmpty(), "segments must be empty for unsuccessful publishes");
@@ -74,6 +84,13 @@ public class SegmentPublishResult
     return success;
   }
 
+  @JsonProperty
+  @Nullable
+  public String getErrorMsg()
+  {
+    return errorMsg;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -85,13 +102,14 @@ public class SegmentPublishResult
     }
     SegmentPublishResult that = (SegmentPublishResult) o;
     return success == that.success &&
-           Objects.equals(segments, that.segments);
+           Objects.equals(segments, that.segments) &&
+           Objects.equals(errorMsg, that.errorMsg);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(segments, success);
+    return Objects.hash(segments, success, errorMsg);
   }
 
   @Override
@@ -100,6 +118,7 @@ public class SegmentPublishResult
     return "SegmentPublishResult{" +
            "segments=" + segments +
            ", success=" + success +
+           ", errorMsg='" + errorMsg + '\'' +
            '}';
   }
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 9df5673..b0aaa97 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -324,7 +324,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 }
               }
 
-              return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true);
+              return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
             }
           },
           3,
@@ -333,7 +333,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     }
     catch (CallbackFailedException e) {
       if (definitelyNotUpdated.get()) {
-        return SegmentPublishResult.fail();
+        return SegmentPublishResult.fail(e.getMessage());
       } else {
         // Must throw exception if we are not sure if we updated or not.
         throw e;
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index cb6ba90..76100e1 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.data.input.Committer;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -553,16 +554,26 @@ public abstract class BaseAppenderatorDriver implements Closeable
 
             try {
               final Object metadata = segmentsAndMetadata.getCommitMetadata();
-              final boolean published = publisher.publishSegments(
+              final SegmentPublishResult publishResult = publisher.publishSegments(
                   ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                   metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
-              ).isSuccess();
+              );
 
-              if (published) {
+              if (publishResult.isSuccess()) {
                 log.info("Published segments.");
               } else {
-                log.info("Transaction failure while publishing segments, removing them from deep storage "
-                         + "and checking if someone else beat us to publishing.");
+                if (publishResult.getErrorMsg() == null) {
+                  log.warn(
+                      "Transaction failure while publishing segments. Please check the overlord log."
+                      + " Removing them from deep storage and checking if someone else beat us to publishing."
+                  );
+                } else {
+                  log.warn(
+                      "Transaction failure while publishing segments because of [%s]. Please check the overlord log."
+                      + " Removing them from deep storage and checking if someone else beat us to publishing.",
+                      publishResult.getErrorMsg()
+                  );
+                }
 
                 segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
 
@@ -576,7 +587,11 @@ public abstract class BaseAppenderatorDriver implements Closeable
                                       .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
                   log.info("Our segments really do exist, awaiting handoff.");
                 } else {
-                  throw new ISE("Failed to publish segments.");
+                  if (publishResult.getErrorMsg() != null) {
+                    throw new ISE("Failed to publish segments because of [%s].", publishResult.getErrorMsg());
+                  } else {
+                    throw new ISE("Failed to publish segments.");
+                  }
                 }
               }
             }
diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java
new file mode 100644
index 0000000..1772a9d
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.InjectableValues.Std;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.DataSegment.PruneLoadSpecHolder;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class SegmentPublishResultTest
+{
+  private final ObjectMapper objectMapper = new DefaultObjectMapper()
+      .setInjectableValues(new Std().addValue(PruneLoadSpecHolder.class, PruneLoadSpecHolder.DEFAULT));
+
+  @Test
+  public void testSerdeOkResult() throws IOException
+  {
+    final SegmentPublishResult original = SegmentPublishResult.ok(
+        ImmutableSet.of(
+            segment(Intervals.of("2018/2019")),
+            segment(Intervals.of("2019/2020"))
+        )
+    );
+
+    final String json = objectMapper.writeValueAsString(original);
+    final SegmentPublishResult fromJson = objectMapper.readValue(json, SegmentPublishResult.class);
+    Assert.assertEquals(original, fromJson);
+  }
+
+  @Test
+  public void testSerdeFailResult() throws IOException
+  {
+    final SegmentPublishResult original = SegmentPublishResult.fail("test");
+
+    final String json = objectMapper.writeValueAsString(original);
+    final SegmentPublishResult fromJson = objectMapper.readValue(json, SegmentPublishResult.class);
+    Assert.assertEquals(original, fromJson);
+  }
+
+  private static DataSegment segment(Interval interval)
+  {
+    return new DataSegment(
+        "ds",
+        interval,
+        "version",
+        null,
+        null,
+        null,
+        null,
+        9,
+        10L
+    );
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 09b6f56..879ddfb 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -304,7 +304,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "bar"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1);
 
     Assert.assertArrayEquals(
         mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8),
@@ -322,7 +322,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), result2);
 
     Assert.assertArrayEquals(
         mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8),
@@ -378,7 +378,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "bar"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1);
 
     Assert.assertArrayEquals(
         mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8),
@@ -399,7 +399,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), result2);
 
     Assert.assertArrayEquals(
         mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8),
@@ -429,7 +429,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(ImmutableMap.of("foo", "bar")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result1);
+    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1);
 
     // Should only be tried once.
     Assert.assertEquals(1, metadataUpdateCounter.get());
@@ -443,14 +443,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1);
 
     final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment2),
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2);
+    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2);
 
     // Should only be tried once per call.
     Assert.assertEquals(2, metadataUpdateCounter.get());
@@ -464,14 +464,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         new ObjectMetadata(null),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
+    Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1);
 
     final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
         ImmutableSet.of(defaultSegment2),
         new ObjectMetadata(ImmutableMap.of("foo", "qux")),
         new ObjectMetadata(ImmutableMap.of("foo", "baz"))
     );
-    Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2);
+    Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2);
 
     // Should only be tried once per call.
     Assert.assertEquals(2, metadataUpdateCounter.get());
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 66e136a..6536cb6 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -195,6 +195,6 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
+    return (segments, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 79b5754..9d92253 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,7 +239,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
   {
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
-    expectedException.expectMessage("Failed to publish segments.");
+    expectedException.expectMessage("Failed to publish segments because of [test].");
 
     testFailDuringPublishInternal(false);
   }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index e169217..3a491e9 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -361,16 +361,17 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
+    return (segments, commitMetadata) -> SegmentPublishResult.ok(Collections.emptySet());
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
   {
     return (segments, commitMetadata) -> {
+      final RuntimeException exception = new RuntimeException("test");
       if (failWithException) {
-        throw new RuntimeException("test");
+        throw exception;
       }
-      return SegmentPublishResult.fail();
+      return SegmentPublishResult.fail(exception.getMessage());
     };
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org