Posted to commits@druid.apache.org by fj...@apache.org on 2019/04/29 16:55:11 UTC

[incubator-druid] branch master updated: BaseAppenderatorDriver: Fix potentially overeager segment cleanup. (#7558)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ce7298b  BaseAppenderatorDriver: Fix potentially overeager segment cleanup. (#7558)
ce7298b is described below

commit ce7298b51e0ec9e6246dbb565421c744f4848ba2
Author: Gian Merlino <gi...@imply.io>
AuthorDate: Mon Apr 29 09:55:04 2019 -0700

    BaseAppenderatorDriver: Fix potentially overeager segment cleanup. (#7558)
    
    * BaseAppenderatorDriver: Fix potentially overeager segment cleanup.
    
    Here is a scenario that I think can go wrong:
    
    1. We push some segments, then try to publish them transactionally.
    2. The segments are actually published, but the 200 OK response gets
       lost (connection dropped, or similar).
    3. We try again, and on the second try, the publish fails (because
       the transaction baseline start metadata no longer matches).
    4. Because the publish failed, we delete the pushed segments.
    5. But this is bad, because the publish didn't really fail; it actually
       succeeded in step 2.
    
    I haven't seen this in the wild, but thought about it while
    reviewing #7537.
    
    This patch also cleans up logging a bit, making it more accurate and
    somewhat less chatty.
    
    * Avoid wrapping exceptions when not necessary.
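
In outline, the defensive pattern the scenario above calls for is: treat a
failed publish as ambiguous, and verify against the metadata store before
deleting anything. Below is a minimal self-contained sketch of that pattern;
the Publisher, UsedSegmentChecker, and SegmentKiller interfaces here are
simplified stand-ins for the Druid classes this patch touches, not the real
APIs.

    import java.util.Set;

    // Simplified stand-ins for the Druid types involved; not the real APIs.
    interface Publisher {
      // Returns true only on a confirmed success. A lost response or a
      // start-metadata mismatch on retry shows up here as false.
      boolean publish(Set<String> segmentIds) throws Exception;
    }

    interface UsedSegmentChecker {
      // Asks the metadata store which of these segments are already live.
      Set<String> findUsedSegments(Set<String> segmentIds) throws Exception;
    }

    interface SegmentKiller {
      void killQuietly(String segmentId);
    }

    class PublishAndVerify {
      static void publishSafely(
          Publisher publisher,
          UsedSegmentChecker checker,
          SegmentKiller killer,
          Set<String> ourSegments
      ) throws Exception {
        if (publisher.publish(ourSegments)) {
          return; // Confirmed success.
        }
        // The publish did not affirmatively succeed, but it may have landed
        // on an earlier attempt whose response was lost. Check before
        // deleting anything.
        if (checker.findUsedSegments(ourSegments).equals(ourSegments)) {
          // Our segments are live after all; deleting them here would be
          // step 4 of the bug described above.
          return;
        }
        // Truly failed: clean up the pushed files, then surface the error.
        ourSegments.forEach(killer::killQuietly);
        throw new IllegalStateException("Failed to publish segments.");
      }
    }

The patch below implements this check in BaseAppenderatorDriver, with the
extra wrinkle that already-published segments pushed by a replica under
unique paths are still safe to delete.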
---
 .../appenderator/BaseAppenderatorDriver.java       | 47 +++++++++++++---------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index f8b12ad..375c013 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -42,6 +42,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -554,28 +555,23 @@ public abstract class BaseAppenderatorDriver implements Closeable
 
             try {
               final Object metadata = segmentsAndMetadata.getCommitMetadata();
+              final ImmutableSet<DataSegment> ourSegments = ImmutableSet.copyOf(segmentsAndMetadata.getSegments());
               final SegmentPublishResult publishResult = publisher.publishSegments(
-                  ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
+                  ourSegments,
                   metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
               );
 
               if (publishResult.isSuccess()) {
                 log.info("Published segments.");
               } else {
-                if (publishResult.getErrorMsg() == null) {
-                  log.warn(
-                      "Transaction failure while publishing segments. Please check the overlord log."
-                      + " Removing them from deep storage and checking if someone else beat us to publishing."
-                  );
-                } else {
-                  log.warn(
-                      "Transaction failure while publishing segments because of [%s]. Please check the overlord log."
-                      + " Removing them from deep storage and checking if someone else beat us to publishing.",
-                      publishResult.getErrorMsg()
-                  );
-                }
-
-                segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
+                // now after all, for two possible reasons:
+                //
+                // 1) A replica may have beat us to publishing these segments. In this case we want to delete the
+                //    segments we pushed (if they had unique paths) to avoid wasting space on deep storage.
+                // 2) We may have actually succeeded, but not realized it due to missing the confirmation response
+                //    from the overlord. In this case we do not want to delete the segments we pushed, since they are
+                //    now live!
 
                 final Set<SegmentIdWithShardSpec> segmentsIdentifiers = segmentsAndMetadata
                     .getSegments()
@@ -583,10 +579,25 @@ public abstract class BaseAppenderatorDriver implements Closeable
                     .map(SegmentIdWithShardSpec::fromDataSegment)
                     .collect(Collectors.toSet());
 
-                if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
-                                      .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
-                  log.info("Our segments really do exist, awaiting handoff.");
+                final Set<DataSegment> activeSegments = usedSegmentChecker.findUsedSegments(segmentsIdentifiers);
+
+                if (activeSegments.equals(ourSegments)) {
+                  log.info("Could not publish segments, but checked and found them already published. Continuing.");
+
+                  // Clean up pushed segments if they are physically disjoint from the published ones (this means
+                  // they were probably pushed by a replica, and with the unique paths option).
+                  final boolean physicallyDisjoint = Sets.intersection(
+                      activeSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet()),
+                      ourSegments.stream().map(DataSegment::getLoadSpec).collect(Collectors.toSet())
+                  ).isEmpty();
+
+                  if (physicallyDisjoint) {
+                    segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+                  }
                 } else {
+                  // Our segments aren't active. Publish failed for some reason. Clean them up and then throw an error.
+                  segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
                   if (publishResult.getErrorMsg() != null) {
                     throw new ISE("Failed to publish segments because of [%s].", publishResult.getErrorMsg());
                   } else {

