Posted to commits@druid.apache.org by le...@apache.org on 2019/05/01 16:01:06 UTC

[incubator-druid] branch master updated: Add is_overshadowed column to sys.segments table (#7425)

This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 15d19f3  Add is_overshadowed column to sys.segments table (#7425)
15d19f3 is described below

commit 15d19f3059a990f022b1cb98bdd4e6a82cdf9a06
Author: Surekha <su...@imply.io>
AuthorDate: Wed May 1 09:00:57 2019 -0700

    Add is_overshadowed column to sys.segments table (#7425)
    
    * Add is_overshadowed column to sys.segments table
    
    * update docs
    
    * Rename class and variables
    
    * PR comments
    
    * PR comments
    
    * remove unused variables in MetadataResource
    
    * move constants together
    
    * add getFullyOvershadowedSegments method to ImmutableDruidDataSource
    
    * Fix compareTo of SegmentWithOvershadowedStatus
    
    * PR comment
    
    * PR comments
    
    * PR comments
    
    * PR comments
    
    * PR comments
    
    * fix issue with already consumed stream
    
    * minor refactoring
    
    * PR comments
---
 .../timeline/SegmentWithOvershadowedStatus.java    | 90 ++++++++++++++++++++++
 docs/content/querying/sql.md                       |  1 +
 .../druid/client/ImmutableDruidDataSource.java     | 40 ++++++++++
 .../helper/DruidCoordinatorRuleRunner.java         | 30 +-------
 .../apache/druid/server/http/MetadataResource.java | 64 +++++++++++++--
 .../sql/calcite/schema/MetadataSegmentView.java    | 69 ++++++++---------
 .../druid/sql/calcite/schema/SystemSchema.java     | 64 ++++++++-------
 .../druid/sql/calcite/schema/SystemSchemaTest.java | 44 +++++++----
 8 files changed, 289 insertions(+), 113 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
new file mode 100644
index 0000000..e86daea
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * DataSegment object plus the overshadowed status for the segment. An immutable object.
+ *
+ * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId}
+ * of the DataSegment object.
+ */
+public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
+{
+  private final boolean overshadowed;
+  private final DataSegment dataSegment;
+
+  @JsonCreator
+  public SegmentWithOvershadowedStatus(
+      @JsonProperty("dataSegment") DataSegment dataSegment,
+      @JsonProperty("overshadowed") boolean overshadowed
+  )
+  {
+    this.dataSegment = dataSegment;
+    this.overshadowed = overshadowed;
+  }
+
+  @JsonProperty
+  public boolean isOvershadowed()
+  {
+    return overshadowed;
+  }
+
+  @JsonProperty
+  public DataSegment getDataSegment()
+  {
+    return dataSegment;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SegmentWithOvershadowedStatus)) {
+      return false;
+    }
+    final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o;
+    if (!dataSegment.equals(that.dataSegment)) {
+      return false;
+    }
+    if (overshadowed != (that.overshadowed)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = dataSegment.hashCode();
+    result = 31 * result + Boolean.hashCode(overshadowed);
+    return result;
+  }
+
+  @Override
+  public int compareTo(SegmentWithOvershadowedStatus o)
+  {
+    return dataSegment.getId().compareTo(o.dataSegment.getId());
+  }
+}
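
A note on the class above: compareTo() orders only by SegmentId, while equals() also
compares the overshadowed flag, so the ordering is deliberately coarser than equality.
A minimal sketch illustrating this, assuming the test-style DataSegment.builder()
defaults (the builder and demo class are not part of this commit):

    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.timeline.DataSegment;
    import org.apache.druid.timeline.SegmentWithOvershadowedStatus;

    public class OvershadowedStatusDemo
    {
      public static void main(String[] args)
      {
        // Builder defaults fill in loadSpec/dimensions/metrics/shardSpec.
        final DataSegment segment = DataSegment.builder()
            .dataSource("wikipedia")
            .interval(Intervals.of("2019-01-01/2019-01-02"))
            .version("v1")
            .size(0)
            .build();

        final SegmentWithOvershadowedStatus a = new SegmentWithOvershadowedStatus(segment, true);
        final SegmentWithOvershadowedStatus b = new SegmentWithOvershadowedStatus(segment, false);

        System.out.println(a.compareTo(b)); // 0: ordering considers only the SegmentId
        System.out.println(a.equals(b));    // false: equality also considers the flag
      }
    }

This matters for sorted collections: two statuses for the same segment compare as equal
even when their overshadowed flags differ.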
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 9e97138..1fc1243 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -612,6 +612,7 @@ Note that a segment can be served by more than one stream ingestion tasks or His
 |is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 represents this segment has been published to the metadata store with `used=1`|
 |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime)|
 |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served on any type of realtime tasks|
+|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet.|
 |payload|STRING|JSON-serialized data segment payload|
 
 For example to retrieve all segments for datasource "wikipedia", use the query:
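
With the new column, the docs' example query can be narrowed to only live (published and
not overshadowed) segments. A sketch over JDBC, assuming a broker with the Avatica
endpoint at a hypothetical localhost:8082 (not part of this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SysSegmentsQuery
    {
      public static void main(String[] args) throws Exception
      {
        // Hypothetical broker address; adjust to your cluster.
        final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
        try (Connection connection = DriverManager.getConnection(url);
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(
                 "SELECT segment_id, is_published, is_overshadowed "
                 + "FROM sys.segments "
                 + "WHERE datasource = 'wikipedia' "
                 + "AND is_published = 1 AND is_overshadowed = 0")) {
          while (rs.next()) {
            System.out.println(rs.getString("segment_id"));
          }
        }
      }
    }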
diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
index 5953944..841b716 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
@@ -25,12 +25,17 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Ordering;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * An immutable collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source.
@@ -109,6 +114,41 @@ public class ImmutableDruidDataSource
     return totalSizeOfSegments;
   }
 
+  /**
+   * Finds the overshadowed segments among the given segments.
+   *
+   * @return the set of overshadowed segments
+   */
+  public static Set<DataSegment> determineOvershadowedSegments(Iterable<DataSegment> segments)
+  {
+    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = buildTimelines(segments);
+
+    final Set<DataSegment> overshadowedSegments = new HashSet<>();
+    for (DataSegment dataSegment : segments) {
+      final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
+      if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
+        overshadowedSegments.add(dataSegment);
+      }
+    }
+    return overshadowedSegments;
+  }
+
+  /**
+   * Builds a timeline per datasource from the given segments.
+   *
+   * @return map of datasource to VersionedIntervalTimeline of segments
+   */
+  private static Map<String, VersionedIntervalTimeline<String, DataSegment>> buildTimelines(
+      Iterable<DataSegment> segments
+  )
+  {
+    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
+    segments.forEach(segment -> timelines
+        .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
+        .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
+    return timelines;
+  }
+
   @Override
   public String toString()
   {
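
determineOvershadowedSegments() builds one VersionedIntervalTimeline per datasource and
collects every segment whose (interval, version) the timeline reports as overshadowed,
e.g. an older version fully covered by a newer one. A sketch under the same
builder-default assumptions as the earlier demo (not part of this commit):

    import com.google.common.collect.ImmutableList;
    import java.util.Set;
    import org.apache.druid.client.ImmutableDruidDataSource;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.timeline.DataSegment;

    public class OvershadowDemo
    {
      public static void main(String[] args)
      {
        final DataSegment v1 = DataSegment.builder()
            .dataSource("wikipedia")
            .interval(Intervals.of("2019-01-01/2019-01-02"))
            .version("v1")
            .size(0)
            .build();
        // Same datasource and interval, higher version: v2 fully overshadows v1.
        final DataSegment v2 = DataSegment.builder(v1).version("v2").build();

        final Set<DataSegment> overshadowed =
            ImmutableDruidDataSource.determineOvershadowedSegments(ImmutableList.of(v1, v2));
        System.out.println(overshadowed.contains(v1)); // true
        System.out.println(overshadowed.contains(v2)); // false
      }
    }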
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
index 7d78301..bbceaaf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
@@ -20,7 +20,7 @@
 package org.apache.druid.server.coordinator.helper;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Ordering;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataRuleManager;
@@ -32,14 +32,9 @@ import org.apache.druid.server.coordinator.ReplicationThrottler;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.TimelineObjectHolder;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.DateTime;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -89,7 +84,8 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
     // find available segments which are not overshadowed by other segments in DB
     // only those would need to be loaded/dropped
     // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
-    Set<DataSegment> overshadowed = determineOvershadowedSegments(params);
+    final Set<DataSegment> overshadowed = ImmutableDruidDataSource
+        .determineOvershadowedSegments(params.getAvailableSegments());
 
     for (String tier : cluster.getTierNames()) {
       replicatorThrottler.updateReplicationState(tier);
@@ -138,24 +134,4 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
 
     return params.buildFromExisting().withCoordinatorStats(stats).build();
   }
-
-  private Set<DataSegment> determineOvershadowedSegments(DruidCoordinatorRuntimeParams params)
-  {
-    Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
-    for (DataSegment segment : params.getAvailableSegments()) {
-      timelines
-          .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
-          .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-    }
-
-    Set<DataSegment> overshadowed = new HashSet<>();
-    for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
-      for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
-        for (DataSegment dataSegment : holder.getObject().payloads()) {
-          overshadowed.add(dataSegment);
-        }
-      }
-    }
-    return overshadowed;
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index da0abe7..3c7e8ac 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -37,6 +37,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.joda.time.Interval;
 
 import javax.servlet.http.HttpServletRequest;
@@ -51,6 +52,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -147,7 +149,8 @@ public class MetadataResource
   @Produces(MediaType.APPLICATION_JSON)
   public Response getDatabaseSegments(
       @Context final HttpServletRequest req,
-      @QueryParam("datasources") final Set<String> datasources
+      @QueryParam("datasources") final Set<String> datasources,
+      @QueryParam("includeOvershadowedStatus") final String includeOvershadowedStatus
   )
   {
     // If we haven't polled the metadata store yet, use an empty list of datasources.
@@ -159,14 +162,61 @@ public class MetadataResource
     }
     final Stream<DataSegment> metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream());
 
-    final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
-        AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+    if (includeOvershadowedStatus != null) {
+      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus(
+          req,
+          druidDataSources,
+          metadataSegments
+      );
+      Response.ResponseBuilder builder = Response.status(Response.Status.OK);
+      return builder.entity(authorizedSegments).build();
+    } else {
+
+      final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+
+      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+          req,
+          metadataSegments::iterator,
+          raGenerator,
+          authorizerMapper
+      );
+
+      Response.ResponseBuilder builder = Response.status(Response.Status.OK);
+      return builder.entity(authorizedSegments).build();
+    }
+  }
 
-    final Iterable<DataSegment> authorizedSegments =
-        AuthorizationUtils.filterAuthorizedResources(req, metadataSegments::iterator, raGenerator, authorizerMapper);
+  private Iterable<SegmentWithOvershadowedStatus> findAuthorizedSegmentWithOvershadowedStatus(
+      HttpServletRequest req,
+      Collection<ImmutableDruidDataSource> druidDataSources,
+      Stream<DataSegment> metadataSegments
+  )
+  {
+    // It's fine to add all overshadowed segments to a single collection because only
+    // a small fraction of the segments in the cluster are expected to be overshadowed,
+    // so building this collection shouldn't generate a lot of garbage.
+    final Set<DataSegment> overshadowedSegments = new HashSet<>();
+    for (ImmutableDruidDataSource dataSource : druidDataSources) {
+      overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments()));
+    }
 
-    final Response.ResponseBuilder builder = Response.status(Response.Status.OK);
-    return builder.entity(authorizedSegments).build();
+    final Stream<SegmentWithOvershadowedStatus> segmentsWithOvershadowedStatus = metadataSegments
+        .map(segment -> new SegmentWithOvershadowedStatus(
+            segment,
+            overshadowedSegments.contains(segment)
+        ));
+
+    final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections
+        .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
+
+    final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+        req,
+        segmentsWithOvershadowedStatus::iterator,
+        raGenerator,
+        authorizerMapper
+    );
+    return authorizedSegments;
   }
 
   @GET
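
On the wire, the bare presence of the includeOvershadowedStatus query parameter (the
handler only checks it against null) switches the response from plain DataSegment
objects to SegmentWithOvershadowedStatus wrappers. A client sketch using the JDK 11+
HTTP client, with a hypothetical coordinator at localhost:8081:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class MetadataSegmentsFetch
    {
      public static void main(String[] args) throws Exception
      {
        final HttpClient client = HttpClient.newHttpClient();
        final HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8081/druid/coordinator/v1/metadata/segments"
                + "?includeOvershadowedStatus&datasources=wikipedia"))
            .GET()
            .build();
        // Each JSON element now carries "dataSegment" and "overshadowed" fields.
        final HttpResponse<String> response =
            client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
      }
    }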
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
index 50fe313..18d288e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
@@ -23,7 +23,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.inject.Inject;
 import org.apache.druid.client.BrokerSegmentWatcherConfig;
 import org.apache.druid.client.DataSegmentInterner;
@@ -32,7 +34,6 @@ import org.apache.druid.client.coordinator.Coordinator;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -43,19 +44,17 @@ import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.joda.time.DateTime;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class polls the coordinator in the background to keep the latest published segments.
@@ -73,12 +72,19 @@ public class MetadataSegmentView
   private final BrokerSegmentWatcherConfig segmentWatcherConfig;
 
   private final boolean isCacheEnabled;
-  @Nullable
-  private final ConcurrentMap<DataSegment, DateTime> publishedSegments;
+  /**
+   * Use {@link ImmutableSortedSet} so that the order of segments is deterministic and
+   * sys.segments queries return the segments in sorted order based on segmentId.
+   *
+   * Volatile since this reference is reassigned in {@code poll()} and then read in {@code getPublishedSegments()}
+   * from other threads.
+   */
+  @MonotonicNonNull
+  private volatile ImmutableSortedSet<SegmentWithOvershadowedStatus> publishedSegments = null;
   private final ScheduledExecutorService scheduledExec;
   private final long pollPeriodInMS;
   private final LifecycleLock lifecycleLock = new LifecycleLock();
-  private final AtomicBoolean cachePopulated = new AtomicBoolean(false);
+  private final CountDownLatch cachePopulated = new CountDownLatch(1);
 
   @Inject
   public MetadataSegmentView(
@@ -96,7 +102,6 @@ public class MetadataSegmentView
     this.segmentWatcherConfig = segmentWatcherConfig;
     this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
     this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod();
-    this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : null;
     this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
   }
 
@@ -134,40 +139,32 @@ public class MetadataSegmentView
   private void poll()
   {
     log.info("polling published segments from coordinator");
-    final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
+    final JsonParserIterator<SegmentWithOvershadowedStatus> metadataSegments = getMetadataSegments(
         coordinatorDruidLeaderClient,
         jsonMapper,
         responseHandler,
         segmentWatcherConfig.getWatchedDataSources()
     );
 
-    final DateTime timestamp = DateTimes.nowUtc();
+    final ImmutableSortedSet.Builder<SegmentWithOvershadowedStatus> builder = ImmutableSortedSet.naturalOrder();
     while (metadataSegments.hasNext()) {
-      final DataSegment interned = DataSegmentInterner.intern(metadataSegments.next());
-      // timestamp is used to filter deleted segments
-      publishedSegments.put(interned, timestamp);
+      final SegmentWithOvershadowedStatus segment = metadataSegments.next();
+      final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment());
+      final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus(
+          interned,
+          segment.isOvershadowed()
+      );
+      builder.add(segmentWithOvershadowedStatus);
     }
-    // filter the segments from cache whose timestamp is not equal to latest timestamp stored,
-    // since the presence of a segment with an earlier timestamp indicates that
-    // "that" segment is not returned by coordinator in latest poll, so it's
-    // likely deleted and therefore we remove it from publishedSegments
-    // Since segments are not atomically replaced because it can cause high
-    // memory footprint due to large number of published segments, so
-    // we are incrementally removing deleted segments from the map
-    // This means publishedSegments will be eventually consistent with
-    // the segments in coordinator
-    publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp);
-    cachePopulated.set(true);
+    publishedSegments = builder.build();
+    cachePopulated.countDown();
   }
 
-  public Iterator<DataSegment> getPublishedSegments()
+  public Iterator<SegmentWithOvershadowedStatus> getPublishedSegments()
   {
     if (isCacheEnabled) {
-      Preconditions.checkState(
-          lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) && cachePopulated.get(),
-          "hold on, still syncing published segments"
-      );
-      return publishedSegments.keySet().iterator();
+      Uninterruptibles.awaitUninterruptibly(cachePopulated);
+      return publishedSegments.iterator();
     } else {
       return getMetadataSegments(
           coordinatorDruidLeaderClient,
@@ -179,14 +176,14 @@ public class MetadataSegmentView
   }
 
   // Note that coordinator must be up to get segments
-  private JsonParserIterator<DataSegment> getMetadataSegments(
+  private JsonParserIterator<SegmentWithOvershadowedStatus> getMetadataSegments(
       DruidLeaderClient coordinatorClient,
       ObjectMapper jsonMapper,
       BytesAccumulatingResponseHandler responseHandler,
       Set<String> watchedDataSources
   )
   {
-    String query = "/druid/coordinator/v1/metadata/segments";
+    String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus";
     if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
       log.debug(
           "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources);
@@ -195,7 +192,7 @@ public class MetadataSegmentView
         sb.append("datasources=").append(ds).append("&");
       }
       sb.setLength(sb.length() - 1);
-      query = "/druid/coordinator/v1/metadata/segments?" + sb;
+      query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb;
     }
     Request request;
     try {
@@ -213,7 +210,7 @@ public class MetadataSegmentView
         responseHandler
     );
 
-    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
+    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<SegmentWithOvershadowedStatus>()
     {
     });
     return new JsonParserIterator<>(
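
The cache change above replaces incremental, timestamp-based eviction from a
ConcurrentMap with an atomic swap of an immutable snapshot, plus a CountDownLatch so
readers block only until the first poll completes. The concurrency idiom, stripped of
Druid specifics (not Druid API, just a sketch of the pattern):

    import com.google.common.collect.ImmutableSortedSet;
    import com.google.common.util.concurrent.Uninterruptibles;
    import java.util.concurrent.CountDownLatch;

    public class SnapshotCache
    {
      // Volatile handoff: the poller swaps in a fully built set; readers never
      // observe a partially populated one.
      private volatile ImmutableSortedSet<String> snapshot = null;
      private final CountDownLatch populated = new CountDownLatch(1);

      void poll(Iterable<String> polled)
      {
        snapshot = ImmutableSortedSet.copyOf(polled);
        populated.countDown(); // no-op after the first poll
      }

      ImmutableSortedSet<String> get()
      {
        // Blocks only before the first snapshot exists; segments absent from the
        // latest poll vanish with the swap, so no explicit eviction is needed.
        Uninterruptibles.awaitUninterruptibly(populated);
        return snapshot;
      }
    }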
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 29f9816..18f4c31 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -69,6 +69,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.table.RowSignature;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 
 import javax.annotation.Nullable;
@@ -92,6 +93,17 @@ public class SystemSchema extends AbstractSchema
   private static final String SERVER_SEGMENTS_TABLE = "server_segments";
   private static final String TASKS_TABLE = "tasks";
 
+  /**
+   * Boolean constants represented as the long type, where 1 = true and 0 = false,
+   * to make it easy to count the number of segments which are published,
+   * available, etc.
+   */
+  private static final long IS_PUBLISHED_FALSE = 0L;
+  private static final long IS_PUBLISHED_TRUE = 1L;
+  private static final long IS_AVAILABLE_TRUE = 1L;
+  private static final long IS_OVERSHADOWED_FALSE = 0L;
+  private static final long IS_OVERSHADOWED_TRUE = 1L;
+
   static final RowSignature SEGMENTS_SIGNATURE = RowSignature
       .builder()
       .add("segment_id", ValueType.STRING)
@@ -106,6 +118,7 @@ public class SystemSchema extends AbstractSchema
       .add("is_published", ValueType.LONG)
       .add("is_available", ValueType.LONG)
       .add("is_realtime", ValueType.LONG)
+      .add("is_overshadowed", ValueType.LONG)
       .add("payload", ValueType.STRING)
       .build();
 
@@ -189,14 +202,6 @@ public class SystemSchema extends AbstractSchema
     private final AuthorizerMapper authorizerMapper;
     private final MetadataSegmentView metadataView;
 
-    /**
-     * Booleans constants used for available segments represented as long type,
-     * where 1 = true and 0 = false to make it easy to count number of segments
-     * which are published, available
-     */
-    private static final long DEFAULT_IS_PUBLISHED = 0;
-    private static final long DEFAULT_IS_AVAILABLE = 1;
-
     public SegmentsTable(
         DruidSchema druidSchemna,
         MetadataSegmentView metadataView,
@@ -235,12 +240,12 @@ public class SystemSchema extends AbstractSchema
           Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
       for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) {
         PartialSegmentData partialSegmentData =
-            new PartialSegmentData(DEFAULT_IS_AVAILABLE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
+            new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows());
         partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
       }
 
       //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator
-      final Iterator<DataSegment> metadataStoreSegments = metadataView.getPublishedSegments();
+      final Iterator<SegmentWithOvershadowedStatus> metadataStoreSegments = metadataView.getPublishedSegments();
 
       final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();
 
@@ -251,8 +256,9 @@ public class SystemSchema extends AbstractSchema
           ))
           .transform(val -> {
             try {
-              segmentsAlreadySeen.add(val.getId());
-              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getId());
+              final DataSegment segment = val.getDataSegment();
+              segmentsAlreadySeen.add(segment.getId());
+              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId());
               long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
               if (partialSegmentData != null) {
                 numReplicas = partialSegmentData.getNumReplicas();
@@ -261,23 +267,24 @@ public class SystemSchema extends AbstractSchema
                 isRealtime = partialSegmentData.isRealtime();
               }
               return new Object[]{
-                  val.getId(),
-                  val.getDataSource(),
-                  val.getInterval().getStart().toString(),
-                  val.getInterval().getEnd().toString(),
-                  val.getSize(),
-                  val.getVersion(),
-                  Long.valueOf(val.getShardSpec().getPartitionNum()),
+                  segment.getId(),
+                  segment.getDataSource(),
+                  segment.getInterval().getStart().toString(),
+                  segment.getInterval().getEnd().toString(),
+                  segment.getSize(),
+                  segment.getVersion(),
+                  Long.valueOf(segment.getShardSpec().getPartitionNum()),
                   numReplicas,
                   numRows,
-                  1L, //is_published is true for published segments
+                  IS_PUBLISHED_TRUE, //is_published is true for published segments
                   isAvailable,
                   isRealtime,
+                  val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
                   jsonMapper.writeValueAsString(val)
               };
             }
             catch (JsonProcessingException e) {
-              throw new RE(e, "Error getting segment payload for segment %s", val.getId());
+              throw new RE(e, "Error getting segment payload for segment %s", val.getDataSegment().getId());
             }
           });
 
@@ -303,9 +310,10 @@ public class SystemSchema extends AbstractSchema
                   Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
                   numReplicas,
                   val.getValue().getNumRows(),
-                  DEFAULT_IS_PUBLISHED,
-                  DEFAULT_IS_AVAILABLE,
+                  IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
+                  IS_AVAILABLE_TRUE, // is_available is assumed to be always true for segments announced by historicals or realtime tasks
                   val.getValue().isRealtime(),
+                  IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
                   jsonMapper.writeValueAsString(val.getKey())
               };
             }
@@ -322,18 +330,18 @@ public class SystemSchema extends AbstractSchema
 
     }
 
-    private Iterator<DataSegment> getAuthorizedPublishedSegments(
-        Iterator<DataSegment> it,
+    private Iterator<SegmentWithOvershadowedStatus> getAuthorizedPublishedSegments(
+        Iterator<SegmentWithOvershadowedStatus> it,
         DataContext root
     )
     {
       final AuthenticationResult authenticationResult =
           (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
 
-      Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
-          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+      Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
 
-      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
           authenticationResult,
           () -> it,
           raGenerator,
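
Because the boolean columns are longs, SUM() over them counts segments in each state
directly, which is what the constants' comment alludes to. A sketch reusing the same
hypothetical broker endpoint as the earlier JDBC example:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SegmentStateCounts
    {
      public static void main(String[] args) throws Exception
      {
        final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
        try (Connection c = DriverManager.getConnection(url);
             Statement s = c.createStatement();
             ResultSet rs = s.executeQuery(
                 "SELECT SUM(is_published) AS published, "
                 + "SUM(is_overshadowed) AS overshadowed FROM sys.segments")) {
          rs.next();
          System.out.println(rs.getLong("published") + " published, "
              + rs.getLong("overshadowed") + " overshadowed");
        }
      }
    }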
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index d354f14..a942db4 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -76,6 +76,7 @@ import org.apache.druid.sql.calcite.util.TestServerInventoryView;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -381,7 +382,7 @@ public class SystemSchemaTest extends CalciteTestBase
     final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
     final List<RelDataTypeField> fields = rowType.getFieldList();
 
-    Assert.assertEquals(13, fields.size());
+    Assert.assertEquals(14, fields.size());
 
     final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
     final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
@@ -408,11 +409,14 @@ public class SystemSchemaTest extends CalciteTestBase
         .withConstructor(druidSchema, metadataView, mapper, authMapper)
         .createMock();
     EasyMock.replay(segmentsTable);
-    final Set<DataSegment> publishedSegments = Stream.of(publishedSegment1,
-                                                         publishedSegment2,
-                                                         publishedSegment3,
-                                                         segment1,
-                                                         segment2).collect(Collectors.toSet());
+    final Set<SegmentWithOvershadowedStatus> publishedSegments = Stream.of(
+        new SegmentWithOvershadowedStatus(publishedSegment1, true),
+        new SegmentWithOvershadowedStatus(publishedSegment2, false),
+        new SegmentWithOvershadowedStatus(publishedSegment3, false),
+        new SegmentWithOvershadowedStatus(segment1, true),
+        new SegmentWithOvershadowedStatus(segment2, false)
+    ).collect(Collectors.toSet());
+
     EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();
 
     EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
@@ -463,7 +467,8 @@ public class SystemSchemaTest extends CalciteTestBase
         3L, //numRows
         1L, //is_published
         1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        1L //is_overshadowed
     );
 
     verifyRow(
@@ -475,7 +480,8 @@ public class SystemSchemaTest extends CalciteTestBase
         3L, //numRows
         1L, //is_published
         1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
     );
 
     //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
@@ -488,7 +494,8 @@ public class SystemSchemaTest extends CalciteTestBase
         2L, //numRows
         0L, //is_published
         1L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
     );
 
     verifyRow(
@@ -500,7 +507,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         0L, //is_published
         1L, //is_available
-        1L //is_realtime
+        1L, //is_realtime
+        0L //is_overshadowed
     );
 
     verifyRow(
@@ -512,7 +520,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         0L, //is_published
         1L, //is_available
-        1L //is_realtime
+        1L, //is_realtime
+        0L //is_overshadowed
     );
 
     // wikipedia segments are published and unavailable, num_replicas is 0
@@ -525,7 +534,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         1L, //is_published
         0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        1L //is_overshadowed
     );
 
     verifyRow(
@@ -537,7 +547,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         1L, //is_published
         0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
     );
 
     verifyRow(
@@ -549,7 +560,8 @@ public class SystemSchemaTest extends CalciteTestBase
         0L, //numRows
         1L, //is_published
         0L, //is_available
-        0L //is_realtime
+        0L, //is_realtime
+        0L //is_overshadowed
     );
 
     // Verify value types.
@@ -565,7 +577,8 @@ public class SystemSchemaTest extends CalciteTestBase
       long numRows,
       long isPublished,
       long isAvailable,
-      long isRealtime)
+      long isRealtime,
+      long isOvershadowed)
   {
     Assert.assertEquals(segmentId, row[0].toString());
     SegmentId id = Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0);
@@ -580,6 +593,7 @@ public class SystemSchemaTest extends CalciteTestBase
     Assert.assertEquals(isPublished, row[9]);
     Assert.assertEquals(isAvailable, row[10]);
     Assert.assertEquals(isRealtime, row[11]);
+    Assert.assertEquals(isOvershadowed, row[12]);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org