Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/06/12 19:30:33 UTC

[GitHub] [druid] ccaominh commented on a change in pull request #10020: global table datasource for broadcast segments

ccaominh commented on a change in pull request #10020:
URL: https://github.com/apache/druid/pull/10020#discussion_r439596702



##########
File path: processing/src/main/java/org/apache/druid/query/DataSource.java
##########
@@ -35,7 +35,8 @@
     @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
     @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
     @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
-    @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline")
+    @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
+    @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "global")

Review comment:
       Should the `global` datasource type be added to the docs? https://druid.apache.org/docs/0.18.1/querying/query-execution.html#datasource-type

##########
File path: processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("global")
+public class GlobalTableDataSource extends TableDataSource

Review comment:
       What do you think about adding a javadoc explaining why this datasource type exists?

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,130 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              while (!Thread.currentThread().isInterrupted()) {
-                final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-                final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-                try {
-                  synchronized (lock) {
-                    final long nextRefreshNoFuzz = DateTimes
-                        .utc(lastRefresh)
-                        .plus(config.getMetadataRefreshPeriod())
-                        .getMillis();
-
-                    // Fuzz a bit to spread load out when we have multiple brokers.
-                    final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                    while (true) {
-                      // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
-                      final boolean wasRecentFailure = DateTimes.utc(lastFailure)
-                                                                .plus(config.getMetadataRefreshPeriod())
-                                                                .isAfterNow();
-
-                      if (isServerViewInitialized &&
-                          !wasRecentFailure &&
-                          (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
-                          (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
-                        // We need to do a refresh. Break out of the waiting loop.
-                        break;
-                      }
-
-                      if (isServerViewInitialized) {
-                        // Server view is initialized, but we don't need to do a refresh. Could happen if there are
-                        // no segments in the system yet. Just mark us as initialized, then.
-                        initialized.countDown();
-                      }
-
-                      // Wait some more, we'll wake up when it might be time to do another refresh.
-                      lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              .plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting loop.
+                      break;
                     }
 
-                    segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                    segmentsNeedingRefresh.clear();
-
-                    // Mutable segments need a refresh every period, since new columns could be added dynamically.
-                    segmentsNeedingRefresh.addAll(mutableSegments);
+                    if (isServerViewInitialized) {
+                      // Server view is initialized, but we don't need to do a refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as initialized, then.
+                      initialized.countDown();
+                    }
 
-                    lastFailure = 0L;
-                    lastRefresh = System.currentTimeMillis();
-                    refreshImmediately = false;
+                    // Wait some more, we'll wake up when it might be time to do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
                   }
 
-                  // Refresh the segments.
-                  final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
 
-                  synchronized (lock) {
-                    // Add missing segments back to the refresh list.
-                    segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+                  // Mutable segments need a refresh every period, since new columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
 
-                    // Compute the list of dataSources to rebuild tables for.
-                    dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
-                    refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
-                    dataSourcesNeedingRebuild.clear();
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
 
-                    lock.notifyAll();
-                  }
+                // Refresh the segments.
+                final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);
 
-                  // Rebuild the dataSources.
-                  for (String dataSource : dataSourcesToRebuild) {
-                    final DruidTable druidTable = buildDruidTable(dataSource);
-                    final DruidTable oldTable = tables.put(dataSource, druidTable);
-                    if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                      log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
-                    } else {
-                      log.debug("dataSource [%s] signature is unchanged.", dataSource);
-                    }
-                  }
+                synchronized (lock) {
+                  // Add missing segments back to the refresh list.
+                  segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
 
-                  initialized.countDown();
-                }
-                catch (InterruptedException e) {
-                  // Fall through.
-                  throw e;
+                  // Compute the list of dataSources to rebuild tables for.
+                  dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+                  refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
+                  dataSourcesNeedingRebuild.clear();
+
+                  lock.notifyAll();
                 }
-                catch (Exception e) {
-                  log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                  synchronized (lock) {
-                    // Add our segments and dataSources back to their refresh and rebuild lists.
-                    segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                    dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                    lastFailure = System.currentTimeMillis();
-                    lock.notifyAll();
+
+                // Rebuild the dataSources.
+                for (String dataSource : dataSourcesToRebuild) {
+                  final DruidTable druidTable = buildDruidTable(dataSource);
+                  final DruidTable oldTable = tables.put(dataSource, druidTable);
+                  if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+                    log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
+                  } else {
+                    log.debug("dataSource [%s] signature is unchanged.", dataSource);
                   }
                 }
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                  lock.notifyAll();
+                }
               }
-            }
-            catch (InterruptedException e) {
-              // Just exit.
-            }
-            catch (Throwable e) {
-              // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
-              // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
-              log.makeAlert(e, "Metadata refresh failed permanently").emit();
-              throw e;
-            }
-            finally {
-              log.info("Metadata refresh stopped.");
             }
           }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
+            log.makeAlert(e, "Metadata refresh failed permanently").emit();
+            throw e;
+          }
+          finally {
+            log.info("Metadata refresh stopped.");
+          }
+        }
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        localSegmentExec,
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        () -> {
+          synchronized (lock) {
+            // refresh known broadcast segments
+            Set<String> localSegmentDatasources = segmentManager.getDataSourceNames();
+            dataSourcesNeedingRebuild.addAll(localSegmentDatasources);
+            broadcastDatasources.clear();
+            broadcastDatasources.addAll(localSegmentDatasources);

Review comment:
       Why are all of the datasources broadcast?

##########
File path: processing/src/main/java/org/apache/druid/query/TableDataSource.java
##########
@@ -98,7 +98,7 @@ public final boolean equals(Object o)
     if (this == o) {
       return true;
     }
-    if (!(o instanceof TableDataSource)) {
+    if (!(o instanceof TableDataSource) || !getClass().equals(o.getClass())) {

Review comment:
       This change is causing `SegmentMetadataQueryTest.testSerde()` to fail: https://travis-ci.org/github/apache/druid/jobs/697612764#L5001
   
   The `query` variable holds a `LegacyDataSource`, whereas the copy produced by serializing and then deserializing it holds a `TableDataSource`, so the two no longer compare as equal.
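
To illustrate the failure mode, here is a minimal sketch, assuming `LegacyDataSource` is a subclass of `TableDataSource` that adds no state of its own. The names `EqualsDemo`, `Table`, and `LegacyTable` are hypothetical stand-ins, not Druid classes; only the shape of the `equals` check is taken from the diff above.

```java
import java.util.Objects;

public class EqualsDemo {
  // Hypothetical stand-in for the base datasource class.
  static class Table {
    final String name;

    Table(String name) {
      this.name = name;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      // Strict class check, mirroring the shape of the change under review.
      if (!(o instanceof Table) || !getClass().equals(o.getClass())) {
        return false;
      }
      return name.equals(((Table) o).name);
    }

    @Override
    public int hashCode() {
      return Objects.hash(name);
    }
  }

  // Hypothetical subclass that adds no fields (assumed analogue of the legacy type).
  static class LegacyTable extends Table {
    LegacyTable(String name) {
      super(name);
    }
  }

  public static void main(String[] args) {
    Table original = new LegacyTable("wikipedia");
    // Assumed result of a serde round trip: same name, but the base class.
    Table roundTripped = new Table("wikipedia");

    System.out.println(original.equals(roundTripped));                 // prints "false"
    System.out.println(roundTripped.equals(new Table("wikipedia")));   // prints "true"
  }
}
```

With only the `instanceof` check, the first comparison would return true. One possible direction, offered only as a suggestion, is to keep the strict check for the new global type alone, or to update the test's expected datasource.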



