Posted to commits@druid.apache.org by "gianm (via GitHub)" <gi...@apache.org> on 2023/05/17 13:01:13 UTC

[GitHub] [druid] gianm commented on a diff in pull request #13967: Detect segment unavailability for queries in broker

gianm commented on code in PR #13967:
URL: https://github.com/apache/druid/pull/13967#discussion_r1196048484


##########
server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java:
##########
@@ -51,38 +51,43 @@
 
   private final int maxSize;
 
+  private final boolean asyncMode;
+
   private final CircularBuffer<Holder<T>> changes;
 
   @VisibleForTesting
-  final LinkedHashMap<CustomSettableFuture<T>, Counter> waitingFutures;
+  LinkedHashMap<CustomSettableFuture<T>, Counter> waitingFutures;
 
-  private final ExecutorService singleThreadedExecutor;
-  private final Runnable resolveWaitingFuturesRunnable;
+  private ExecutorService singleThreadedExecutor;

Review Comment:
   Please add some comments here explaining what these fields are for and pointing out that they are only set in `asyncMode`.
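   
   Something along these lines, maybe (just a sketch; exact wording up to you):
   
   ```java
   // Only used when asyncMode = true: futures for callers waiting on changes newer than what the
   // buffer currently holds. Left unset when asyncMode = false.
   @VisibleForTesting
   LinkedHashMap<CustomSettableFuture<T>, Counter> waitingFutures;
   
   // Only used when asyncMode = true: resolves waitingFutures as new changes arrive.
   private ExecutorService singleThreadedExecutor;
   ```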



##########
processing/src/main/java/org/apache/druid/query/QueryException.java:
##########
@@ -79,6 +79,7 @@ public class QueryException extends RuntimeException implements SanitizableExcep
   public static final String SQL_PARSE_FAILED_ERROR_CODE = "SQL parse failed";
   public static final String PLAN_VALIDATION_FAILED_ERROR_CODE = "Plan validation failed";
   public static final String SQL_QUERY_UNSUPPORTED_ERROR_CODE = "SQL query is unsupported";
+  public static final String UNAVAILABLE_SEGMENTS_ERROR_CODE = "Some segments are unavailable";

Review Comment:
   IMO `Data not available` is a nicer error code. Hard to explain why, but it feels to me more in line with the other ones.
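   
   e.g., keeping the constant name from this PR and just changing the string:
   
   ```java
   public static final String UNAVAILABLE_SEGMENTS_ERROR_CODE = "Data not available";
   ```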



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -234,9 +243,78 @@ public SegmentMetadataCache(
     this.brokerInternalQueryConfig = brokerInternalQueryConfig;
     this.emitter = emitter;
 
+    if (!brokerSegmentWatcherConfig.isDetectUnavailableSegments()) {
+      isMetadataViewInitialized = true;
+    } else {
+      initMetadataSegmentViewCallback(metadataSegmentView);
+    }
     initServerViewTimelineCallback(serverView);
   }
 
+  public SegmentMetadataCache(
+      final QueryLifecycleFactory queryLifecycleFactory,
+      final TimelineServerView serverView,
+      final SegmentManager segmentManager,
+      final JoinableFactory joinableFactory,
+      final SegmentMetadataCacheConfig config,
+      final Escalator escalator,
+      final BrokerInternalQueryConfig brokerInternalQueryConfig,
+      final ServiceEmitter emitter
+  )
+  {
+    this(
+        queryLifecycleFactory,
+        serverView,
+        null,
+        new BrokerSegmentWatcherConfig()
+        {
+          @Override
+          public boolean isDetectUnavailableSegments()
+          {
+            return false;
+          }
+        },
+        segmentManager,
+        joinableFactory,
+        config,
+        escalator,
+        brokerInternalQueryConfig,
+        emitter
+    );
+  }
+
+  private void initMetadataSegmentViewCallback(final MetadataSegmentView metadataSegmentView)
+  {
+    metadataSegmentView.registerSegmentCallback(
+        callbackExec,
+        new ServerView.HandedOffSegmentCallback()
+        {
+          @Override
+          public void fullSync(List<DataSegmentChange> segments)
+          {
+            // This is a no op!
+          }
+
+          @Override
+          public void deltaSync(List<DataSegmentChange> segments)
+          {
+            // This is a no op!
+          }
+
+          @Override
+          public void segmentViewInitialized()
+          {
+            synchronized (lock) {
+              isMetadataViewInitialized = true;
+              log.info("Segment view is intialized");

Review Comment:
   Two points:
   
   - This message isn't very useful. It'd be better to use `debug` to avoid log clutter, _or_ keep `info` but make it more useful by adding the duration (i.e. `Segment view initialized in [%,d] ms.`)
   - Spelling ("initialized").
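   
   For the second option, a rough sketch (assumes recording a start time in a new field when the cache is constructed; `startTimeMillis` is a hypothetical name):
   
   ```java
   // hypothetical field, set before the callback is registered
   private final long startTimeMillis = System.currentTimeMillis();
   
   // ... then in segmentViewInitialized():
   log.info("Segment view initialized in [%,d] ms.", System.currentTimeMillis() - startTimeMillis);
   ```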



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -936,10 +974,49 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     } else {
       log.info("Polled and found %,d segments in the database", segments.size());
     }
-    dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
-        Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
-        dataSourceProperties
-    );
+
+    handedOffSegments.clear();
+    handedOffSegments.addAll(datasourceToHandedOffSegments.values()

Review Comment:
   Should be careful here since the number of segments can be in the millions. This will create a lot of garbage, including a temporary Set that isn't needed. A plain `for` loop is a better idea.
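   
   e.g. (a sketch, assuming `handedOffSegments` is a `Set<SegmentId>` like the map's values):
   
   ```java
   handedOffSegments.clear();
   for (Set<SegmentId> segmentIds : datasourceToHandedOffSegments.values()) {
     handedOffSegments.addAll(segmentIds);
   }
   ```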



##########
processing/src/main/java/org/apache/druid/timeline/DataSegmentChange.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.java.util.common.StringUtils;
+
+public class DataSegmentChange
+{
+  private final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus;
+  private final ChangeType changeType;
+
+  @JsonCreator
+  public DataSegmentChange(
+      @JsonProperty("segmentWithOvershadowedStatus") SegmentWithOvershadowedStatus segmentWithOvershadowedStatus,
+      @JsonProperty("changeType") ChangeType changeType
+  )
+  {
+    this.segmentWithOvershadowedStatus = segmentWithOvershadowedStatus;
+    this.changeType = changeType;
+  }
+
+  @JsonProperty
+  public SegmentWithOvershadowedStatus getSegmentWithOvershadowedStatus()
+  {
+    return segmentWithOvershadowedStatus;
+  }
+
+  @JsonProperty
+  public ChangeType getChangeType()
+  {
+    return changeType;
+  }
+
+  @JsonIgnore
+  public boolean isLoad()

Review Comment:
   Some javadocs would be helpful here to explain what `isLoad()` means. In particular, just from looking at the names, it is somewhat surprising that `SEGMENT_OVERSHADOWED` is considered a "load".
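   
   Something along these lines, perhaps (just a sketch; please adjust to the actual intent):
   
   ```java
   /**
    * Returns true for every {@link ChangeType} other than {@code SEGMENT_REMOVED}, i.e. whenever the
    * segment should be (or remain) present. Note that an overshadowed segment counts as a "load" here:
    * it is still a used segment; it is just fully covered by newer-version segments.
    */
   @JsonIgnore
   public boolean isLoad()
   {
     return changeType != ChangeType.SEGMENT_REMOVED;
   }
   ```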



##########
server/src/main/java/org/apache/druid/client/CachingClusteredClient.java:
##########
@@ -456,14 +466,42 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
         }
         for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
           ServerSelector server = chunk.getObject();
+          if (brokerSegmentWatcherConfig.isDetectUnavailableSegments() && !server.isQueryable()) {
+            log.debug("ServerSelector for segment id [%s] is not queryable", server.getSegment().getId());
+            continue;
+          }
           final SegmentDescriptor segment = new SegmentDescriptor(
               holder.getInterval(),
               holder.getVersion(),
               chunk.getChunkNumber()
           );
           segments.add(new SegmentServerSelector(server, segment));
+          if (server.isEmpty()) {
+            unavailableSegmentsIds.add(server.getSegment().getId());
+          }
+        }
+      }
+
+      if (brokerSegmentWatcherConfig.isDetectUnavailableSegments() && unavailableSegmentsIds.size() > 0) {
+        log.warn(
+            "Detected [%d] unavailable segments, segment ids: [%s]",
+            unavailableSegmentsIds.size(),
+            unavailableSegmentsIds
+        );
+        if (unavailableSegmentsAction == UnavailableSegmentsAction.FAIL) {
+          throw new QueryException(
+              QueryException.UNAVAILABLE_SEGMENTS_ERROR_CODE,
+              StringUtils.format(
+                  "Detected [%d] unavailable segments, segment ids: [%s]",

Review Comment:
   Shouldn't include the entire list of segments in the error message. It might be thousands. It should be clipped somehow.
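   
   e.g. something like this (a sketch; the cap of 10 is arbitrary):
   
   ```java
   final int maxIdsInMessage = 10;
   final String clippedIds = unavailableSegmentsIds.stream()
                                                   .limit(maxIdsInMessage)
                                                   .map(SegmentId::toString)
                                                   .collect(Collectors.joining(", "));
   
   // then in the exception:
   final String message = StringUtils.format(
       "Detected [%,d] unavailable segments, first few segment ids: [%s]",
       unavailableSegmentsIds.size(),
       clippedIds
   );
   ```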



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -269,11 +309,14 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm
       // in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree
       // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
       // loop...
+      boolean runCallBack = false;
       if (!server.getType().equals(ServerType.BROKER)) {
         log.debug("Adding segment[%s] for server[%s]", segment, server);
+
         ServerSelector selector = selectors.get(segmentId);
         if (selector == null) {
-          selector = new ServerSelector(segment, tierSelectorStrategy);
+          // if unavailableSegmentDetection is enabled, segment is not queryable unless added by coordinator

Review Comment:
   Something seems off with this comment. Seems like `queryable` is going to be `false` here always, regardless of whether unavailable segment detection is enabled? Is that intentional?



##########
server/src/main/java/org/apache/druid/client/selector/ServerSelector.java:
##########
@@ -216,4 +225,13 @@ public boolean hasData()
     return segment.get().hasData();
   }
 
+  public boolean isQueryable()
+  {
+    return isQueryable;
+  }
+
+  public void setQueryable(boolean queryable)
+  {
+    isQueryable = queryable;

Review Comment:
   Not thread-safe; probably need to synchronize on `this` like other mutators do.
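   
   i.e. something like:
   
   ```java
   public synchronized boolean isQueryable()
   {
     return isQueryable;
   }
   
   public synchronized void setQueryable(boolean queryable)
   {
     // a synchronized method is equivalent to synchronizing on `this`, matching the other mutators
     isQueryable = queryable;
   }
   ```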



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -173,6 +181,37 @@ public CallbackAction segmentViewInitialized()
           return CallbackAction.CONTINUE;
         }
     );
+
+    metadataSegmentView.registerSegmentCallback(

Review Comment:
   Posting this comment here just so it can be attached to this file.
   
   Could you please add some class-level javadocs explaining how the logic is supposed to work? It's got quite complicated, because we are mixing stuff from `FilteredServerInventoryView` (segment announcement) and `MetadataSegmentView` (Coordinator). An explainer about the data flow and logic would be helpful, especially with links to the methods that do various important things.



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -234,9 +243,78 @@ public SegmentMetadataCache(
     this.brokerInternalQueryConfig = brokerInternalQueryConfig;
     this.emitter = emitter;
 
+    if (!brokerSegmentWatcherConfig.isDetectUnavailableSegments()) {

Review Comment:
   Couple questions about this:
   
   - Why is it necessary to wait for `metadataSegmentView`? (`metadataSegmentView` isn't used in this class otherwise, as far as I can tell.)
   - What happens in this class if some segments are unavailable? Ideally it should keep running smoothly and just ignore those segments.



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCache.java:
##########
@@ -248,6 +326,7 @@ public ServerView.CallbackAction timelineInitialized()
           {
             synchronized (lock) {
               isServerViewInitialized = true;
+              log.info("TimelineServerView is initialized");

Review Comment:
   Similar comment to the other message. Please either demote this to `debug` or make it more useful.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -295,6 +295,26 @@ tableName, getPayloadType(), getQuoteString(), getCollation()
     );
   }
 
+  private void alterSegmentsTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          (HandleCallback<Void>) handle -> {
+            final Batch batch = handle.createBatch();
+            if (!tableContainsColumn(handle, tableName, "handed_off")) {
+              log.info("Adding column [handed_off] to table[%s]", tableName);
+              batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN handed_off BOOLEAN NOT NULL DEFAULT FALSE", tableName));

Review Comment:
   Could you check if this will cause downtime (a long lock acquisition) on a large `segments` table on the most common metadata stores: MySQL and PostgreSQL?



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -936,10 +974,49 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     } else {
       log.info("Polled and found %,d segments in the database", segments.size());
     }
-    dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
-        Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
-        dataSourceProperties
-    );
+
+    handedOffSegments.clear();

Review Comment:
   I suspect there's a race between this code and `markSegmentAsHandedOff`, which checks `handedOffSegments`. It could briefly be empty during calls to `markSegmentAsHandedOff`. Please look into that and let us know if it's an issue or not.



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -295,6 +295,26 @@ tableName, getPayloadType(), getQuoteString(), getCollation()
     );
   }
 
+  private void alterSegmentsTable(final String tableName)
+  {
+    try {
+      retryWithHandle(
+          (HandleCallback<Void>) handle -> {
+            final Batch batch = handle.createBatch();
+            if (!tableContainsColumn(handle, tableName, "handed_off")) {

Review Comment:
   Suggest calling this `has_loaded` rather than `handed_off`, because "handoff" is a realtime-specific concept. "Loading" applies to both realtime and batch.



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -883,14 +910,18 @@ private void doPoll()
     //
     // setting connection to read-only will allow some database such as MySQL
     // to automatically use read-only transaction mode, further optimizing the query
+    final Map<String, Set<SegmentId>> datasourceToHandedOffSegments = new HashMap<>();
     final List<DataSegment> segments = connector.inReadOnlyTransaction(
         new TransactionCallback<List<DataSegment>>()
         {
           @Override
           public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
           {
             return handle
-                .createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
+                .createQuery(StringUtils.format(

Review Comment:
   This reformatting makes things kind of ugly; how about moving the query to its own `String sql = "SELECT…` line?
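   
   i.e. pull the statement out first and then pass it to `handle.createQuery(sql)`, which keeps its original shape. Shown here with the pre-existing query text just for illustration; the new query from this PR would go in the same place:
   
   ```java
   final String sql = StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable());
   ```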



##########
server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java:
##########
@@ -167,6 +237,37 @@ private List<DataSegment> determineOvershadowedSegments()
         }
       }
     }
-    return overshadowedSegments;
+    return ImmutableSet.copyOf(overshadowedSegments);
+  }
+
+  public static Set<SegmentWithOvershadowedStatus> getSegmentsWithOvershadowedStatus(
+      Collection<ImmutableDruidDataSource> segments,
+      Set<DataSegment> overshadowedSegments,
+      Map<String, Set<SegmentId>> handedOffState
+  )
+  {
+
+    final Stream<DataSegment> usedSegments = segments
+        .stream()
+        .flatMap(t -> t.getSegments().stream());
+
+    return usedSegments
+        .map(segment -> new SegmentWithOvershadowedStatus(
+                 segment,
+                 overshadowedSegments.contains(segment),
+                 getHandedOffStateForSegment(handedOffState, segment.getDataSource(), segment.getId())
+             )
+        )
+        .collect(Collectors.toSet());
+  }
+
+  private static boolean getHandedOffStateForSegment(
+      Map<String, Set<SegmentId>> handedOffState,
+      String dataSource, SegmentId segmentId
+  )
+  {
+    return handedOffState
+        .getOrDefault(dataSource, new HashSet<>())

Review Comment:
   This ends up creating a `HashSet` for each call regardless of whether it's needed or not. And there will potentially be a lot of calls, since there's one here per segment. Better to restructure the code to avoid that.
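   
   e.g.:
   
   ```java
   private static boolean getHandedOffStateForSegment(
       Map<String, Set<SegmentId>> handedOffState,
       String dataSource,
       SegmentId segmentId
   )
   {
     final Set<SegmentId> handedOffIds = handedOffState.get(dataSource);
     return handedOffIds != null && handedOffIds.contains(segmentId);
   }
   ```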



##########
server/src/main/java/org/apache/druid/client/CachingClusteredClient.java:
##########
@@ -441,6 +450,7 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
 
       final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
       final Map<String, Optional<RangeSet<String>>> dimensionRangeCache = new HashMap<>();
+      List<SegmentId> unavailableSegmentsIds = new ArrayList<>();

Review Comment:
   nit: `final` just like the other surrounding variables.



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -269,11 +309,14 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm
       // in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree
       // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
       // loop...
+      boolean runCallBack = false;
       if (!server.getType().equals(ServerType.BROKER)) {
         log.debug("Adding segment[%s] for server[%s]", segment, server);
+
         ServerSelector selector = selectors.get(segmentId);

Review Comment:
   Would be good to add `@GuardedBy` to these existing items to help ensure we keep things thread-safe.
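   
   e.g. (a sketch; I'm assuming the monitor here is the existing `lock` field and that the declaration looks roughly like this, so adjust to whatever the real guard is):
   
   ```java
   @GuardedBy("lock")
   private final Map<SegmentId, ServerSelector> selectors = new HashMap<>();
   ```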



##########
processing/src/main/java/org/apache/druid/timeline/DataSegmentChange.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.java.util.common.StringUtils;
+
+public class DataSegmentChange
+{
+  private final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus;
+  private final ChangeType changeType;
+
+  @JsonCreator
+  public DataSegmentChange(
+      @JsonProperty("segmentWithOvershadowedStatus") SegmentWithOvershadowedStatus segmentWithOvershadowedStatus,
+      @JsonProperty("changeType") ChangeType changeType
+  )
+  {
+    this.segmentWithOvershadowedStatus = segmentWithOvershadowedStatus;
+    this.changeType = changeType;
+  }
+
+  @JsonProperty
+  public SegmentWithOvershadowedStatus getSegmentWithOvershadowedStatus()
+  {
+    return segmentWithOvershadowedStatus;
+  }
+
+  @JsonProperty
+  public ChangeType getChangeType()
+  {
+    return changeType;
+  }
+
+  @JsonIgnore
+  public boolean isLoad()
+  {
+    return changeType != ChangeType.SEGMENT_REMOVED;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DataSegmentChangeRequest{" +
+           ", changeReason=" + changeType +
+           ", segmentWithOvershadowedStatus=" + segmentWithOvershadowedStatus +
+           '}';
+  }
+
+  public enum ChangeType

Review Comment:
   Javadocs are called for here, explaining what each of the enum values means and when it would be used.



##########
processing/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java:
##########
@@ -144,6 +144,20 @@ public Collection<ObjectType> iterateAllObjects()
     );
   }
 
+  public Collection<PartitionChunkEntry<VersionType, ObjectType>> getAllPartitionChunkEntries()

Review Comment:
   1. Javadocs, please.
   2. Best to align name with `iterateAllObjects`, which this method is very similar to. Maybe `iterateAllEntries`?
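   
   A sketch of what I mean (javadoc wording up to you):
   
   ```java
   /**
    * Returns one entry per partition chunk in this timeline, retaining the interval and version alongside
    * each chunk. Similar in spirit to {@link #iterateAllObjects()}, which returns only the chunk payloads.
    */
   public Collection<PartitionChunkEntry<VersionType, ObjectType>> iterateAllEntries()
   ```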



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -302,8 +348,11 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm
         }
         selector.addServerAndUpdateSegment(queryableDruidServer, segment);
       }
-      // run the callbacks, even if the segment came from a broker, lets downstream watchers decide what to do with it
-      runTimelineCallbacks(callback -> callback.segmentAdded(server, segment));
+
+      if (!segmentWatcherConfig.isDetectUnavailableSegments() || runCallBack) {

Review Comment:
   Maybe it would be clearer to initialize `runCallBack` to `!segmentWatcherConfig.isDetectUnavailableSegments()` and have this simply be `if (runCallBack)`.
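   
   i.e.:
   
   ```java
   boolean runCallBack = !segmentWatcherConfig.isDetectUnavailableSegments();
   
   // ... flip runCallBack to true in the cases that should still notify watchers when detection is enabled ...
   
   if (runCallBack) {
     runTimelineCallbacks(callback -> callback.segmentAdded(server, segment));
   }
   ```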



##########
server/src/main/java/org/apache/druid/client/CachingClusteredClient.java:
##########
@@ -456,14 +466,42 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
         }
         for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
           ServerSelector server = chunk.getObject();
+          if (brokerSegmentWatcherConfig.isDetectUnavailableSegments() && !server.isQueryable()) {
+            log.debug("ServerSelector for segment id [%s] is not queryable", server.getSegment().getId());
+            continue;
+          }
           final SegmentDescriptor segment = new SegmentDescriptor(
               holder.getInterval(),
               holder.getVersion(),
               chunk.getChunkNumber()
           );
           segments.add(new SegmentServerSelector(server, segment));
+          if (server.isEmpty()) {
+            unavailableSegmentsIds.add(server.getSegment().getId());
+          }
+        }
+      }
+
+      if (brokerSegmentWatcherConfig.isDetectUnavailableSegments() && unavailableSegmentsIds.size() > 0) {
+        log.warn(

Review Comment:
   We shouldn't do a `warn` for each query that had unavailable segments; it'll make too many logs. For visibility, consider other options instead, like a periodically emitted metric or a new dimension on `QueryMetrics`.



##########
server/src/main/java/org/apache/druid/client/selector/ServerSelector.java:
##########
@@ -216,4 +225,13 @@ public boolean hasData()
     return segment.get().hasData();
   }
 
+  public boolean isQueryable()
+  {
+    return isQueryable;

Review Comment:
   Not thread-safe; probably need to synchronize on `this`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org