You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/12/20 21:29:01 UTC

[GitHub] gianm closed pull request #6757: Set is_available to false by default for published segment

gianm closed pull request #6757: Set is_available to false by default for published segment
URL: https://github.com/apache/incubator-druid/pull/6757
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub will not otherwise display it after the merge):

diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 0e7e9734646..87d97ef29e9 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -50,7 +50,6 @@
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Escalator;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -339,10 +338,9 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm
       if (knownSegments == null || !knownSegments.containsKey(segment)) {
         // segmentReplicatable is used to determine if segments are served by realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
-        final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 : 0;
         final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
             segment.getIdentifier(),
-            isPublished,
+            0,
             1,
             isRealtime,
             1
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 431f4b1730e..57e8825af46 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -250,7 +250,7 @@ public TableType getJdbcTableType()
             try {
               segmentsAlreadySeen.add(val.getIdentifier());
               final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier());
-              long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 1L;
+              long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
               if (partialSegmentData != null) {
                 numReplicas = partialSegmentData.getNumReplicas();
                 numRows = partialSegmentData.getNumRows();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 9dc5a0e55f3..4020a8897b3 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -73,6 +73,7 @@
 import org.apache.druid.sql.calcite.util.TestServerInventoryView;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -203,7 +204,7 @@ public Authorizer getAuthorizer(String name)
 
     druidSchema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
-        new TestServerInventoryView(walker.getSegments()),
+        new TestServerInventoryView(walker.getSegments(), realtimeSegments),
         PLANNER_CONFIG_DEFAULT,
         new NoopViewManager(),
         new NoopEscalator()
@@ -251,7 +252,7 @@ public Authorizer getAuthorizer(String name)
       null,
       ImmutableList.of("dim1", "dim2"),
       ImmutableList.of("met1", "met2"),
-      null,
+      new NumberedShardSpec(2, 3),
       1,
       100L,
       DataSegment.PruneLoadSpecHolder.DEFAULT
@@ -281,6 +282,8 @@ public Authorizer getAuthorizer(String name)
       DataSegment.PruneLoadSpecHolder.DEFAULT
   );
 
+  final List<DataSegment> realtimeSegments = ImmutableList.of(segment4, segment5);
+
   private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
       new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
       1L,
@@ -329,11 +332,6 @@ public void testGetTableMap()
   @Test
   public void testSegmentsTable() throws Exception
   {
-    // total segments = 6
-    // segments 1,2,3 are published and available
-    // segments 4,5,6  are published but unavailable
-    // segment 3 is published but not served
-    // segment 2 is served by 2 servers, so num_replicas=2
 
     final SystemSchema.SegmentsTable segmentsTable = EasyMock
         .createMockBuilder(SystemSchema.SegmentsTable.class)
@@ -356,9 +354,9 @@ public void testSegmentsTable() throws Exception
         .anyTimes();
 
     AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
-    //published but unavailable segments
+    //segments in metadata store : wikipedia1, wikipedia2, wikipedia3, test1, test2
     final String json = "[{\n"
-                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"dataSource\": \"wikipedia1\",\n"
                         + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
                         + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
                         + "\t\"loadSpec\": {\n"
@@ -376,7 +374,7 @@ public void testSegmentsTable() throws Exception
                         + "\t\"size\": 47406,\n"
                         + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n"
                         + "}, {\n"
-                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"dataSource\": \"wikipedia2\",\n"
                         + "\t\"interval\": \"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n"
                         + "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n"
                         + "\t\"loadSpec\": {\n"
@@ -394,7 +392,7 @@ public void testSegmentsTable() throws Exception
                         + "\t\"size\": 83846,\n"
                         + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n"
                         + "}, {\n"
-                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"dataSource\": \"wikipedia3\",\n"
                         + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
                         + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
                         + "\t\"loadSpec\": {\n"
@@ -411,6 +409,34 @@ public void testSegmentsTable() throws Exception
                         + "\t\"binaryVersion\": 9,\n"
                         + "\t\"size\": 53527,\n"
                         + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n"
+                        + "}, {\n"
+                        + "\t\"dataSource\": \"test1\",\n"
+                        + "\t\"interval\": \"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n"
+                        + "\t\"version\": \"version1\",\n"
+                        + "\t\"loadSpec\": null,\n"
+                        + "\t\"dimensions\": \"dim1,dim2\",\n"
+                        + "\t\"metrics\": \"met1,met2\",\n"
+                        + "\t\"shardSpec\": {\n"
+                        + "\t\t\"type\": \"none\",\n"
+                        + "\t\t\"domainDimensions\": []\n"
+                        + "\t},\n"
+                        + "\t\"binaryVersion\": 1,\n"
+                        + "\t\"size\": 100,\n"
+                        + "\t\"identifier\": \"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n"
+                        + "}, {\n"
+                        + "\t\"dataSource\": \"test2\",\n"
+                        + "\t\"interval\": \"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n"
+                        + "\t\"version\": \"version2\",\n"
+                        + "\t\"loadSpec\": null,\n"
+                        + "\t\"dimensions\": \"dim1,dim2\",\n"
+                        + "\t\"metrics\": \"met1,met2\",\n"
+                        + "\t\"shardSpec\": {\n"
+                        + "\t\t\"type\": \"none\",\n"
+                        + "\t\t\"domainDimensions\": []\n"
+                        + "\t},\n"
+                        + "\t\"binaryVersion\": 1,\n"
+                        + "\t\"size\": 100,\n"
+                        + "\t\"identifier\": \"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n"
                         + "}]";
     byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
     in.add(bytesToWrite);
@@ -447,19 +473,137 @@ public Object get(String name)
 
     final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
 
-    Assert.assertEquals(6, rows.size());
+    // total segments = 8
+    // segment wikipedia1, wikipedia2, wikipedia3 are published but unavailable
+    // segments test1, test2  are published and available
+    // segment test3 is served by historical but unpublished or unused
+    // segments test4, test5 are not published but available (realtime segments)
+
+    Assert.assertEquals(8, rows.size());
 
     Object[] row0 = rows.get(0);
-    //segment 6 is published and unavailable, num_replicas is 0
-    Assert.assertEquals(1L, row0[9]);
-    Assert.assertEquals(0L, row0[7]);
+    //segment 0 is published and unavailable, num_replicas is 0
+    Assert.assertEquals(
+        "wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
+        row0[0]
+    );
+    Assert.assertEquals("wikipedia1", row0[1]);
+    Assert.assertEquals("2018-08-07T23:00:00.000Z", row0[2]);
+    Assert.assertEquals("2018-08-08T00:00:00.000Z", row0[3]);
+    Assert.assertEquals(47406L, row0[4]);
+    Assert.assertEquals("2018-08-07T23:00:00.059Z", row0[5]);
+    Assert.assertEquals(0L, row0[6]); //partition_num
+    Assert.assertEquals(0L, row0[7]); //num_replicas
     Assert.assertEquals(0L, row0[8]); //numRows = 0
+    Assert.assertEquals(1L, row0[9]); //is_published
+    Assert.assertEquals(0L, row0[10]); //is_available
+    Assert.assertEquals(0L, row0[11]); //is_realtime
+
+    Object[] row1 = rows.get(1);
+    Assert.assertEquals(
+        "wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z",
+        row1[0]
+    );
+    Assert.assertEquals("wikipedia2", row1[1]);
+    Assert.assertEquals("2018-08-07T18:00:00.000Z", row1[2]);
+    Assert.assertEquals("2018-08-07T19:00:00.000Z", row1[3]);
+    Assert.assertEquals(83846L, row1[4]);
+    Assert.assertEquals("2018-08-07T18:00:00.117Z", row1[5]);
+    Assert.assertEquals(0L, row1[6]); //partition_num
+    Assert.assertEquals(0L, row1[7]); //num_replicas
+    Assert.assertEquals(0L, row1[8]); //numRows = 0
+    Assert.assertEquals(1L, row1[9]); //is_published
+    Assert.assertEquals(0L, row1[10]); //is_available
+    Assert.assertEquals(0L, row1[11]); //is_realtime
+
+
+    Object[] row2 = rows.get(2);
+    Assert.assertEquals(
+        "wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
+        row2[0]
+    );
+    Assert.assertEquals("wikipedia3", row2[1]);
+    Assert.assertEquals("2018-08-07T23:00:00.000Z", row2[2]);
+    Assert.assertEquals("2018-08-08T00:00:00.000Z", row2[3]);
+    Assert.assertEquals(53527L, row2[4]);
+    Assert.assertEquals("2018-08-07T23:00:00.059Z", row2[5]);
+    Assert.assertEquals(0L, row2[6]); //partition_num
+    Assert.assertEquals(0L, row2[7]); //num_replicas
+    Assert.assertEquals(0L, row2[8]); //numRows = 0
+    Assert.assertEquals(1L, row2[9]); //is_published
+    Assert.assertEquals(0L, row2[10]); //is_available
+    Assert.assertEquals(0L, row2[11]); //is_realtime
+
+    Object[] row3 = rows.get(3);
+    Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row3[0]);
+    Assert.assertEquals("test1", row3[1]);
+    Assert.assertEquals("2010-01-01T00:00:00.000Z", row3[2]);
+    Assert.assertEquals("2011-01-01T00:00:00.000Z", row3[3]);
+    Assert.assertEquals(100L, row3[4]);
+    Assert.assertEquals("version1", row3[5]);
+    Assert.assertEquals(0L, row3[6]); //partition_num
+    Assert.assertEquals(1L, row3[7]); //num_replicas
+    Assert.assertEquals(3L, row3[8]); //numRows = 3
+    Assert.assertEquals(1L, row3[9]); //is_published
+    Assert.assertEquals(1L, row3[10]); //is_available
+    Assert.assertEquals(0L, row3[11]); //is_realtime
 
     Object[] row4 = rows.get(4);
-    //segment 2 is published and has 2 replicas
-    Assert.assertEquals(1L, row4[9]);
-    Assert.assertEquals(2L, row4[7]);
-    Assert.assertEquals(3L, row4[8]);  //numRows = 3
+    Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row4[0]);
+    Assert.assertEquals("test2", row4[1]);
+    Assert.assertEquals("2011-01-01T00:00:00.000Z", row4[2]);
+    Assert.assertEquals("2012-01-01T00:00:00.000Z", row4[3]);
+    Assert.assertEquals(100L, row4[4]);
+    Assert.assertEquals("version2", row4[5]);
+    Assert.assertEquals(0L, row4[6]); //partition_num
+    Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2
+    Assert.assertEquals(3L, row4[8]); //numRows = 3
+    Assert.assertEquals(1L, row4[9]); //is_published
+    Assert.assertEquals(1L, row4[10]); //is_available
+    Assert.assertEquals(0L, row4[11]); //is_realtime
+
+    Object[] row5 = rows.get(5);
+    //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
+    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row5[0]);
+    Assert.assertEquals("test3", row5[1]);
+    Assert.assertEquals("2012-01-01T00:00:00.000Z", row5[2]);
+    Assert.assertEquals("2013-01-01T00:00:00.000Z", row5[3]);
+    Assert.assertEquals(100L, row5[4]);
+    Assert.assertEquals("version3", row5[5]);
+    Assert.assertEquals(2L, row5[6]); //partition_num = 2
+    Assert.assertEquals(1L, row5[7]); //num_replicas
+    Assert.assertEquals(3L, row5[8]); //numRows = 3
+    Assert.assertEquals(0L, row5[9]); //is_published
+    Assert.assertEquals(1L, row5[10]); //is_available
+    Assert.assertEquals(0L, row5[11]); //is_realtime
+
+    Object[] row6 = rows.get(6);
+    Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row6[0]);
+    Assert.assertEquals("test5", row6[1]);
+    Assert.assertEquals("2017-01-01T00:00:00.000Z", row6[2]);
+    Assert.assertEquals("2018-01-01T00:00:00.000Z", row6[3]);
+    Assert.assertEquals(100L, row6[4]);
+    Assert.assertEquals("version5", row6[5]);
+    Assert.assertEquals(0L, row6[6]); //partition_num
+    Assert.assertEquals(1L, row6[7]); //num_replicas
+    Assert.assertEquals(0L, row6[8]); //numRows = 0
+    Assert.assertEquals(0L, row6[9]); //is_published
+    Assert.assertEquals(1L, row6[10]); //is_available
+    Assert.assertEquals(1L, row6[11]); //is_realtime
+
+    Object[] row7 = rows.get(7);
+    Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row7[0]);
+    Assert.assertEquals("test4", row7[1]);
+    Assert.assertEquals("2017-01-01T00:00:00.000Z", row7[2]);
+    Assert.assertEquals("2018-01-01T00:00:00.000Z", row7[3]);
+    Assert.assertEquals(100L, row7[4]);
+    Assert.assertEquals("version4", row7[5]);
+    Assert.assertEquals(0L, row7[6]); //partition_num
+    Assert.assertEquals(1L, row7[7]); //num_replicas
+    Assert.assertEquals(0L, row7[8]); //numRows
+    Assert.assertEquals(0L, row7[9]); //is_published
+    Assert.assertEquals(1L, row7[10]); //is_available
+    Assert.assertEquals(1L, row7[11]); //is_realtime
 
     // Verify value types.
     verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
@@ -576,7 +720,7 @@ public Object get(String name)
 
     Object[] row2 = rows.get(2);
     Assert.assertEquals("server2:1234", row2[0]);
-    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]);
+    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row2[1]);
 
     Object[] row3 = rows.get(3);
     Assert.assertEquals("server2:1234", row3[0]);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 363d595f38d..1b52ba982fa 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -32,6 +32,7 @@
 import org.apache.druid.timeline.TimelineLookup;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -49,13 +50,29 @@
       "dummy",
       0
   );
+  private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata(
+      "dummy",
+      "dummy",
+      null,
+      0,
+      ServerType.REALTIME,
+      "dummy",
+      0
+  );
   private final List<DataSegment> segments;
+  private List<DataSegment> realtimeSegments = new ArrayList<>();
 
   public TestServerInventoryView(List<DataSegment> segments)
   {
     this.segments = ImmutableList.copyOf(segments);
   }
 
+  public TestServerInventoryView(List<DataSegment> segments, List<DataSegment> realtimeSegments)
+  {
+    this.segments = ImmutableList.copyOf(segments);
+    this.realtimeSegments = ImmutableList.copyOf(realtimeSegments);
+  }
+
   @Override
   public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
   {
@@ -75,7 +92,9 @@ public void registerSegmentCallback(Executor exec, final SegmentCallback callbac
     for (final DataSegment segment : segments) {
       exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
     }
-
+    for (final DataSegment segment : realtimeSegments) {
+      exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
+    }
     exec.execute(callback::segmentViewInitialized);
   }
 
@@ -85,7 +104,9 @@ public void registerTimelineCallback(final Executor exec, final TimelineCallback
     for (DataSegment segment : segments) {
       exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
     }
-
+    for (final DataSegment segment : realtimeSegments) {
+      exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
+    }
     exec.execute(callback::timelineInitialized);
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org