Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/05/22 10:19:34 UTC

[GitHub] [incubator-druid] gianm commented on a change in pull request #7653: Refactor SQLMetadataSegmentManager; Change contract of REST methods in DataSourcesResource

URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r286321710
 
 

 ##########
 File path: server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
 ##########
 @@ -80,51 +87,121 @@
   private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
 
   /**
-   * Use to synchronize {@link #start()}, {@link #stop()}, {@link #poll()}, and {@link #isStarted()}. These methods
-   * should be synchronized to prevent from being called at the same time if two different threads are calling them.
-   * This might be possible if a druid coordinator gets and drops leadership repeatedly in quick succession.
+   * Marker interface for objects stored in {@link #latestDatabasePoll}. See the comment for that field for details.
    */
-  private final ReentrantReadWriteLock startStopLock = new ReentrantReadWriteLock();
+  private interface DatabasePoll
+  {}
+
+  /** Represents periodic {@link #poll}s happening from {@link #exec}. */
+  private static class PeriodicDatabasePoll implements DatabasePoll
+  {
+    /**
+     * This future allows to wait until {@link #dataSources} is initialized in the first {@link #poll()} happening since
+     * {@link #startPollingDatabasePeriodically()} is called for the first time, or since the last visible (in
+     * happens-before terms) call to {@link #startPollingDatabasePeriodically()} in case of Coordinator's leadership
+     * changes.
+     */
+    final CompletableFuture<Void> firstPollCompletionFuture = new CompletableFuture<>();
+  }
+
+  /**
+   * Represents on-demand {@link #poll} initiated at periods of time when SqlSegmentsMetadata doesn't poll the database
+   * periodically.
+   */
+  private static class OnDemandDatabasePoll implements DatabasePoll
+  {
+    final long initiationTimeNanos = System.nanoTime();
+    final CompletableFuture<Void> pollCompletionFuture = new CompletableFuture<>();
+
+    long nanosElapsedFromInitiation()
+    {
+      return System.nanoTime() - initiationTimeNanos;
+    }
+  }
+
+  /**
+   * Use to synchronize {@link #startPollingDatabasePeriodically}, {@link #stopPollingDatabasePeriodically}, {@link
+   * #poll}, and {@link #isPollingDatabasePeriodically}. These methods should be synchronized to prevent from being
+   * called at the same time if two different threads are calling them. This might be possible if Coordinator gets and
+   * drops leadership repeatedly in quick succession.
+   *
+   * This lock is also used to synchronize {@link #awaitOrPerformDatabasePoll} for times when SqlSegmentsMetadata
+   * is not polling the database periodically (in other words, when the Coordinator is not the leader).
+   */
+  private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock();
 
   /**
    * Used to ensure that {@link #poll()} is never run concurrently. It should already be so (at least in production
    * code), where {@link #poll()} is called only from the task created in {@link #createPollTaskForStartOrder} and is
    * scheduled in a single-threaded {@link #exec}, so this lock is an additional safety net in case there are bugs in
    * the code, and for tests, where {@link #poll()} is called from the outside code.
    *
-   * Not using {@link #startStopLock}.writeLock() in order to still be able to run {@link #poll()} concurrently with
-   * {@link #isStarted()}.
+   * Not using {@link #startStopPollLock}.writeLock() in order to still be able to run {@link #poll()} concurrently
+   * with {@link #isPollingDatabasePeriodically()}.
    */
   private final Object pollLock = new Object();
 
   private final ObjectMapper jsonMapper;
-  private final Supplier<MetadataSegmentManagerConfig> config;
+  private final Duration periodicPollDelay;
   private final Supplier<MetadataStorageTablesConfig> dbTables;
   private final SQLMetadataConnector connector;
 
-  // Volatile since this reference is reassigned in "poll" and then read from in other threads.
-  // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map).
-  // Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between
-  // null and nonnull multiple times as stop() and start() are called.
-  @Nullable
-  private volatile ConcurrentHashMap<String, DruidDataSource> dataSources = null;
+  /**
+   * This field is made volatile to avoid "ghost secondary reads" that may result in NPE, see
+   * https://github.com/code-review-checklists/java-concurrency#safe-local-dcl (note that dataSources resembles a lazily
+   * initialized field). Alternative is to always read the field in a snapshot local variable, but it's too easy to
+   * forget to do.
+   *
+   * This field may be updated from {@link #exec}, or from whatever thread calling {@link #doOnDemandPoll} via {@link
+   * #awaitOrPerformDatabasePoll()} via one of the public methods of SqlSegmentsMetadata.
+   */
+  private volatile @MonotonicNonNull ConcurrentHashMap<String, DruidDataSource> dataSources = null;
 
   /**
-   * The number of times this SQLMetadataSegmentManager was started.
+   * The latest {@link DatabasePoll} represent {@link #poll()} calls which update {@link #dataSources}, either
+   * periodically (see {@link PeriodicDatabasePoll}, {@link #startPollingDatabasePeriodically}, {@link
+   * #stopPollingDatabasePeriodically}) or "on demand" (see {@link OnDemandDatabasePoll}), when one of the methods that
+   * accesses {@link #dataSources} state (such as {@link #prepareImmutableDataSourceWithUsedSegments}) is called when
+   * the Coordinator is not the leader and therefore SqlSegmentsMetadata isn't polling the database periodically.
+   *
+   * The notion and the complexity of "on demand" database polls was introduced to simplify the interface of {@link
+   * MetadataSegmentManager} and guarantee that it always returns consistent and relatively up-to-date data from methods like
+   * {@link #prepareImmutableDataSourceWithUsedSegments}, while avoiding excessive repetitive polls. The last part is
+   * achieved via "hooking on" other polls by awaiting on {@link PeriodicDatabasePoll#firstPollCompletionFuture} or
+   * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #awaitOrPerformDatabasePoll} method
+   * implementation for details.
+   *
+   * Note: the overall implementation of periodic/on-demand polls is not completely optimal: for example, when the
+   * Coordinator just stopped leading, the latest periodic {@link #poll} (which is still "fresh") is not considered
+   * and a new on-demand poll is always initiated. This is done to simplify the implementation, while the efficiency
+   * during Coordinator leadership switches is not a priority.
+   *
+   * This field is {@code volatile} because it's checked and updated in a double-checked locking manner in {@link
+   * #awaitOrPerformDatabasePoll()}.
    */
-  private long startCount = 0;
+  private volatile @Nullable DatabasePoll latestDatabasePoll = null;
+
+  /** Used to cancel periodic poll task in {@link #stopPollingDatabasePeriodically}. */
+  @GuardedBy("startStopPollLock")
+  private @Nullable Future<?> periodicPollTaskFuture = null;
+
+  /** The number of times {@link #startPollingDatabasePeriodically} was called. */
+  private long startPollingCount = 0;
+
   /**
-   * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if
+   * Equal to the current {@link #startPollingCount} value if the SqlSegmentsMetadata is currently started; -1 if
    * currently stopped.
    *
    * This field is used to implement a simple stamp mechanism instead of just a boolean "started" flag to prevent
-   * the theoretical situation of two or more tasks scheduled in {@link #start()} calling {@link #isStarted()} and
-   * {@link #poll()} concurrently, if the sequence of {@link #start()} - {@link #stop()} - {@link #start()} actions
-   * occurs quickly.
+   * the theoretical situation of two or more tasks scheduled in {@link #startPollingDatabasePeriodically()} calling
+   * {@link #isPollingDatabasePeriodically()} and {@link #poll()} concurrently, if the sequence of {@link
+   * #startPollingDatabasePeriodically()} - {@link #stopPollingDatabasePeriodically()} - {@link
+   * #startPollingDatabasePeriodically()} actions occurs quickly.
    *
-   * {@link SQLMetadataRuleManager} also have a similar issue.
+   * {@link SQLMetadataRuleManager} also has a similar issue.
    */
-  private long currentStartOrder = -1;
+  private long currentStartPollingOrder = -1;
 
 Review comment:
   Should this be marked `@GuardedBy("startStopPollLock")`?
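
   For illustration, the annotation being asked about could look like the sketch below. This is a minimal sketch, not the PR's code. It assumes both stamp fields are only read and written while holding `startStopPollLock`. The class name `GuardedStampSketch` and the methods `startPolling`, `stopPolling`, `isPolling`, and `currentOrder` are hypothetical simplifications. The nested `GuardedBy` annotation is a local stand-in for whichever `@GuardedBy` the project uses; it documents intent only and enforces nothing at runtime.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedStampSketch
{
  /** Local stand-in for a @GuardedBy annotation (assumption: documentation only). */
  @Retention(RetentionPolicy.SOURCE)
  @Target({ElementType.FIELD, ElementType.METHOD})
  @interface GuardedBy
  {
    String value();
  }

  private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock();

  /** The number of times startPolling() actually started polling. */
  @GuardedBy("startStopPollLock")
  private long startPollingCount = 0;

  /** Equal to startPollingCount while started; -1 while stopped. */
  @GuardedBy("startStopPollLock")
  private long currentStartPollingOrder = -1;

  /** Hypothetical reduction of startPollingDatabasePeriodically(): stamp bookkeeping only. */
  public void startPolling()
  {
    startStopPollLock.writeLock().lock();
    try {
      if (currentStartPollingOrder >= 0) {
        return; // already started, keep the current stamp
      }
      startPollingCount++;
      currentStartPollingOrder = startPollingCount;
    }
    finally {
      startStopPollLock.writeLock().unlock();
    }
  }

  /** Hypothetical reduction of stopPollingDatabasePeriodically(). */
  public void stopPolling()
  {
    startStopPollLock.writeLock().lock();
    try {
      currentStartPollingOrder = -1;
    }
    finally {
      startStopPollLock.writeLock().unlock();
    }
  }

  /** Readers take the read lock, so they may run concurrently with each other. */
  public boolean isPolling()
  {
    return currentOrder() >= 0;
  }

  public long currentOrder()
  {
    startStopPollLock.readLock().lock();
    try {
      return currentStartPollingOrder;
    }
    finally {
      startStopPollLock.readLock().unlock();
    }
  }
}
```

   If the fields really are confined to that lock, annotating both `startPollingCount` and `currentStartPollingOrder` would make the locking contract visible to readers and to static analysis tools that understand the annotation.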
