Posted to commits@druid.apache.org by fj...@apache.org on 2018/08/15 20:55:57 UTC

[incubator-druid] branch master updated: Fix three bugs with segment publishing. (#6155)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ce3185  Fix three bugs with segment publishing. (#6155)
5ce3185 is described below

commit 5ce3185b9cd2704aa86af3348529324292b4a593
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Wed Aug 15 14:55:53 2018 -0600

    Fix three bugs with segment publishing. (#6155)
    
    * Fix three bugs with segment publishing.
    
    1. In AppenderatorImpl: always use a unique path if requested, even if the segment
       was already pushed. This is important because otherwise the already-pushed segment
       (and its non-unique path) gets reused, causing the issue mentioned in #6124.
    2. In IndexerSQLMetadataStorageCoordinator: Fix a bug that could cause it to return
       a "not published" result instead of throwing an exception, when a metadata update
       failure on one attempt was followed by an unrelated exception on a retry. The fix
       is to reset the AtomicBoolean that tracks whether the update definitely failed each
       time the retry callback runs (see the sketch after this list).
    3. In BaseAppenderatorDriver: Only kill segments if we get an affirmative false
       publish result. Skip killing if the publish attempt threw an exception, because
       in that case the segments may be in an unknown state and we want to avoid deleting
       segments that might actually have been published.
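
    A minimal, self-contained sketch of the retry pattern behind fix 2; retryTransaction
    and metadataUpdateFailed below are hypothetical stand-ins for the real connector retry
    loop and metadata update, not the actual Druid APIs:

        import java.util.concurrent.Callable;
        import java.util.concurrent.atomic.AtomicBoolean;

        public class RetryFlagSketch
        {
          // Hypothetical stand-in for the real transaction retry loop: run the callback up to
          // maxTries times, rethrowing the last failure if every attempt fails.
          static <T> T retryTransaction(Callable<T> callback, int maxTries) throws Exception
          {
            Exception last = null;
            for (int i = 0; i < maxTries; i++) {
              try {
                return callback.call();
              }
              catch (Exception e) {
                last = e;
              }
            }
            throw last;
          }

          static boolean metadataUpdateFailed() { return true; }  // pretend the compare-and-swap lost

          public static void main(String[] args) throws Exception
          {
            final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
            try {
              retryTransaction(
                  () -> {
                    // Reset the flag on every attempt. Without this, a flag set by an earlier
                    // attempt would survive into a later attempt that dies with an unrelated
                    // exception, and the caller below would wrongly report "not published".
                    definitelyNotUpdated.set(false);

                    if (metadataUpdateFailed()) {
                      definitelyNotUpdated.set(true);
                      throw new RuntimeException("Aborting transaction!");
                    }
                    return "inserted segments";
                  },
                  3
              );
            }
            catch (Exception e) {
              if (definitelyNotUpdated.get()) {
                System.out.println("Affirmative failure: safe to return 'not published'.");
              } else {
                throw e;  // Unknown state: must propagate, never report an affirmative failure.
              }
            }
          }
        }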
    
    Two other changes to clarify the contracts a bit and hopefully prevent future bugs:
    
    1. Return SegmentPublishResult from TransactionalSegmentPublisher, to make it
    more similar to announceHistoricalSegments.
    2. Make it explicit, at multiple levels of javadocs, that a "false" publish result
    must indicate that the publish _definitely_ did not happen. Unknown states must be
    reported as exceptions. This helps BaseAppenderatorDriver do the right thing (see the
    sketch after this list).
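
    A compact sketch of that contract from the caller's side; the types here are
    hypothetical simplifications (the real interfaces operate on DataSegment sets plus
    commit metadata, and SegmentPublishResult also carries the published segments):

        import java.util.Collections;
        import java.util.Set;

        public class PublishContractSketch
        {
          // Simplified stand-in: a result that only records success or affirmative failure.
          static class SegmentPublishResult
          {
            private final boolean success;
            SegmentPublishResult(boolean success) { this.success = success; }
            boolean isSuccess() { return success; }
          }

          interface TransactionalSegmentPublisher
          {
            // Contract: a failure result means the publish definitely did not happen; if the
            // outcome is unknown, the implementation must throw instead of returning failure.
            SegmentPublishResult publishSegments(Set<String> segments) throws Exception;
          }

          static void publishAndCleanUp(TransactionalSegmentPublisher publisher, Set<String> segments)
              throws Exception
          {
            try {
              if (publisher.publishSegments(segments).isSuccess()) {
                System.out.println("Published segments.");
              } else {
                // Affirmative failure: the segments are definitely not in the metadata store, so
                // it is safe to remove them from deep storage (after checking whether someone
                // else already published identical segments).
                System.out.println("Definitely not published, killing segments: " + segments);
              }
            }
            catch (Exception e) {
              // Unknown outcome: the transaction may have succeeded, so nothing may be killed here.
              System.out.println("Publish outcome unknown, keeping segments: " + segments);
              throw e;
            }
          }

          public static void main(String[] args) throws Exception
          {
            publishAndCleanUp(segs -> new SegmentPublishResult(false), Collections.singleton("seg-1"));
          }
        }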
    
    * Remove javadoc-only import.
    
    * Updates.
    
    * Fix test.
    
    * Fix tests.
---
 .../IncrementalPublishingKafkaIndexTaskRunner.java |  2 +-
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |  2 +-
 .../task/AppenderatorDriverRealtimeIndexTask.java  |  2 +-
 .../io/druid/indexing/common/task/IndexTask.java   |  2 +-
 .../SinglePhaseParallelIndexTaskRunner.java        |  4 +-
 .../IndexerMetadataStorageCoordinator.java         | 12 ++--
 .../IndexerSQLMetadataStorageCoordinator.java      | 64 +++++++++++++---------
 .../realtime/appenderator/AppenderatorImpl.java    | 11 +++-
 .../appenderator/BaseAppenderatorDriver.java       | 25 ++++-----
 .../TransactionalSegmentPublisher.java             |  8 ++-
 .../appenderator/BatchAppenderatorDriverTest.java  |  5 +-
 .../StreamAppenderatorDriverFailTest.java          | 56 ++++++++++---------
 .../appenderator/StreamAppenderatorDriverTest.java |  6 +-
 13 files changed, 113 insertions(+), 86 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index a93fde6..f8d2e10 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -1749,7 +1749,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
 
         log.info("Publishing with isTransaction[%s].", useTransaction);
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
     }
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index b7cdd3e..a9dff63 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -518,7 +518,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
 
         log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
 
       // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 9a0ea65..c04d7df 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -297,7 +297,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
 
       final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
         final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
 
       // Skip connecting firehose if we've been stopped before we got started.
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 6d4cfba..4b14f68 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -949,7 +949,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     try (
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
index ae1addf..8ad4ec2 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
@@ -382,7 +382,7 @@ public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunn
   {
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
     final UsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient());
     final Set<DataSegment> segmentsToPublish = segmentsMap
@@ -390,7 +390,7 @@ public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunn
         .stream()
         .flatMap(report -> report.getSegments().stream())
         .collect(Collectors.toSet());
-    final boolean published = publisher.publishSegments(segmentsToPublish, null);
+    final boolean published = publisher.publishSegments(segmentsToPublish, null).isSuccess();
 
     if (published) {
       log.info("Published segments");
diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 0a7acae..a90ca26 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -50,13 +50,14 @@ public interface IndexerMetadataStorageCoordinator
 
   /**
    * Get all used segments and the created_date of these segments in a given datasource and interval
-   * 
+   *
    * @param dataSource The datasource to query
    * @param interval   The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
+   *
    * @return The DataSegments and the related created_date of segments which include data in the requested interval
    */
   List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);
-  
+
   /**
    * Get all segments which may include any data in the interval and are flagged as used.
    *
@@ -134,9 +135,12 @@ public interface IndexerMetadataStorageCoordinator
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
    *                      involve a metadata transaction
    *
-   * @return segment publish result indicating transaction success or failure, and set of segments actually published
+   * @return segment publish result indicating transaction success or failure, and set of segments actually published.
+   * This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
+   * it must throw an exception instead.
    *
    * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+   * @throws RuntimeException         if the state of metadata storage after this call is unknown
    */
   SegmentPublishResult announceHistoricalSegments(
       Set<DataSegment> segments,
@@ -177,7 +181,7 @@ public interface IndexerMetadataStorageCoordinator
    * @return true if the entry was inserted, false otherwise
    */
   boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata);
-  
+
   void updateSegmentMetadata(Set<DataSegment> segments);
 
   void deleteSegments(Set<DataSegment> segments);
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 75e5d9a..82e17a1 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -311,7 +311,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       }
     }
 
-    final AtomicBoolean txnFailure = new AtomicBoolean(false);
+    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
 
     try {
       return connector.retryTransaction(
@@ -323,6 +323,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 final TransactionStatus transactionStatus
             ) throws Exception
             {
+              // Set definitelyNotUpdated back to false upon retrying.
+              definitelyNotUpdated.set(false);
+
               final Set<DataSegment> inserted = Sets.newHashSet();
 
               if (startMetadata != null) {
@@ -334,8 +337,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                 );
 
                 if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+                  // Metadata was definitely not updated.
                   transactionStatus.setRollbackOnly();
-                  txnFailure.set(true);
+                  definitelyNotUpdated.set(true);
 
                   if (result == DataSourceMetadataUpdateResult.FAILURE) {
                     throw new RuntimeException("Aborting transaction!");
@@ -359,9 +363,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       );
     }
     catch (CallbackFailedException e) {
-      if (txnFailure.get()) {
-        return new SegmentPublishResult(ImmutableSet.of(), false);
+      if (definitelyNotUpdated.get()) {
+        return SegmentPublishResult.fail();
       } else {
+        // Must throw exception if we are not sure if we updated or not.
         throw e;
       }
     }
@@ -890,7 +895,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
    * @param endMetadata   dataSource metadata post-insert will have this endMetadata merged in with
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}
    *
-   * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
+   * @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
+   * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
+   * {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
+   * achieve its own guarantee.
+   *
+   * @throws RuntimeException if state is unknown after this call
    */
   protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
       final Handle handle,
@@ -1163,29 +1173,31 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
         handle -> handle.createQuery(
             StringUtils.format(
                 "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource " +
-                    "AND start >= :start AND %2$send%2$s <= :end AND used = true",
+                "AND start >= :start AND %2$send%2$s <= :end AND used = true",
                 dbTables.getSegmentsTable(), connector.getQuoteString()
             )
         )
-            .bind("dataSource", dataSource)
-            .bind("start", interval.getStart().toString())
-            .bind("end", interval.getEnd().toString())
-            .map(new ResultSetMapper<Pair<DataSegment, String>>()
-            {
-              @Override
-              public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx) throws SQLException
-              {
-                try {
-                  return new Pair<>(
-                      jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
-                      r.getString("created_date"));
-                }
-                catch (IOException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            })
-            .list()
+                        .bind("dataSource", dataSource)
+                        .bind("start", interval.getStart().toString())
+                        .bind("end", interval.getEnd().toString())
+                        .map(new ResultSetMapper<Pair<DataSegment, String>>()
+                        {
+                          @Override
+                          public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx)
+                              throws SQLException
+                          {
+                            try {
+                              return new Pair<>(
+                                  jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
+                                  r.getString("created_date")
+                              );
+                            }
+                            catch (IOException e) {
+                              throw new RuntimeException(e);
+                            }
+                          }
+                        })
+                        .list()
     );
   }
 
@@ -1197,7 +1209,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             .createStatement(
                 StringUtils.format(
                     "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES" +
-                        " (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
+                    " (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
                     dbTables.getDataSourceTable()
                 )
             )
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 5b79626..4bb0857 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -691,8 +691,15 @@ public class AppenderatorImpl implements Appenderator
     try {
       if (descriptorFile.exists()) {
         // Already pushed.
-        log.info("Segment[%s] already pushed.", identifier);
-        return objectMapper.readValue(descriptorFile, DataSegment.class);
+
+        if (useUniquePath) {
+          // Don't reuse the descriptor, because the caller asked for a unique path. Leave the old one as-is, since
+          // it might serve some unknown purpose.
+          log.info("Pushing segment[%s] again with new unique path.", identifier);
+        } else {
+          log.info("Segment[%s] already pushed.", identifier);
+          return objectMapper.readValue(descriptorFile, DataSegment.class);
+        }
       }
 
       log.info("Pushing merged index for segment[%s].", identifier);
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index a5a6d20..43ec063 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -555,38 +555,33 @@ public abstract class BaseAppenderatorDriver implements Closeable
               final boolean published = publisher.publishSegments(
                   ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                   metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
-              );
+              ).isSuccess();
 
               if (published) {
                 log.info("Published segments.");
               } else {
-                log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
+                log.info("Transaction failure while publishing segments, removing them from deep storage "
+                         + "and checking if someone else beat us to publishing.");
+
+                segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
                 final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
                     .getSegments()
                     .stream()
                     .map(SegmentIdentifier::fromDataSegment)
                     .collect(Collectors.toSet());
+
                 if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
                                       .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
-                  log.info(
-                      "Removing our segments from deep storage because someone else already published them: %s",
-                      segmentsAndMetadata.getSegments()
-                  );
-                  segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
                   log.info("Our segments really do exist, awaiting handoff.");
                 } else {
-                  throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
+                  throw new ISE("Failed to publish segments.");
                 }
               }
             }
             catch (Exception e) {
-              log.warn(
-                  "Removing segments from deep storage after failed publish: %s",
-                  segmentsAndMetadata.getSegments()
-              );
-              segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
+              // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+              log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());
               throw Throwables.propagate(e);
             }
           }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index d49b181..9a30fbd 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -19,6 +19,7 @@
 
 package io.druid.segment.realtime.appenderator;
 
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -30,11 +31,14 @@ public interface TransactionalSegmentPublisher
   /**
    * Publish segments, along with some commit metadata, in a single transaction.
    *
-   * @return true if segments were published, false if they were not published due to txn failure with the metadata
+   * @return publish result that indicates if segments were published or not. If it is unclear
+   * if the segments were published or not, this method must throw an exception. The behavior is similar to
+   * IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments.
    *
    * @throws IOException if there was an I/O error when publishing
+   * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason
    */
-  boolean publishSegments(
+  SegmentPublishResult publishSegments(
       Set<DataSegment> segments,
       @Nullable Object commitMetadata
   ) throws IOException;
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 4ac3448..c472f80 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -24,13 +24,14 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
   }
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4d512e1..1c7abb1 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,8 +239,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
   {
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
-    expectedException.expectMessage(
-        "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binar [...]
+    expectedException.expectMessage("Failed to publish segments.");
 
     testFailDuringPublishInternal(false);
   }
@@ -279,31 +278,34 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
       Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
 
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
-
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
+    if (!failWithException) {
+      // Should only kill segments if there was _no_ exception.
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+    }
 
     EasyMock.replay(dataSegmentKiller);
 
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index acdbed5..b9c5e22 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -359,7 +361,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
@@ -368,7 +370,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
       if (failWithException) {
         throw new RuntimeException("test");
       }
-      return false;
+      return SegmentPublishResult.fail();
     };
   }
 

