Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/06/16 06:05:03 UTC

[GitHub] [druid] gianm commented on a change in pull request #10020: global table datasource for broadcast segments

gianm commented on a change in pull request #10020:
URL: https://github.com/apache/druid/pull/10020#discussion_r440600939



##########
File path: processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * {@link TableDataSource} variant for globally available 'broadcast' segments. If bound to a
+ * {@link org.apache.druid.segment.join.JoinableFactory} that can create an
+ * {@link org.apache.druid.segment.join.table.IndexedTable} using DruidBinders.joinableFactoryBinder, this allows
+ * optimal usage of segments using this DataSource type in join operations (because they are global), and so can be pushed
+ * down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join using
+ * {@link InlineDataSource} to construct an {@link org.apache.druid.segment.join.table.IndexedTable} on the fly on the
+ * broker. Because it is also a {@link TableDataSource}, when queried directly, or on the left hand side of a join,
+ * they will be treated as any normal segment.
+ */
+@JsonTypeName("global")
+public class GlobalTableDataSource extends TableDataSource
+{
+  @JsonCreator
+  public GlobalTableDataSource(@JsonProperty("name") String name)
+  {
+    super(name);
+  }
+
+  @Override
+  public boolean isCacheable()
+  {
+    return false;

Review comment:
       Why shouldn't it be cacheable?

##########
File path: processing/src/main/java/org/apache/druid/query/TableDataSource.java
##########
@@ -102,6 +102,11 @@ public final boolean equals(Object o)
       return false;
     }
 
+    if ((o instanceof GlobalTableDataSource || this instanceof GlobalTableDataSource) &&
+        !getClass().equals(o.getClass())) {

Review comment:
       I think you can make this a little less gross by replacing this `equals` impl (and the one in GlobalTableDataSource) with a new auto-generated one that checks getClass.
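
       For illustration, a minimal sketch of what that auto-generated pair could look like (the `name` field is the one TableDataSource already holds; this is illustrative, not the exact generated output):

```java
@Override
public final boolean equals(Object o)
{
  if (this == o) {
    return true;
  }
  // getClass() comparison makes TableDataSource("foo") and GlobalTableDataSource("foo") unequal.
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  final TableDataSource that = (TableDataSource) o;
  return name.equals(that.name);
}

@Override
public final int hashCode()
{
  return java.util.Objects.hash(name);
}
```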

##########
File path: processing/src/main/java/org/apache/druid/query/GlobalTableDataSource.java
##########
@@ -0,0 +1,64 @@
+[... license header, package declaration, and imports identical to the GlobalTableDataSource.java hunk quoted above ...]
+
+/**
+ * {@link TableDataSource} variant for globally available 'broadcast' segments. If bound to a
+ * {@link org.apache.druid.segment.join.JoinableFactory} that can create an
+ * {@link org.apache.druid.segment.join.table.IndexedTable} using DruidBinders.joinableFactoryBinder, this allows
+ * optimal usage of segments using this DataSource type in join operations (because they are global), and so can be pushed
+ * down to historicals as a {@link JoinDataSource}, instead of requiring a subquery join using
+ * {@link InlineDataSource} to construct an {@link org.apache.druid.segment.join.table.IndexedTable} on the fly on the
+ * broker. Because it is also a {@link TableDataSource}, when queried directly, or on the left hand side of a join,
+ * they will be treated as any normal segment.

Review comment:
       "datasource" makes more sense here than "segment".

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
-        new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            try {
-              while (!Thread.currentThread().isInterrupted()) {
-                final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
-                final Set<String> dataSourcesToRebuild = new TreeSet<>();
-
-                try {
-                  synchronized (lock) {
-                    final long nextRefreshNoFuzz = DateTimes
-                        .utc(lastRefresh)
-                        .plus(config.getMetadataRefreshPeriod())
-                        .getMillis();
-
-                    // Fuzz a bit to spread load out when we have multiple brokers.
-                    final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
-
-                    while (true) {
-                      // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
-                      final boolean wasRecentFailure = DateTimes.utc(lastFailure)
-                                                                .plus(config.getMetadataRefreshPeriod())
-                                                                .isAfterNow();
-
-                      if (isServerViewInitialized &&
-                          !wasRecentFailure &&
-                          (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
-                          (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
-                        // We need to do a refresh. Break out of the waiting loop.
-                        break;
-                      }
-
-                      if (isServerViewInitialized) {
-                        // Server view is initialized, but we don't need to do a refresh. Could happen if there are
-                        // no segments in the system yet. Just mark us as initialized, then.
-                        initialized.countDown();
-                      }
-
-                      // Wait some more, we'll wake up when it might be time to do another refresh.
-                      lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<String> dataSourcesToRebuild = new TreeSet<>();
+
+              try {
+                synchronized (lock) {
+                  final long nextRefreshNoFuzz = DateTimes
+                      .utc(lastRefresh)
+                      .plus(config.getMetadataRefreshPeriod())
+                      .getMillis();
+
+                  // Fuzz a bit to spread load out when we have multiple brokers.
+                  final long nextRefresh = nextRefreshNoFuzz + (long) ((nextRefreshNoFuzz - lastRefresh) * 0.10);
+
+                  while (true) {
+                    // Do not refresh if it's too soon after a failure (to avoid rapid cycles of failure).
+                    final boolean wasRecentFailure = DateTimes.utc(lastFailure)
+                                                              .plus(config.getMetadataRefreshPeriod())
+                                                              .isAfterNow();
+
+                    if (isServerViewInitialized &&
+                        !wasRecentFailure &&
+                        (!segmentsNeedingRefresh.isEmpty() || !dataSourcesNeedingRebuild.isEmpty()) &&
+                        (refreshImmediately || nextRefresh < System.currentTimeMillis())) {
+                      // We need to do a refresh. Break out of the waiting loop.
+                      break;
                     }
 
-                    segmentsToRefresh.addAll(segmentsNeedingRefresh);
-                    segmentsNeedingRefresh.clear();
-
-                    // Mutable segments need a refresh every period, since new columns could be added dynamically.
-                    segmentsNeedingRefresh.addAll(mutableSegments);
+                    if (isServerViewInitialized) {
+                      // Server view is initialized, but we don't need to do a refresh. Could happen if there are
+                      // no segments in the system yet. Just mark us as initialized, then.
+                      initialized.countDown();
+                    }
 
-                    lastFailure = 0L;
-                    lastRefresh = System.currentTimeMillis();
-                    refreshImmediately = false;
+                    // Wait some more, we'll wake up when it might be time to do another refresh.
+                    lock.wait(Math.max(1, nextRefresh - System.currentTimeMillis()));
                   }
 
-                  // Refresh the segments.
-                  final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);
+                  segmentsToRefresh.addAll(segmentsNeedingRefresh);
+                  segmentsNeedingRefresh.clear();
 
-                  synchronized (lock) {
-                    // Add missing segments back to the refresh list.
-                    segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
+                  // Mutable segments need a refresh every period, since new columns could be added dynamically.
+                  segmentsNeedingRefresh.addAll(mutableSegments);
 
-                    // Compute the list of dataSources to rebuild tables for.
-                    dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
-                    refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
-                    dataSourcesNeedingRebuild.clear();
+                  lastFailure = 0L;
+                  lastRefresh = System.currentTimeMillis();
+                  refreshImmediately = false;
+                }
 
-                    lock.notifyAll();
-                  }
+                // Refresh the segments.
+                final Set<SegmentId> refreshed = refreshSegments(segmentsToRefresh);
 
-                  // Rebuild the dataSources.
-                  for (String dataSource : dataSourcesToRebuild) {
-                    final DruidTable druidTable = buildDruidTable(dataSource);
-                    final DruidTable oldTable = tables.put(dataSource, druidTable);
-                    if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
-                      log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
-                    } else {
-                      log.debug("dataSource [%s] signature is unchanged.", dataSource);
-                    }
-                  }
+                synchronized (lock) {
+                  // Add missing segments back to the refresh list.
+                  segmentsNeedingRefresh.addAll(Sets.difference(segmentsToRefresh, refreshed));
 
-                  initialized.countDown();
-                }
-                catch (InterruptedException e) {
-                  // Fall through.
-                  throw e;
+                  // Compute the list of dataSources to rebuild tables for.
+                  dataSourcesToRebuild.addAll(dataSourcesNeedingRebuild);
+                  refreshed.forEach(segment -> dataSourcesToRebuild.add(segment.getDataSource()));
+                  dataSourcesNeedingRebuild.clear();
+
+                  lock.notifyAll();
                 }
-                catch (Exception e) {
-                  log.warn(e, "Metadata refresh failed, trying again soon.");
-
-                  synchronized (lock) {
-                    // Add our segments and dataSources back to their refresh and rebuild lists.
-                    segmentsNeedingRefresh.addAll(segmentsToRefresh);
-                    dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
-                    lastFailure = System.currentTimeMillis();
-                    lock.notifyAll();
+
+                // Rebuild the dataSources.
+                for (String dataSource : dataSourcesToRebuild) {
+                  final DruidTable druidTable = buildDruidTable(dataSource);
+                  final DruidTable oldTable = tables.put(dataSource, druidTable);
+                  if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
+                    log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature());
+                  } else {
+                    log.debug("dataSource [%s] signature is unchanged.", dataSource);
                   }
                 }
+
+                initialized.countDown();
+              }
+              catch (InterruptedException e) {
+                // Fall through.
+                throw e;
+              }
+              catch (Exception e) {
+                log.warn(e, "Metadata refresh failed, trying again soon.");
+
+                synchronized (lock) {
+                  // Add our segments and dataSources back to their refresh and rebuild lists.
+                  segmentsNeedingRefresh.addAll(segmentsToRefresh);
+                  dataSourcesNeedingRebuild.addAll(dataSourcesToRebuild);
+                  lastFailure = System.currentTimeMillis();
+                  lock.notifyAll();
+                }
               }
-            }
-            catch (InterruptedException e) {
-              // Just exit.
-            }
-            catch (Throwable e) {
-              // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
-              // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
-              log.makeAlert(e, "Metadata refresh failed permanently").emit();
-              throw e;
-            }
-            finally {
-              log.info("Metadata refresh stopped.");
             }
           }
+          catch (InterruptedException e) {
+            // Just exit.
+          }
+          catch (Throwable e) {
+            // Throwables that fall out to here (not caught by an inner try/catch) are potentially gnarly, like
+            // OOMEs. Anyway, let's just emit an alert and stop refreshing metadata.
+            log.makeAlert(e, "Metadata refresh failed permanently").emit();
+            throw e;
+          }
+          finally {
+            log.info("Metadata refresh stopped.");
+          }
+        }
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        localSegmentExec,
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        () -> {
+          synchronized (lock) {
+            // refresh known broadcast segments. Since DruidSchema is only present on the broker, any segment we have
+            // locally in the SegmentManager must be broadcast datasources. This could potentially be replaced in the
+            // future by fetching load rules from the coordinator
+            Set<String> localSegmentDatasources = segmentManager.getDataSourceNames();

Review comment:
       `localSegmentDataSources` would be more consistent spelling, I think.

##########
File path: processing/src/test/java/org/apache/druid/query/GlobalTableDataSourceTest.java
##########
@@ -0,0 +1,65 @@
+[... standard ASF license header, identical to the hunks quoted above ...]
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class GlobalTableDataSourceTest
+{
+  private static final GlobalTableDataSource GLOBAL_TABLE_DATA_SOURCE = new GlobalTableDataSource("foo");
+
+  @Test
+  public void testEquals()

Review comment:
       Please add a test for nonequality with a TableDataSource of the same name.
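
       Something like this sketch, reusing the JUnit 4 `Assert` already imported here (the test name is illustrative):

```java
@Test
public void testNotEqualsTableDataSourceWithSameName()
{
  // Equality must fail in both directions against a plain TableDataSource of the same name.
  Assert.assertNotEquals(GLOBAL_TABLE_DATA_SOURCE, new TableDataSource("foo"));
  Assert.assertNotEquals(new TableDataSource("foo"), GLOBAL_TABLE_DATA_SOURCE);
}
```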

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
        [... lambda body identical to the DruidSchema.java hunk quoted above ...]
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        localSegmentExec,
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        config.getMetadataRefreshPeriod().toStandardDuration(),
+        () -> {
+          synchronized (lock) {
+            // refresh known broadcast segments. Since DruidSchema is only present on the broker, any segment we have
+            // locally in the SegmentManager must be broadcast datasources. This could potentially be replaced in the
+            // future by fetching load rules from the coordinator
+            Set<String> localSegmentDatasources = segmentManager.getDataSourceNames();
+            dataSourcesNeedingRebuild.addAll(localSegmentDatasources);

Review comment:
       Why do we need to rebuild them all continuously?

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -122,6 +128,8 @@
   // All dataSources that need tables regenerated.
   private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
 
+  private final Set<String> broadcastDatasources = new HashSet<>();

Review comment:
       `broadcastDataSources` is more consistent spelling. Please add a comment too.

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -196,119 +207,132 @@ public DruidSchema(
   public void start() throws InterruptedException
   {
     cacheExec.submit(
        [... lambda body identical to the DruidSchema.java hunk quoted above ...]
+    );
+
+    ScheduledExecutors.scheduleWithFixedDelay(

Review comment:
       Why not do this as part of the loop in the main thread?

##########
File path: processing/src/main/java/org/apache/druid/query/DataSource.java
##########
@@ -35,7 +35,8 @@
     @JsonSubTypes.Type(value = UnionDataSource.class, name = "union"),
     @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
     @JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
-    @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline")
+    @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
+    @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "global")

Review comment:
       1. I think `globalTable` would be a better name. We only have one chance to get it right!
       2. IMO, for some consistency between SQL and native, we'll need to either transparently globalify the regular `table` type (perhaps with a rewrite step like ClientQuerySegmentWalker's inlining?) or document the `globalTable` type. I think the former is nicer, because the latter comes with too many caveats (you have to make sure to use it under the proper conditions). A rough sketch of such a rewrite step follows.
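
       A hypothetical sketch of that rewrite step (the method name and the `broadcastTables` parameter are invented for illustration; `getChildren`/`withChildren` are the existing DataSource tree-walking methods):

```java
// Recursively swap plain tables for GlobalTableDataSource wherever the table
// is known to be broadcast to all servers.
private static DataSource globalizeIfPossible(final DataSource dataSource, final Set<String> broadcastTables)
{
  if (dataSource instanceof TableDataSource && !(dataSource instanceof GlobalTableDataSource)) {
    final String name = ((TableDataSource) dataSource).getName();
    if (broadcastTables.contains(name)) {
      return new GlobalTableDataSource(name);
    }
  }

  final List<DataSource> rewrittenChildren = new ArrayList<>();
  for (DataSource child : dataSource.getChildren()) {
    rewrittenChildren.add(globalizeIfPossible(child, broadcastTables));
  }
  return dataSource.withChildren(rewrittenChildren);
}
```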

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -616,6 +631,12 @@ private DruidTable buildDruidTable(final String dataSource)
 
       final RowSignature.Builder builder = RowSignature.builder();
       columnTypes.forEach(builder::add);
+      if (broadcastDatasources.contains(dataSource)) {

Review comment:
       tiny nit: the logic is laid out a bit oddly here; it'd make more sense to emphasize what's different by creating the DataSource inside the if block, but creating the DruidTable outside of it.
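
       Roughly like this sketch (assuming the existing `DruidTable(DataSource, RowSignature)` constructor):

```java
final RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);

// Only the choice of DataSource differs; the DruidTable itself is built once, outside the if.
final TableDataSource tableDataSource;
if (broadcastDatasources.contains(dataSource)) {
  tableDataSource = new GlobalTableDataSource(dataSource);
} else {
  tableDataSource = new TableDataSource(dataSource);
}
return new DruidTable(tableDataSource, builder.build());
```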

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
##########
@@ -122,6 +128,8 @@
   // All dataSources that need tables regenerated.
   private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
 
+  private final Set<String> broadcastDatasources = new HashSet<>();

Review comment:
       This should be `@GuardedBy("lock")`, and so should `dataSourcesNeedingRebuild`, `mutableSegments`, `segmentsNeedingRefresh`, `refreshImmediately`, `lastRefresh`, `lastFailure`, and `isServerViewInitialized`.
   
   Could you please add those, and also remove the comment on `lock`, which is woefully out of date. (Thanks in advance for the housekeeping work.)
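
       For example (a sketch; the annotation import and the comment wording are illustrative):

```java
// All datasources that are currently available locally as broadcast segments.
@GuardedBy("lock")
private final Set<String> broadcastDataSources = new HashSet<>();

@GuardedBy("lock")
private final Set<String> dataSourcesNeedingRebuild = new HashSet<>();
```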




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org