You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/01/15 16:31:36 UTC

[incubator-druid] branch master updated: Fix num_replicas count in sys.segments table (#6804)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f72f33f  Fix num_replicas count in sys.segments table (#6804)
f72f33f is described below

commit f72f33f84aadfbf505ee9313ec96a62cf6ef8b74
Author: Surekha <su...@imply.io>
AuthorDate: Tue Jan 15 08:31:29 2019 -0800

    Fix num_replicas count in sys.segments table (#6804)
    
    * Fix num_replicas count from sys.segments
    
    * Adjust unit test for num_replica > 1
    
    * Pass named arguments instead of passing boolean constants
    
    * Address PR comments
    
    * PR comments
---
 .../org/apache/druid/client/BrokerServerView.java  |  2 +
 .../apache/druid/client/TimelineServerView.java    | 11 ++++
 .../druid/sql/calcite/schema/DruidSchema.java      | 60 +++++++++++++++++++---
 .../sql/calcite/schema/SegmentMetadataHolder.java  | 26 ++++++----
 .../druid/sql/calcite/schema/SystemSchema.java     |  2 +-
 .../druid/sql/calcite/schema/SystemSchemaTest.java |  6 +--
 .../sql/calcite/util/TestServerInventoryView.java  |  4 +-
 7 files changed, 90 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 0b28a1b..3747a9a 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -277,6 +277,8 @@ public class BrokerServerView implements TimelineServerView
             server,
             segmentId
         );
+      } else {
+        runTimelineCallbacks(callback -> callback.serverSegmentRemoved(server, segment));
       }
 
       if (selector.isEmpty()) {
diff --git a/server/src/main/java/org/apache/druid/client/TimelineServerView.java b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
index 89042c6..ed1d4df 100644
--- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java
+++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
@@ -81,5 +81,16 @@ public interface TimelineServerView extends ServerView
      * @return continue or unregister
      */
     CallbackAction segmentRemoved(DataSegment segment);
+
+    /**
+     * Called when a segment is removed from a server. Note that the timeline can still have the segment, even though it's removed from given server.
+     * {@link #segmentRemoved(DataSegment)} is the authority on when segment is removed from the timeline.
+     *
+     * @param server  The server that removed a segment
+     * @param segment The segment that was removed
+     *
+     * @return continue or unregister
+     */
+    CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment);
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 4e1bb09..b2b2b67 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -20,8 +20,11 @@
 package org.apache.druid.sql.calcite.schema;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
@@ -88,6 +91,8 @@ public class DruidSchema extends AbstractSchema
 
   private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
   private static final int MAX_SEGMENTS_PER_QUERY = 15000;
+  private static final long IS_PUBLISHED = 0;
+  private static final long IS_AVAILABLE = 1;
 
   private final QueryLifecycleFactory queryLifecycleFactory;
   private final PlannerConfig config;
@@ -168,6 +173,16 @@ public class DruidSchema extends AbstractSchema
             removeSegment(segment);
             return ServerView.CallbackAction.CONTINUE;
           }
+
+          @Override
+          public ServerView.CallbackAction serverSegmentRemoved(
+              final DruidServerMetadata server,
+              final DataSegment segment
+          )
+          {
+            removeServerSegment(server, segment);
+            return ServerView.CallbackAction.CONTINUE;
+          }
         }
     );
   }
@@ -338,12 +353,18 @@ public class DruidSchema extends AbstractSchema
       if (knownSegments == null || !knownSegments.containsKey(segment)) {
         // segmentReplicatable is used to determine if segments are served by realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
+
+        final Map<String, Set<String>> serverSegmentMap = ImmutableMap.of(
+            segment.getIdentifier(),
+            ImmutableSet.of(server.getName())
+        );
+
         final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
             segment.getIdentifier(),
-            0,
-            1,
+            IS_PUBLISHED,
+            IS_AVAILABLE,
             isRealtime,
-            1
+            serverSegmentMap
         ).build();
         // Unknown segment.
         setSegmentSignature(segment, holder);
@@ -357,13 +378,18 @@ public class DruidSchema extends AbstractSchema
       } else {
         if (knownSegments.containsKey(segment)) {
           final SegmentMetadataHolder holder = knownSegments.get(segment);
+          final Map<String, Set<String>> segmentServerMap = holder.getReplicas();
+          final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
+              .addAll(segmentServerMap.get(segment.getIdentifier()))
+              .add(server.getName())
+              .build();
           final SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder(
               holder.getSegmentId(),
               holder.isPublished(),
               holder.isAvailable(),
               holder.isRealtime(),
-              holder.getNumReplicas()
-          ).withNumReplicas(holder.getNumReplicas() + 1).build();
+              holder.getReplicas()
+          ).withReplicas(ImmutableMap.of(segment.getIdentifier(), servers)).build();
           knownSegments.put(segment, holderWithNumReplicas);
         }
         if (server.segmentReplicatable()) {
@@ -403,6 +429,28 @@ public class DruidSchema extends AbstractSchema
     }
   }
 
+  private void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
+  {
+    synchronized (lock) {
+      log.debug("Segment[%s] is gone from server[%s]", segment.getIdentifier(), server.getName());
+      final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
+      final SegmentMetadataHolder holder = knownSegments.get(segment);
+      final Map<String, Set<String>> segmentServerMap = holder.getReplicas();
+      final ImmutableSet<String> servers = FluentIterable.from(segmentServerMap.get(segment.getIdentifier()))
+                                                         .filter(Predicates.not(Predicates.equalTo(server.getName())))
+                                                         .toSet();
+      final SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder(
+          holder.getSegmentId(),
+          holder.isPublished(),
+          holder.isAvailable(),
+          holder.isRealtime(),
+          holder.getReplicas()
+      ).withReplicas(ImmutableMap.of(segment.getIdentifier(), servers)).build();
+      knownSegments.put(segment, holderWithNumReplicas);
+      lock.notifyAll();
+    }
+  }
+
   /**
    * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
    * which may be a subset of the asked-for set.
@@ -475,7 +523,7 @@ public class DruidSchema extends AbstractSchema
                 holder.isPublished(),
                 holder.isAvailable(),
                 holder.isRealtime(),
-                holder.getNumReplicas()
+                holder.getReplicas()
             ).withRowSignature(rowSignature).withNumRows(analysis.getNumRows()).build();
             dataSourceSegments.put(segment, updatedHolder);
             setSegmentSignature(segment, updatedHolder);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
index b2666e1..34a5af7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -22,6 +22,8 @@ package org.apache.druid.sql.calcite.schema;
 import org.apache.druid.sql.calcite.table.RowSignature;
 
 import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Immutable representation of RowSignature and other segment attributes needed by {@link SystemSchema.SegmentsTable}
@@ -36,7 +38,8 @@ public class SegmentMetadataHolder
   private final long isAvailable;
   private final long isRealtime;
   private final String segmentId;
-  private final long numReplicas;
+  //segmentId -> set of servers that contain the segment
+  private final Map<String, Set<String>> segmentServerMap;
   private final long numRows;
   @Nullable
   private final RowSignature rowSignature;
@@ -47,7 +50,7 @@ public class SegmentMetadataHolder
     this.isPublished = builder.isPublished;
     this.isAvailable = builder.isAvailable;
     this.isRealtime = builder.isRealtime;
-    this.numReplicas = builder.numReplicas;
+    this.segmentServerMap = builder.segmentServerMap;
     this.numRows = builder.numRows;
     this.segmentId = builder.segmentId;
   }
@@ -72,9 +75,14 @@ public class SegmentMetadataHolder
     return segmentId;
   }
 
-  public long getNumReplicas()
+  public Map<String, Set<String>> getReplicas()
   {
-    return numReplicas;
+    return segmentServerMap;
+  }
+
+  public long getNumReplicas(String segmentId)
+  {
+    return segmentServerMap.get(segmentId).size();
   }
 
   public long getNumRows()
@@ -95,7 +103,7 @@ public class SegmentMetadataHolder
     private final long isAvailable;
     private final long isRealtime;
 
-    private long numReplicas;
+    private Map<String, Set<String>> segmentServerMap;
     @Nullable
     private RowSignature rowSignature;
     private long numRows;
@@ -105,14 +113,14 @@ public class SegmentMetadataHolder
         long isPublished,
         long isAvailable,
         long isRealtime,
-        long numReplicas
+        Map<String, Set<String>> segmentServerMap
     )
     {
       this.segmentId = segmentId;
       this.isPublished = isPublished;
       this.isAvailable = isAvailable;
       this.isRealtime = isRealtime;
-      this.numReplicas = numReplicas;
+      this.segmentServerMap = segmentServerMap;
     }
 
     public Builder withRowSignature(RowSignature rowSignature)
@@ -127,9 +135,9 @@ public class SegmentMetadataHolder
       return this;
     }
 
-    public Builder withNumReplicas(long numReplicas)
+    public Builder withReplicas(Map<String, Set<String>> segmentServerMap)
     {
-      this.numReplicas = numReplicas;
+      this.segmentServerMap = segmentServerMap;
       return this;
     }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 57e8825..7a62a0b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -229,7 +229,7 @@ public class SystemSchema extends AbstractSchema
       final Map<String, PartialSegmentData> partialSegmentDataMap = availableSegmentMetadata.values().stream().collect(
           Collectors.toMap(
               SegmentMetadataHolder::getSegmentId,
-              h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows())
+              h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows())
           ));
 
       //get published segments from coordinator
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 4020a88..840fed8 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -199,7 +199,6 @@ public class SystemSchemaTest extends CalciteTestBase
     walker = new SpecificSegmentsQuerySegmentWalker(conglomerate)
         .add(segment1, index1)
         .add(segment2, index2)
-        .add(segment2, index2)
         .add(segment3, index2);
 
     druidSchema = new DruidSchema(
@@ -282,7 +281,7 @@ public class SystemSchemaTest extends CalciteTestBase
       DataSegment.PruneLoadSpecHolder.DEFAULT
   );
 
-  final List<DataSegment> realtimeSegments = ImmutableList.of(segment4, segment5);
+  final List<DataSegment> realtimeSegments = ImmutableList.of(segment2, segment4, segment5);
 
   private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
       new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
@@ -478,6 +477,7 @@ public class SystemSchemaTest extends CalciteTestBase
     // segments test1, test2  are published and available
     // segment test3 is served by historical but unpublished or unused
     // segments test4, test5 are not published but available (realtime segments)
+    // segment test2 is both published and served by a realtime server.
 
     Assert.assertEquals(8, rows.size());
 
@@ -556,7 +556,7 @@ public class SystemSchemaTest extends CalciteTestBase
     Assert.assertEquals(100L, row4[4]);
     Assert.assertEquals("version2", row4[5]);
     Assert.assertEquals(0L, row4[6]); //partition_num
-    Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2
+    Assert.assertEquals(2L, row4[7]); //segment test2 is served by historical and realtime servers
     Assert.assertEquals(3L, row4[8]); //numRows = 3
     Assert.assertEquals(1L, row4[9]); //is_published
     Assert.assertEquals(1L, row4[10]); //is_available
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 1b52ba9..6718b1b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -51,8 +51,8 @@ public class TestServerInventoryView implements TimelineServerView
       0
   );
   private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata(
-      "dummy",
-      "dummy",
+      "dummy2",
+      "dummy2",
       null,
       0,
       ServerType.REALTIME,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org