Posted to commits@druid.apache.org by gi...@apache.org on 2018/12/20 21:29:06 UTC

[incubator-druid] branch master updated: Set is_available to false by default for published segment (#6757)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e5aad4  Set is_available to false by default for published segment (#6757)
5e5aad4 is described below

commit 5e5aad49e639457f4d82862e92a901f7fa9282f5
Author: Surekha <su...@imply.io>
AuthorDate: Thu Dec 20 13:29:00 2018 -0800

    Set is_available to false by default for published segment (#6757)
    
    * Set is_available to false by default for published segment
    
    * Address comments
    
    Fix the is_published value for segments not in metadata store
    
    * Remove unused import
    
    * Use non-null shardSpec for a segment in test
    
    * Fix checkstyle
    
    * Modify comment
---
 .../druid/sql/calcite/schema/DruidSchema.java      |   4 +-
 .../druid/sql/calcite/schema/SystemSchema.java     |   2 +-
 .../druid/sql/calcite/schema/SystemSchemaTest.java | 184 ++++++++++++++++++---
 .../sql/calcite/util/TestServerInventoryView.java  |  25 ++-
 4 files changed, 189 insertions(+), 26 deletions(-)
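
In sys.segments terms, the intent of this change is: is_published reflects whether the segment is in the metadata store, is_available reflects whether some server is actually serving it, and is_realtime reflects whether that server is a realtime task. A minimal sketch of the new defaults for a freshly discovered segment (illustrative Java, not the actual Druid classes):

    // Illustrative only: all three flags start pessimistic and are raised by positive evidence.
    final class SegmentFlagsSketch
    {
      long isPublished = 0L; // becomes 1 once the segment appears in the metadata store
      long isAvailable = 0L; // becomes 1 once at least one server announces it (this patch's change)
      long isRealtime = 0L;  // becomes 1 when the announcing server is a realtime task
    }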

diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 0e7e973..87d97ef 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -50,7 +50,6 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Escalator;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -339,10 +338,9 @@ public class DruidSchema extends AbstractSchema
       if (knownSegments == null || !knownSegments.containsKey(segment)) {
         // segmentReplicatable is used to determine if segments are served by realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
-        final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 : 0;
         final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
             segment.getIdentifier(),
-            isPublished,
+            0,
             1,
             isRealtime,
             1
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 431f4b1..57e8825 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -250,7 +250,7 @@ public class SystemSchema extends AbstractSchema
             try {
               segmentsAlreadySeen.add(val.getIdentifier());
               final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier());
-              long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 1L;
+              long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
               if (partialSegmentData != null) {
                 numReplicas = partialSegmentData.getNumReplicas();
                 numRows = partialSegmentData.getNumRows();
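
The segmentsAlreadySeen set in the hunk above is what later lets the table report is_published = 0 for segments that only the server view knows about: metadata-store identifiers are recorded on the first pass, and any served segment not in that set is emitted as unpublished but available. A self-contained sketch of that two-pass pattern, assuming simplified inputs (this is not the actual Druid code):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Two-pass flag assignment, sketched: pass one over metadata-store segments
    // (published; availability defaults to 0), pass two over server-view segments
    // not already seen (available but unpublished).
    public final class TwoPassFlagsSketch
    {
      public static void main(String[] args)
      {
        List<String> metadataStoreSegments = Arrays.asList("wikipedia1_2018", "test1_2010");
        List<String> serverViewSegments = Arrays.asList("test1_2010", "test4_2017_realtime");

        Set<String> segmentsAlreadySeen = new HashSet<>();
        for (String id : metadataStoreSegments) {
          segmentsAlreadySeen.add(id);
          long isAvailable = serverViewSegments.contains(id) ? 1L : 0L; // pessimistic default, overridden by the server view
          System.out.println(id + " is_published=1 is_available=" + isAvailable);
        }
        for (String id : serverViewSegments) {
          if (!segmentsAlreadySeen.contains(id)) {
            System.out.println(id + " is_published=0 is_available=1"); // served but not in the metadata store
          }
        }
      }
    }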
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 9dc5a0e..4020a88 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -73,6 +73,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.sql.calcite.util.TestServerInventoryView;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -203,7 +204,7 @@ public class SystemSchemaTest extends CalciteTestBase
 
     druidSchema = new DruidSchema(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
-        new TestServerInventoryView(walker.getSegments()),
+        new TestServerInventoryView(walker.getSegments(), realtimeSegments),
         PLANNER_CONFIG_DEFAULT,
         new NoopViewManager(),
         new NoopEscalator()
@@ -251,7 +252,7 @@ public class SystemSchemaTest extends CalciteTestBase
       null,
       ImmutableList.of("dim1", "dim2"),
       ImmutableList.of("met1", "met2"),
-      null,
+      new NumberedShardSpec(2, 3),
       1,
       100L,
       DataSegment.PruneLoadSpecHolder.DEFAULT
@@ -281,6 +282,8 @@ public class SystemSchemaTest extends CalciteTestBase
       DataSegment.PruneLoadSpecHolder.DEFAULT
   );
 
+  final List<DataSegment> realtimeSegments = ImmutableList.of(segment4, segment5);
+
   private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
       new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
       1L,
@@ -329,11 +332,6 @@ public class SystemSchemaTest extends CalciteTestBase
   @Test
   public void testSegmentsTable() throws Exception
   {
-    // total segments = 6
-    // segments 1,2,3 are published and available
-    // segments 4,5,6  are published but unavailable
-    // segment 3 is published but not served
-    // segment 2 is served by 2 servers, so num_replicas=2
 
     final SystemSchema.SegmentsTable segmentsTable = EasyMock
         .createMockBuilder(SystemSchema.SegmentsTable.class)
@@ -356,9 +354,9 @@ public class SystemSchemaTest extends CalciteTestBase
         .anyTimes();
 
     AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
-    //published but unavailable segments
+    //segments in metadata store: wikipedia1, wikipedia2, wikipedia3, test1, test2
     final String json = "[{\n"
-                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"dataSource\": \"wikipedia1\",\n"
                         + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
                         + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
                         + "\t\"loadSpec\": {\n"
@@ -376,7 +374,7 @@ public class SystemSchemaTest extends CalciteTestBase
                         + "\t\"size\": 47406,\n"
                         + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n"
                         + "}, {\n"
-                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"dataSource\": \"wikipedia2\",\n"
                         + "\t\"interval\": \"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n"
                         + "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n"
                         + "\t\"loadSpec\": {\n"
@@ -394,7 +392,7 @@ public class SystemSchemaTest extends CalciteTestBase
                         + "\t\"size\": 83846,\n"
                         + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n"
                         + "}, {\n"
-                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"dataSource\": \"wikipedia3\",\n"
                         + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
                         + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
                         + "\t\"loadSpec\": {\n"
@@ -411,6 +409,34 @@ public class SystemSchemaTest extends CalciteTestBase
                         + "\t\"binaryVersion\": 9,\n"
                         + "\t\"size\": 53527,\n"
                         + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n"
+                        + "}, {\n"
+                        + "\t\"dataSource\": \"test1\",\n"
+                        + "\t\"interval\": \"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n"
+                        + "\t\"version\": \"version1\",\n"
+                        + "\t\"loadSpec\": null,\n"
+                        + "\t\"dimensions\": \"dim1,dim2\",\n"
+                        + "\t\"metrics\": \"met1,met2\",\n"
+                        + "\t\"shardSpec\": {\n"
+                        + "\t\t\"type\": \"none\",\n"
+                        + "\t\t\"domainDimensions\": []\n"
+                        + "\t},\n"
+                        + "\t\"binaryVersion\": 1,\n"
+                        + "\t\"size\": 100,\n"
+                        + "\t\"identifier\": \"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n"
+                        + "}, {\n"
+                        + "\t\"dataSource\": \"test2\",\n"
+                        + "\t\"interval\": \"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n"
+                        + "\t\"version\": \"version2\",\n"
+                        + "\t\"loadSpec\": null,\n"
+                        + "\t\"dimensions\": \"dim1,dim2\",\n"
+                        + "\t\"metrics\": \"met1,met2\",\n"
+                        + "\t\"shardSpec\": {\n"
+                        + "\t\t\"type\": \"none\",\n"
+                        + "\t\t\"domainDimensions\": []\n"
+                        + "\t},\n"
+                        + "\t\"binaryVersion\": 1,\n"
+                        + "\t\"size\": 100,\n"
+                        + "\t\"identifier\": \"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n"
                         + "}]";
     byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
     in.add(bytesToWrite);
@@ -447,19 +473,137 @@ public class SystemSchemaTest extends CalciteTestBase
 
     final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
 
-    Assert.assertEquals(6, rows.size());
+    // total segments = 8
+    // segments wikipedia1, wikipedia2, and wikipedia3 are published but unavailable
+    // segments test1 and test2 are published and available
+    // segment test3 is served by a historical but is unpublished or unused
+    // segments test4, test5 are not published but available (realtime segments)
+
+    Assert.assertEquals(8, rows.size());
 
     Object[] row0 = rows.get(0);
-    //segment 6 is published and unavailable, num_replicas is 0
-    Assert.assertEquals(1L, row0[9]);
-    Assert.assertEquals(0L, row0[7]);
+    //segment 0 is published and unavailable, num_replicas is 0
+    Assert.assertEquals(
+        "wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
+        row0[0]
+    );
+    Assert.assertEquals("wikipedia1", row0[1]);
+    Assert.assertEquals("2018-08-07T23:00:00.000Z", row0[2]);
+    Assert.assertEquals("2018-08-08T00:00:00.000Z", row0[3]);
+    Assert.assertEquals(47406L, row0[4]);
+    Assert.assertEquals("2018-08-07T23:00:00.059Z", row0[5]);
+    Assert.assertEquals(0L, row0[6]); //partition_num
+    Assert.assertEquals(0L, row0[7]); //num_replicas
     Assert.assertEquals(0L, row0[8]); //numRows = 0
+    Assert.assertEquals(1L, row0[9]); //is_published
+    Assert.assertEquals(0L, row0[10]); //is_available
+    Assert.assertEquals(0L, row0[11]); //is_realtime
+
+    Object[] row1 = rows.get(1);
+    Assert.assertEquals(
+        "wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z",
+        row1[0]
+    );
+    Assert.assertEquals("wikipedia2", row1[1]);
+    Assert.assertEquals("2018-08-07T18:00:00.000Z", row1[2]);
+    Assert.assertEquals("2018-08-07T19:00:00.000Z", row1[3]);
+    Assert.assertEquals(83846L, row1[4]);
+    Assert.assertEquals("2018-08-07T18:00:00.117Z", row1[5]);
+    Assert.assertEquals(0L, row1[6]); //partition_num
+    Assert.assertEquals(0L, row1[7]); //num_replicas
+    Assert.assertEquals(0L, row1[8]); //numRows = 0
+    Assert.assertEquals(1L, row1[9]); //is_published
+    Assert.assertEquals(0L, row1[10]); //is_available
+    Assert.assertEquals(0L, row1[11]); //is_realtime
+
+
+    Object[] row2 = rows.get(2);
+    Assert.assertEquals(
+        "wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
+        row2[0]
+    );
+    Assert.assertEquals("wikipedia3", row2[1]);
+    Assert.assertEquals("2018-08-07T23:00:00.000Z", row2[2]);
+    Assert.assertEquals("2018-08-08T00:00:00.000Z", row2[3]);
+    Assert.assertEquals(53527L, row2[4]);
+    Assert.assertEquals("2018-08-07T23:00:00.059Z", row2[5]);
+    Assert.assertEquals(0L, row2[6]); //partition_num
+    Assert.assertEquals(0L, row2[7]); //num_replicas
+    Assert.assertEquals(0L, row2[8]); //numRows = 0
+    Assert.assertEquals(1L, row2[9]); //is_published
+    Assert.assertEquals(0L, row2[10]); //is_available
+    Assert.assertEquals(0L, row2[11]); //is_realtime
+
+    Object[] row3 = rows.get(3);
+    Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row3[0]);
+    Assert.assertEquals("test1", row3[1]);
+    Assert.assertEquals("2010-01-01T00:00:00.000Z", row3[2]);
+    Assert.assertEquals("2011-01-01T00:00:00.000Z", row3[3]);
+    Assert.assertEquals(100L, row3[4]);
+    Assert.assertEquals("version1", row3[5]);
+    Assert.assertEquals(0L, row3[6]); //partition_num
+    Assert.assertEquals(1L, row3[7]); //num_replicas
+    Assert.assertEquals(3L, row3[8]); //numRows = 3
+    Assert.assertEquals(1L, row3[9]); //is_published
+    Assert.assertEquals(1L, row3[10]); //is_available
+    Assert.assertEquals(0L, row3[11]); //is_realtime
 
     Object[] row4 = rows.get(4);
-    //segment 2 is published and has 2 replicas
-    Assert.assertEquals(1L, row4[9]);
-    Assert.assertEquals(2L, row4[7]);
-    Assert.assertEquals(3L, row4[8]);  //numRows = 3
+    Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row4[0]);
+    Assert.assertEquals("test2", row4[1]);
+    Assert.assertEquals("2011-01-01T00:00:00.000Z", row4[2]);
+    Assert.assertEquals("2012-01-01T00:00:00.000Z", row4[3]);
+    Assert.assertEquals(100L, row4[4]);
+    Assert.assertEquals("version2", row4[5]);
+    Assert.assertEquals(0L, row4[6]); //partition_num
+    Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2
+    Assert.assertEquals(3L, row4[8]); //numRows = 3
+    Assert.assertEquals(1L, row4[9]); //is_published
+    Assert.assertEquals(1L, row4[10]); //is_available
+    Assert.assertEquals(0L, row4[11]); //is_realtime
+
+    Object[] row5 = rows.get(5);
+    //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
+    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row5[0]);
+    Assert.assertEquals("test3", row5[1]);
+    Assert.assertEquals("2012-01-01T00:00:00.000Z", row5[2]);
+    Assert.assertEquals("2013-01-01T00:00:00.000Z", row5[3]);
+    Assert.assertEquals(100L, row5[4]);
+    Assert.assertEquals("version3", row5[5]);
+    Assert.assertEquals(2L, row5[6]); //partition_num = 2
+    Assert.assertEquals(1L, row5[7]); //num_replicas
+    Assert.assertEquals(3L, row5[8]); //numRows = 3
+    Assert.assertEquals(0L, row5[9]); //is_published
+    Assert.assertEquals(1L, row5[10]); //is_available
+    Assert.assertEquals(0L, row5[11]); //is_realtime
+
+    Object[] row6 = rows.get(6);
+    Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row6[0]);
+    Assert.assertEquals("test5", row6[1]);
+    Assert.assertEquals("2017-01-01T00:00:00.000Z", row6[2]);
+    Assert.assertEquals("2018-01-01T00:00:00.000Z", row6[3]);
+    Assert.assertEquals(100L, row6[4]);
+    Assert.assertEquals("version5", row6[5]);
+    Assert.assertEquals(0L, row6[6]); //partition_num
+    Assert.assertEquals(1L, row6[7]); //num_replicas
+    Assert.assertEquals(0L, row6[8]); //numRows = 0
+    Assert.assertEquals(0L, row6[9]); //is_published
+    Assert.assertEquals(1L, row6[10]); //is_available
+    Assert.assertEquals(1L, row6[11]); //is_realtime
+
+    Object[] row7 = rows.get(7);
+    Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row7[0]);
+    Assert.assertEquals("test4", row7[1]);
+    Assert.assertEquals("2017-01-01T00:00:00.000Z", row7[2]);
+    Assert.assertEquals("2018-01-01T00:00:00.000Z", row7[3]);
+    Assert.assertEquals(100L, row7[4]);
+    Assert.assertEquals("version4", row7[5]);
+    Assert.assertEquals(0L, row7[6]); //partition_num
+    Assert.assertEquals(1L, row7[7]); //num_replicas
+    Assert.assertEquals(0L, row7[8]); //numRows
+    Assert.assertEquals(0L, row7[9]); //is_published
+    Assert.assertEquals(1L, row7[10]); //is_available
+    Assert.assertEquals(1L, row7[11]); //is_realtime
 
     // Verify value types.
     verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
@@ -576,7 +720,7 @@ public class SystemSchemaTest extends CalciteTestBase
 
     Object[] row2 = rows.get(2);
     Assert.assertEquals("server2:1234", row2[0]);
-    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]);
+    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row2[1]);
 
     Object[] row3 = rows.get(3);
     Assert.assertEquals("server2:1234", row3[0]);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 363d595..1b52ba9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -32,6 +32,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -49,13 +50,29 @@ public class TestServerInventoryView implements TimelineServerView
       "dummy",
       0
   );
+  private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata(
+      "dummy",
+      "dummy",
+      null,
+      0,
+      ServerType.REALTIME,
+      "dummy",
+      0
+  );
   private final List<DataSegment> segments;
+  private List<DataSegment> realtimeSegments = new ArrayList<>();
 
   public TestServerInventoryView(List<DataSegment> segments)
   {
     this.segments = ImmutableList.copyOf(segments);
   }
 
+  public TestServerInventoryView(List<DataSegment> segments, List<DataSegment> realtimeSegments)
+  {
+    this.segments = ImmutableList.copyOf(segments);
+    this.realtimeSegments = ImmutableList.copyOf(realtimeSegments);
+  }
+
   @Override
   public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
   {
@@ -75,7 +92,9 @@ public class TestServerInventoryView implements TimelineServerView
     for (final DataSegment segment : segments) {
       exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
     }
-
+    for (final DataSegment segment : realtimeSegments) {
+      exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
+    }
     exec.execute(callback::segmentViewInitialized);
   }
 
@@ -85,7 +104,9 @@ public class TestServerInventoryView implements TimelineServerView
     for (DataSegment segment : segments) {
       exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
     }
-
+    for (final DataSegment segment : realtimeSegments) {
+      exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
+    }
     exec.execute(callback::timelineInitialized);
   }
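
With the two-argument constructor above, a test can stand up a broker schema that sees one historical-style dummy server and one realtime dummy server. The usage earlier in this patch (SystemSchemaTest) boils down to the following, where walker and realtimeSegments are the test fixtures:

    // Segments from the walker are announced by DUMMY_SERVER (historical type),
    // while the realtime fixtures are announced by DUMMY_SERVER_REALTIME.
    TimelineServerView serverView =
        new TestServerInventoryView(walker.getSegments(), realtimeSegments);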
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org