Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/18 01:58:29 UTC

[GitHub] jon-wei closed pull request #6187: [Backport] Fix three bugs with segment publishing. (#6155)

URL: https://github.com/apache/incubator-druid/pull/6187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 5f039c0336b..258e203edf9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -1126,7 +1126,7 @@ public void run()
 
         log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
 
       // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
@@ -2332,7 +2332,7 @@ TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTr
 
         log.info("Publishing with isTransaction[%s].", useTransaction);
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
     }
   }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index b9ec2705510..a13cf739b39 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -646,7 +646,7 @@ private boolean generateAndPublishSegments(
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     try (
diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 17917d009e8..e2fcceaa8ea 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -122,9 +122,12 @@ SegmentIdentifier allocatePendingSegment(
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
    *                      involve a metadata transaction
    *
-   * @return segment publish result indicating transaction success or failure, and set of segments actually published
+   * @return segment publish result indicating transaction success or failure, and set of segments actually published.
+   * This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
+   * it must throw an exception instead.
    *
    * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+   * @throws RuntimeException         if the state of metadata storage after this call is unknown
    */
   SegmentPublishResult announceHistoricalSegments(
       Set<DataSegment> segments,
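
The javadoc above tightens the contract to three distinct outcomes: success, definite failure, or an exception when the state of the metadata store is unknown. A hedged sketch of a caller honoring that contract follows; the class and method names are illustrative, not part of the patch, and the throws IOException clause is an assumption about the interface:

    import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
    import io.druid.indexing.overlord.SegmentPublishResult;
    import io.druid.timeline.DataSegment;

    import java.io.IOException;
    import java.util.Set;

    class PublishOutcomeSketch
    {
      /**
       * Returns true if the segments were definitely published and false if they
       * definitely were not. Lets exceptions propagate: per the contract above, an
       * exception means the transaction outcome is unknown, so the caller must not
       * clean up pushed segment files.
       */
      static boolean publishDefinitely(
          final IndexerMetadataStorageCoordinator coordinator,
          final Set<DataSegment> segments
      ) throws IOException
      {
        // Null start/end metadata: no metadata transaction, per the javadoc above.
        final SegmentPublishResult result =
            coordinator.announceHistoricalSegments(segments, null, null);
        return result.isSuccess();
      }
    }
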
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 746e359127e..3cf91570489 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -326,7 +326,7 @@ public SegmentPublishResult announceHistoricalSegments(
       }
     }
 
-    final AtomicBoolean txnFailure = new AtomicBoolean(false);
+    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
 
     try {
       return connector.retryTransaction(
@@ -338,6 +338,9 @@ public SegmentPublishResult inTransaction(
                 final TransactionStatus transactionStatus
             ) throws Exception
             {
+              // Set definitelyNotUpdated back to false upon retrying.
+              definitelyNotUpdated.set(false);
+
               final Set<DataSegment> inserted = Sets.newHashSet();
 
               if (startMetadata != null) {
@@ -349,8 +352,9 @@ public SegmentPublishResult inTransaction(
                 );
 
                 if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+                  // Metadata was definitely not updated.
                   transactionStatus.setRollbackOnly();
-                  txnFailure.set(true);
+                  definitelyNotUpdated.set(true);
 
                   if (result == DataSourceMetadataUpdateResult.FAILURE) {
                     throw new RuntimeException("Aborting transaction!");
@@ -374,9 +378,10 @@ public SegmentPublishResult inTransaction(
       );
     }
     catch (CallbackFailedException e) {
-      if (txnFailure.get()) {
-        return new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false);
+      if (definitelyNotUpdated.get()) {
+        return SegmentPublishResult.fail();
       } else {
+        // Must throw exception if we are not sure if we updated or not.
         throw e;
       }
     }
@@ -904,7 +909,12 @@ public DataSourceMetadata getDataSourceMetadata(final String dataSource)
    * @param endMetadata   dataSource metadata post-insert will have this endMetadata merged in with
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}
    *
-   * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
+   * @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
+   * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
+   * {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
+   * achieve its own guarantee.
+   *
+   * @throws RuntimeException if state is unknown after this call
    */
   protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
       final Handle handle,
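
The renamed definitelyNotUpdated flag is the heart of the fix in this file: it is reset at the start of every retry attempt, so after a CallbackFailedException the outer catch reports a definite failure only when the final attempt provably made no update; otherwise the exception propagates as "state unknown". A minimal generic sketch of that pattern, with all names hypothetical and the transaction machinery reduced to a plain retry loop:

    import java.util.concurrent.atomic.AtomicBoolean;

    class DefiniteFailureRetrySketch
    {
      interface Attempt
      {
        // Implementations set the flag before throwing when they are certain no
        // update happened; they leave it false when the state is unknown.
        boolean run(AtomicBoolean definitelyNotUpdated) throws Exception;
      }

      /**
       * Returns true on success, false on a definite no-op failure, and rethrows
       * the last exception when the final attempt left the state unknown.
       * Assumes maxTries >= 1.
       */
      static boolean runWithRetries(final Attempt attempt, final int maxTries) throws Exception
      {
        final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
        Exception lastException = null;
        for (int i = 0; i < maxTries; i++) {
          definitelyNotUpdated.set(false); // reset per attempt, as in the patch
          try {
            return attempt.run(definitelyNotUpdated);
          }
          catch (Exception e) {
            lastException = e;
          }
        }
        if (definitelyNotUpdated.get()) {
          return false; // the final attempt definitely did not update anything
        }
        throw lastException; // unknown state: must not be reported as a clean failure
      }
    }
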
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 3f0f0bac73b..5b73d40c31b 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -625,8 +625,15 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink
     try {
       if (descriptorFile.exists()) {
         // Already pushed.
-        log.info("Segment[%s] already pushed.", identifier);
-        return objectMapper.readValue(descriptorFile, DataSegment.class);
+
+        if (useUniquePath) {
+          // Don't reuse the descriptor, because the caller asked for a unique path. Leave the old one as-is, since
+          // it might serve some unknown purpose.
+          log.info("Pushing segment[%s] again with new unique path.", identifier);
+        } else {
+          log.info("Segment[%s] already pushed.", identifier);
+          return objectMapper.readValue(descriptorFile, DataSegment.class);
+        }
       }
 
       log.info("Pushing merged index for segment[%s].", identifier);
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 1f04236c41c..788c1c15fda 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -554,38 +554,33 @@ protected AppenderatorDriverAddResult append(
               final boolean published = publisher.publishSegments(
                   ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                   metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
-              );
+              ).isSuccess();
 
               if (published) {
                 log.info("Published segments.");
               } else {
-                log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
+                log.info("Transaction failure while publishing segments, removing them from deep storage "
+                         + "and checking if someone else beat us to publishing.");
+
+                segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
                 final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
                     .getSegments()
                     .stream()
                     .map(SegmentIdentifier::fromDataSegment)
                     .collect(Collectors.toSet());
+
                 if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
                                       .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
-                  log.info(
-                      "Removing our segments from deep storage because someone else already published them: %s",
-                      segmentsAndMetadata.getSegments()
-                  );
-                  segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
                   log.info("Our segments really do exist, awaiting handoff.");
                 } else {
-                  throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
+                  throw new ISE("Failed to publish segments.");
                 }
               }
             }
             catch (Exception e) {
-              log.warn(
-                  "Removing segments from deep storage after failed publish: %s",
-                  segmentsAndMetadata.getSegments()
-              );
-              segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
+              // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+              log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());
               throw Throwables.propagate(e);
             }
           }
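
The reshuffled error handling encodes the invariant behind the whole patch: pushed segment files may be removed from deep storage only after a definite transaction failure, never after an exception, because an exception leaves the metadata commit in an unknown state. The decision table, sketched with a hypothetical enum standing in for the driver's control flow:

    class CleanupDecisionSketch
    {
      enum PublishOutcome
      {
        SUCCESS,          // transaction committed
        DEFINITE_FAILURE, // transaction provably did not commit
        UNKNOWN           // exception: commit may or may not have happened
      }

      /** True only when it is safe to remove our pushed files from deep storage. */
      static boolean safeToKillSegments(final PublishOutcome outcome)
      {
        // After DEFINITE_FAILURE the driver removes its own files, then asks
        // UsedSegmentChecker whether someone else already published identical
        // segments (await handoff) or not (fail). After UNKNOWN, deleting could
        // destroy data that was in fact published, so the error propagates instead.
        return outcome == PublishOutcome.DEFINITE_FAILURE;
      }
    }
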
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index 359708a78c0..44326c13020 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -19,6 +19,7 @@
 
 package io.druid.segment.realtime.appenderator;
 
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -30,11 +31,14 @@
   /**
    * Publish segments, along with some commit metadata, in a single transaction.
    *
-   * @return true if segments were published, false if they were not published due to txn failure with the metadata
+   * @return publish result that indicates if segments were published or not. If it is unclear
+   * if the segments were published or not, this method must throw an exception. The behavior is similar to
+   * IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments.
    *
    * @throws IOException if there was an I/O error when publishing
+   * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason
    */
-  boolean publishSegments(
+  SegmentPublishResult publishSegments(
       Set<DataSegment> segments,
       @Nullable Object commitMetadata
   ) throws IOException;
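
With the interface change, implementations report their outcome through SegmentPublishResult instead of a bare boolean, which is what lets BaseAppenderatorDriver above tell a definite failure from an unknown one. Minimal lambda implementations, mirroring the test publishers later in this diff:

    import com.google.common.collect.ImmutableSet;
    import io.druid.indexing.overlord.SegmentPublishResult;
    import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;

    class PublisherSketch
    {
      // Always succeeds; the empty segment set mirrors the tests below.
      static TransactionalSegmentPublisher makeOkPublisher()
      {
        return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
      }

      // Definite failure; an implementation that is unsure must throw instead.
      static TransactionalSegmentPublisher makeFailingPublisher()
      {
        return (segments, commitMetadata) -> SegmentPublishResult.fail();
      }
    }
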
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 2fd0f087d38..92d1ec1767b 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -24,13 +24,14 @@
 import com.google.common.collect.ImmutableSet;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
   }
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 9535bb3868d..e9f20ffaed2 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,8 +239,7 @@ public void testFailDuringPublish() throws Exception
   {
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
-    expectedException.expectMessage(
-        "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binaryVersion='0'}]]");
+    expectedException.expectMessage("Failed to publish segments.");
 
     testFailDuringPublishInternal(false);
   }
@@ -279,31 +278,34 @@ private void testFailDuringPublishInternal(boolean failWithException) throws Exc
       Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
 
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
-
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
+    if (!failWithException) {
+      // Should only kill segments if there was _no_ exception.
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+    }
 
     EasyMock.replay(dataSegmentKiller);
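
The test now registers its killQuietly expectations only on the no-exception branch, pinning down the production rule above: segments may be killed after a definite publish failure but never after an exception. A reduced sketch of the conditional-expectation pattern with EasyMock, using a hypothetical collaborator:

    import org.easymock.EasyMock;

    class ConditionalExpectationSketch
    {
      interface Killer
      {
        void killQuietly(String segmentId);
      }

      static Killer expectKillsOnlyWithoutException(final boolean failWithException)
      {
        final Killer killer = EasyMock.createMock(Killer.class);
        if (!failWithException) {
          // Expected exactly once, but only when the code under test sees a
          // definite (non-exceptional) publish failure.
          killer.killQuietly("segment-1");
          EasyMock.expectLastCall().once();
        }
        EasyMock.replay(killer);
        return killer; // a later EasyMock.verify(killer) fails on any unexpected call
      }
    }
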
 
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index aff1e020c53..763ed0e25f1 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -30,6 +30,7 @@
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -359,14 +361,7 @@ public void testIncrementalHandoff() throws Exception
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return new TransactionalSegmentPublisher()
-    {
-      @Override
-      public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
-      {
-        return true;
-      }
-    };
+    return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
@@ -375,7 +370,7 @@ static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithExcept
       if (failWithException) {
         throw new RuntimeException("test");
       }
-      return false;
+      return SegmentPublishResult.fail();
     };
   }
 


 
