You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/27 20:16:27 UTC

[GitHub] jihoonson closed pull request #6056: [Backport] Synchronize scheduled poll() calls in SQLMetadataSegmentManager

jihoonson closed pull request #6056: [Backport] Synchronize scheduled poll() calls in SQLMetadataSegmentManager
URL: https://github.com/apache/incubator-druid/pull/6056
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
index c1cb98fe5f6..1c00c6830a1 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
@@ -148,10 +148,12 @@ public Void withHandle(Handle handle) throws Exception
    * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
    * the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if
    * the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly.
+   *
+   * {@link SQLMetadataSegmentManager} also has a similar issue.
    */
   private long currentStartOrder = -1;
   private ScheduledExecutorService exec = null;
-  private long retryStartTime = 0;
+  private long failStartTimeMs = 0;
 
   @Inject
   public SQLMetadataRuleManager(
@@ -311,17 +313,17 @@ public void poll()
       log.info("Polled and found rules for %,d datasource(s)", newRules.size());
 
       rules.set(newRules);
-      retryStartTime = 0;
+      failStartTimeMs = 0;
     }
     catch (Exception e) {
-      if (retryStartTime == 0) {
-        retryStartTime = System.currentTimeMillis();
+      if (failStartTimeMs == 0) {
+        failStartTimeMs = System.currentTimeMillis();
       }
 
-      if (System.currentTimeMillis() - retryStartTime > config.getAlertThreshold().toStandardDuration().getMillis()) {
+      if (System.currentTimeMillis() - failStartTimeMs > config.getAlertThreshold().toStandardDuration().getMillis()) {
         log.makeAlert(e, "Exception while polling for rules")
            .emit();
-        retryStartTime = 0;
+        failStartTimeMs = 0;
       } else {
         log.error(e, "Exception while polling for rules");
       }
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
index 36f628c55d2..203bcea2b16 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
@@ -29,11 +29,7 @@
 import com.google.common.collect.Interners;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.client.DruidDataSource;
 import io.druid.client.ImmutableDruidDataSource;
 import io.druid.guice.ManageLifecycle;
@@ -44,6 +40,7 @@
 import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.lifecycle.LifecycleStart;
 import io.druid.java.util.common.lifecycle.LifecycleStop;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
@@ -73,8 +70,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -85,10 +85,16 @@
   private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
   private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
 
-  // Use to synchronize start() and stop(). These methods should be synchronized to prevent from being called at the
-  // same time if two different threads are calling them. This might be possible if a druid coordinator gets and drops
-  // leadership repeatedly in quick succession.
-  private final Object lock = new Object();
+  /**
+   * Used to synchronize {@link #start()}, {@link #stop()}, {@link #poll()}, and {@link #isStarted()}. These methods
+   * should be synchronized to prevent them from being called at the same time if two different threads are calling
+   * them. This might be possible if a druid coordinator gets and drops leadership repeatedly in quick succession.
+   */
+  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  /** {@link #poll()} and {@link #isStarted()} use readLock. */
+  private final Lock readLock = readWriteLock.readLock();
+  /** {@link #start()} and {@link #stop()} use writeLock. */
+  private final Lock writeLock = readWriteLock.writeLock();
 
   private final ObjectMapper jsonMapper;
   private final Supplier<MetadataSegmentManagerConfig> config;
@@ -96,9 +102,21 @@
   private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSourcesRef;
   private final SQLMetadataConnector connector;
 
-  private volatile ListeningScheduledExecutorService exec = null;
-  private volatile ListenableFuture<?> future = null;
-  private volatile boolean started;
+  /** The number of times this SQLMetadataSegmentManager was started. */
+  private long startCount = 0;
+  /**
+   * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if
+   * currently stopped.
+   *
+   * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
+   * the theoretical situation of two or more tasks scheduled in {@link #start()} calling {@link #isStarted()} and
+   * {@link #poll()} concurrently, if the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions
+   * occurs quickly.
+   *
+   * {@link SQLMetadataRuleManager} also has a similar issue.
+   */
+  private long currentStartOrder = -1;
+  private ScheduledExecutorService exec = null;
 
   @Inject
   public SQLMetadataSegmentManager(
@@ -121,34 +139,52 @@ public SQLMetadataSegmentManager(
   @LifecycleStart
   public void start()
   {
-    synchronized (lock) {
-      if (started) {
+    writeLock.lock();
+    try {
+      if (isStarted()) {
         return;
       }
 
-      exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));
+      startCount++;
+      currentStartOrder = startCount;
+      final long localStartOrder = currentStartOrder;
+
+      exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
 
       final Duration delay = config.get().getPollDuration().toStandardDuration();
-      future = exec.scheduleWithFixedDelay(
+      exec.scheduleWithFixedDelay(
           new Runnable()
           {
             @Override
             public void run()
             {
+              // poll() is synchronized together with start(), stop() and isStarted() to ensure that when stop() exits,
+              // poll() won't actually run anymore after that (it could only enter the synchronized section and exit
+              // immediately because the localStartOrder doesn't match the new currentStartOrder). It's needed
+              // to avoid flakiness in SQLMetadataSegmentManagerTest.
+              // See https://github.com/apache/incubator-druid/issues/6028
+              readLock.lock();
               try {
-                poll();
+                if (localStartOrder == currentStartOrder) {
+                  poll();
+                }
               }
               catch (Exception e) {
                 log.makeAlert(e, "uncaught exception in segment manager polling thread").emit();
 
               }
+              finally {
+                readLock.unlock();
+              }
             }
           },
           0,
           delay.getMillis(),
           TimeUnit.MILLISECONDS
       );
-      started = true;
+    }
+    finally {
+      writeLock.unlock();
     }
   }
 
@@ -156,8 +192,9 @@ public void run()
   @LifecycleStop
   public void stop()
   {
-    synchronized (lock) {
-      if (!started) {
+    writeLock.lock();
+    try {
+      if (!isStarted()) {
         return;
       }
 
@@ -167,11 +204,12 @@ public void stop()
         current = dataSourcesRef.get();
       } while (!dataSourcesRef.compareAndSet(current, emptyMap));
 
-      future.cancel(false);
-      future = null;
+      currentStartOrder = -1;
       exec.shutdownNow();
       exec = null;
-      started = false;
+    }
+    finally {
+      writeLock.unlock();
     }
   }
 
@@ -367,7 +405,15 @@ public boolean removeSegment(String ds, final String segmentID)
   @Override
   public boolean isStarted()
   {
-    return started;
+    // isStarted() is synchronized together with start(), stop() and poll() to ensure that the latest currentStartOrder
+    // is always visible. readLock should be used to avoid unexpected performance degradation of DruidCoordinator.
+    readLock.lock();
+    try {
+      return currentStartOrder >= 0;
+    }
+    finally {
+      readLock.unlock();
+    }
   }
 
   @Override
@@ -421,10 +467,6 @@ public ImmutableDruidDataSource getInventoryValue(String key)
   public void poll()
   {
     try {
-      if (!started) {
-        return;
-      }
-
       ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<>();
 
       log.debug("Starting polling of segment table");
diff --git a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
index 92789455165..b6cc0c71525 100644
--- a/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
+++ b/server/src/test/java/io/druid/metadata/SQLMetadataSegmentManagerTest.java
@@ -118,6 +118,7 @@ public void testPoll()
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
     Assert.assertEquals(
         ImmutableList.of("wikipedia"),
         manager.getAllDatasourceNames()
@@ -149,6 +150,7 @@ public void testPollWithCurroptedSegment()
     EmittingLogger.registerEmitter(new NoopServiceEmitter());
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     Assert.assertEquals(
         "wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName()
@@ -160,6 +162,7 @@ public void testGetUnusedSegmentsForInterval() throws Exception
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
     Assert.assertTrue(manager.removeDatasource("wikipedia"));
 
     Assert.assertEquals(
@@ -178,6 +181,7 @@ public void testRemoveDataSource() throws IOException
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     final String newDataSource = "wikipedia2";
     final DataSegment newSegment = new DataSegment(
@@ -207,6 +211,7 @@ public void testRemoveDataSegment() throws IOException
   {
     manager.start();
     manager.poll();
+    Assert.assertTrue(manager.isStarted());
 
     final String newDataSource = "wikipedia2";
     final DataSegment newSegment = new DataSegment(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org