You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/07/24 19:00:53 UTC

[incubator-druid] branch master updated: Synchronize scheduled poll() calls in SQLMetadataRuleManager to prevent flakiness in SqlMetadataRuleManagerTest (#6033)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d5eb0c  Synchronize scheduled poll() calls in SQLMetadataRuleManager to prevent flakiness in SqlMetadataRuleManagerTest (#6033)
7d5eb0c is described below

commit 7d5eb0c21a62d8f40f3d7732f2a3e29f79bfa57f
Author: Roman Leventov <le...@gmail.com>
AuthorDate: Tue Jul 24 14:00:48 2018 -0500

    Synchronize scheduled poll() calls in SQLMetadataRuleManager to prevent flakiness in SqlMetadataRuleManagerTest (#6033)
---
 .../io/druid/metadata/SQLMetadataRuleManager.java  | 63 ++++++++++++----------
 1 file changed, 36 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
index 4883a8e..6900dff 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
@@ -27,9 +27,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import io.druid.audit.AuditEntry;
 import io.druid.audit.AuditInfo;
@@ -63,6 +60,7 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -72,7 +70,6 @@ import java.util.concurrent.atomic.AtomicReference;
 public class SQLMetadataRuleManager implements MetadataRuleManager
 {
 
-
   public static void createDefaultRule(
       final IDBI dbi,
       final String ruleTable,
@@ -142,13 +139,19 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
   private final AuditManager auditManager;
 
   private final Object lock = new Object();
-
-  private volatile boolean started = false;
-
-  private volatile ListeningScheduledExecutorService exec = null;
-  private volatile ListenableFuture<?> future = null;
-
-  private volatile long retryStartTime = 0;
+  /** The number of times this SQLMetadataRuleManager was started. */
+  private long startCount = 0;
+  /**
+   * Equal to the current {@link #startCount} value, if the SQLMetadataRuleManager is currently started; -1 if
+   * currently stopped.
+   *
+   * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
+   * the theoretical situation of two tasks scheduled in {@link #start()} calling {@link #poll()} concurrently, if
+   * the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions occurs quickly.
+   */
+  private long currentStartOrder = -1;
+  private ScheduledExecutorService exec = null;
+  private long retryStartTime = 0;
 
   @Inject
   public SQLMetadataRuleManager(
@@ -169,9 +172,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
     Preconditions.checkNotNull(config.getAlertThreshold().toStandardDuration());
     Preconditions.checkNotNull(config.getPollDuration().toStandardDuration());
 
-    this.rules = new AtomicReference<>(
-        ImmutableMap.<String, List<Rule>>of()
-    );
+    this.rules = new AtomicReference<>(ImmutableMap.of());
   }
 
   @Override
@@ -179,21 +180,34 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
   public void start()
   {
     synchronized (lock) {
-      if (started) {
+      if (currentStartOrder >= 0) {
         return;
       }
 
-      exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"));
+      startCount++;
+      currentStartOrder = startCount;
+      long localStartedOrder = currentStartOrder;
+
+      exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");
 
       createDefaultRule(dbi, getRulesTable(), config.getDefaultRule(), jsonMapper);
-      future = exec.scheduleWithFixedDelay(
+      exec.scheduleWithFixedDelay(
           new Runnable()
           {
             @Override
             public void run()
             {
               try {
-                poll();
+                // poll() is synchronized together with start() and stop() to ensure that when stop() exists, poll()
+                // won't actually run anymore after that (it could only enter the syncrhonized section and exit
+                // immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed
+                // to avoid flakiness in SQLMetadataRuleManagerTest.
+                // See https://github.com/apache/incubator-druid/issues/6028
+                synchronized (lock) {
+                  if (localStartedOrder == currentStartOrder) {
+                    poll();
+                  }
+                }
               }
               catch (Exception e) {
                 log.error(e, "uncaught exception in rule manager polling thread");
@@ -204,8 +218,6 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
           config.getPollDuration().toStandardDuration().getMillis(),
           TimeUnit.MILLISECONDS
       );
-
-      started = true;
     }
   }
 
@@ -214,15 +226,12 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
   public void stop()
   {
     synchronized (lock) {
-      if (!started) {
+      if (currentStartOrder == -1) {
         return;
       }
-
-      rules.set(ImmutableMap.<String, List<Rule>>of());
-
-      future.cancel(false);
-      future = null;
-      started = false;
+      rules.set(ImmutableMap.of());
+      currentStartOrder = -1;
+      // This call cancels the periodic poll() task, scheduled in start().
       exec.shutdownNow();
       exec = null;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org