You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2018/12/10 21:37:45 UTC

[incubator-pinot] branch master updated: Allow completing segments to finish before stopping LLC Segment Manager (#3593)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 24c35e8  Allow completing segments to finish before stopping LLC Segment Manager (#3593)
24c35e8 is described below

commit 24c35e8fb083451f622f31bfb45895f1dfd724aa
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Mon Dec 10 13:37:40 2018 -0800

    Allow completing segments to finish before stopping LLC Segment Manager (#3593)
    
    Added a stop() method to LLC segment manager, so that we can allow the segments
    that are completing to be done, and not allow new segments to start their
    completion process while controller is shutting down
---
 .../pinot/controller/ControllerStarter.java        |  6 +++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 50 ++++++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index bf25cef..f1336d8 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -287,6 +287,10 @@ public class ControllerStarter {
 
   public void stop() {
     try {
+      // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
+      // may interrupt the handlers waiting on an I/O.
+      PinotLLCRealtimeSegmentManager.getInstance().stop();
+
       LOGGER.info("Closing PinotFS classes");
       PinotFSFactory.shutdown();
 
@@ -296,6 +300,7 @@ public class ControllerStarter {
       LOGGER.info("Stopping realtime segment manager");
       _realtimeSegmentsManager.stop();
 
+
       LOGGER.info("Stopping resource manager");
       _helixResourceManager.stop();
 
@@ -303,6 +308,7 @@ public class ControllerStarter {
       _periodicTaskScheduler.stop();
 
       _executorService.shutdownNow();
+
     } catch (final Exception e) {
       LOGGER.error("Caught exception while shutting down", e);
     }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index b1ff634..563fb80 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -77,6 +77,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nonnull;
@@ -102,6 +103,10 @@ public class PinotLLCRealtimeSegmentManager {
 
   private static final String METADATA_TEMP_DIR_SUFFIX = ".metadata.tmp";
   private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
+
+  // Max time to wait for all LLC segments to complete committing their metadata.
+  private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
+
   // TODO: make this configurable with default set to 10
   /**
    * After step 1 of segment completion is done,
@@ -128,6 +133,9 @@ public class PinotLLCRealtimeSegmentManager {
   private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
 
+  private volatile boolean _isStopping = false;
+  private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+
   public boolean getIsSplitCommitEnabled() {
     return _controllerConf.getAcceptSplitCommit();
   }
@@ -159,6 +167,30 @@ public class PinotLLCRealtimeSegmentManager {
     _helixManager.addControllerListener(changeContext -> onBecomeLeader());
   }
 
+
+  public void stop() {
+    _isStopping = true;
+    LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}", MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
+    long millisToWait = MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS;
+
+    // Busy-wait for all segments that are committing metadata to complete their operation.
+    // Waiting
+    while (_numCompletingSegments.get() > 0 && millisToWait > 0) {
+      try {
+        long thisWait = 1000;
+        if (millisToWait < thisWait) {
+          thisWait = millisToWait;
+        }
+        Thread.sleep(thisWait);
+        millisToWait -= thisWait;
+      } catch (InterruptedException e) {
+        LOGGER.info("Interrupted: Remaining wait time {} (out of {})", millisToWait, MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
+        return;
+      }
+    }
+    LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
+  }
+
   protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
       ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
       ControllerMetrics controllerMetrics) {
@@ -348,6 +380,10 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   public boolean commitSegmentFile(String tableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
+    if (_isStopping) {
+      LOGGER.info("Returning false since the controller is stopping");
+      return false;
+    }
     String segmentName = committingSegmentDescriptor.getSegmentName();
     String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
     URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation);
@@ -395,6 +431,20 @@ public class PinotLLCRealtimeSegmentManager {
    * @return boolean
    */
   public boolean commitSegmentMetadata(String rawTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
+    if (_isStopping) {
+      LOGGER.info("Returning false since the controller is stopping");
+      return false;
+    }
+    try {
+      _numCompletingSegments.addAndGet(1);
+      return commitSegmentMetadataInternal(rawTableName, committingSegmentDescriptor);
+    } finally {
+      _numCompletingSegments.addAndGet(-1);
+    }
+  }
+
+  private boolean commitSegmentMetadataInternal(String rawTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
+
     final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
     TableConfig tableConfig = getRealtimeTableConfig(realtimeTableName);
     if (tableConfig == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org