You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/12/20 21:29:01 UTC
[GitHub] gianm closed pull request #6757: Set is_available to false by
default for published segment
gianm closed pull request #6757: Set is_available to false by default for published segment
URL: https://github.com/apache/incubator-druid/pull/6757
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise be shown after the merge):
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 0e7e9734646..87d97ef29e9 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -50,7 +50,6 @@
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -339,10 +338,9 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm
if (knownSegments == null || !knownSegments.containsKey(segment)) {
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
- final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 : 0;
final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
segment.getIdentifier(),
- isPublished,
+ 0,
1,
isRealtime,
1
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 431f4b1730e..57e8825af46 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -250,7 +250,7 @@ public TableType getJdbcTableType()
try {
segmentsAlreadySeen.add(val.getIdentifier());
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier());
- long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 1L;
+ long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
if (partialSegmentData != null) {
numReplicas = partialSegmentData.getNumReplicas();
numRows = partialSegmentData.getNumRows();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 9dc5a0e55f3..4020a8897b3 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -73,6 +73,7 @@
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -203,7 +204,7 @@ public Authorizer getAuthorizer(String name)
druidSchema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
- new TestServerInventoryView(walker.getSegments()),
+ new TestServerInventoryView(walker.getSegments(), realtimeSegments),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
@@ -251,7 +252,7 @@ public Authorizer getAuthorizer(String name)
null,
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
- null,
+ new NumberedShardSpec(2, 3),
1,
100L,
DataSegment.PruneLoadSpecHolder.DEFAULT
@@ -281,6 +282,8 @@ public Authorizer getAuthorizer(String name)
DataSegment.PruneLoadSpecHolder.DEFAULT
);
+ final List<DataSegment> realtimeSegments = ImmutableList.of(segment4, segment5);
+
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
1L,
@@ -329,11 +332,6 @@ public void testGetTableMap()
@Test
public void testSegmentsTable() throws Exception
{
- // total segments = 6
- // segments 1,2,3 are published and available
- // segments 4,5,6 are published but unavailable
- // segment 3 is published but not served
- // segment 2 is served by 2 servers, so num_replicas=2
final SystemSchema.SegmentsTable segmentsTable = EasyMock
.createMockBuilder(SystemSchema.SegmentsTable.class)
@@ -356,9 +354,9 @@ public void testSegmentsTable() throws Exception
.anyTimes();
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
- //published but unavailable segments
+ //segments in metadata store : wikipedia1, wikipedia2, wikipedia3, test1, test2
final String json = "[{\n"
- + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ + "\t\"dataSource\": \"wikipedia1\",\n"
+ "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
+ "\t\"loadSpec\": {\n"
@@ -376,7 +374,7 @@ public void testSegmentsTable() throws Exception
+ "\t\"size\": 47406,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n"
+ "}, {\n"
- + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ + "\t\"dataSource\": \"wikipedia2\",\n"
+ "\t\"interval\": \"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n"
+ "\t\"loadSpec\": {\n"
@@ -394,7 +392,7 @@ public void testSegmentsTable() throws Exception
+ "\t\"size\": 83846,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n"
+ "}, {\n"
- + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ + "\t\"dataSource\": \"wikipedia3\",\n"
+ "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
+ "\t\"loadSpec\": {\n"
@@ -411,6 +409,34 @@ public void testSegmentsTable() throws Exception
+ "\t\"binaryVersion\": 9,\n"
+ "\t\"size\": 53527,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n"
+ + "}, {\n"
+ + "\t\"dataSource\": \"test1\",\n"
+ + "\t\"interval\": \"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n"
+ + "\t\"version\": \"version1\",\n"
+ + "\t\"loadSpec\": null,\n"
+ + "\t\"dimensions\": \"dim1,dim2\",\n"
+ + "\t\"metrics\": \"met1,met2\",\n"
+ + "\t\"shardSpec\": {\n"
+ + "\t\t\"type\": \"none\",\n"
+ + "\t\t\"domainDimensions\": []\n"
+ + "\t},\n"
+ + "\t\"binaryVersion\": 1,\n"
+ + "\t\"size\": 100,\n"
+ + "\t\"identifier\": \"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n"
+ + "}, {\n"
+ + "\t\"dataSource\": \"test2\",\n"
+ + "\t\"interval\": \"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n"
+ + "\t\"version\": \"version2\",\n"
+ + "\t\"loadSpec\": null,\n"
+ + "\t\"dimensions\": \"dim1,dim2\",\n"
+ + "\t\"metrics\": \"met1,met2\",\n"
+ + "\t\"shardSpec\": {\n"
+ + "\t\t\"type\": \"none\",\n"
+ + "\t\t\"domainDimensions\": []\n"
+ + "\t},\n"
+ + "\t\"binaryVersion\": 1,\n"
+ + "\t\"size\": 100,\n"
+ + "\t\"identifier\": \"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n"
+ "}]";
byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
in.add(bytesToWrite);
@@ -447,19 +473,137 @@ public Object get(String name)
final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
- Assert.assertEquals(6, rows.size());
+ // total segments = 8
+ // segment wikipedia1, wikipedia2, wikipedia3 are published but unavailable
+ // segments test1, test2 are published and available
+ // segment test3 is served by historical but unpublished or unused
+ // segments test4, test5 are not published but available (realtime segments)
+
+ Assert.assertEquals(8, rows.size());
Object[] row0 = rows.get(0);
- //segment 6 is published and unavailable, num_replicas is 0
- Assert.assertEquals(1L, row0[9]);
- Assert.assertEquals(0L, row0[7]);
+ //segment 0 is published and unavailable, num_replicas is 0
+ Assert.assertEquals(
+ "wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
+ row0[0]
+ );
+ Assert.assertEquals("wikipedia1", row0[1]);
+ Assert.assertEquals("2018-08-07T23:00:00.000Z", row0[2]);
+ Assert.assertEquals("2018-08-08T00:00:00.000Z", row0[3]);
+ Assert.assertEquals(47406L, row0[4]);
+ Assert.assertEquals("2018-08-07T23:00:00.059Z", row0[5]);
+ Assert.assertEquals(0L, row0[6]); //partition_num
+ Assert.assertEquals(0L, row0[7]); //num_replicas
Assert.assertEquals(0L, row0[8]); //numRows = 0
+ Assert.assertEquals(1L, row0[9]); //is_published
+ Assert.assertEquals(0L, row0[10]); //is_available
+ Assert.assertEquals(0L, row0[11]); //is_realtime
+
+ Object[] row1 = rows.get(1);
+ Assert.assertEquals(
+ "wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z",
+ row1[0]
+ );
+ Assert.assertEquals("wikipedia2", row1[1]);
+ Assert.assertEquals("2018-08-07T18:00:00.000Z", row1[2]);
+ Assert.assertEquals("2018-08-07T19:00:00.000Z", row1[3]);
+ Assert.assertEquals(83846L, row1[4]);
+ Assert.assertEquals("2018-08-07T18:00:00.117Z", row1[5]);
+ Assert.assertEquals(0L, row1[6]); //partition_num
+ Assert.assertEquals(0L, row1[7]); //num_replicas
+ Assert.assertEquals(0L, row1[8]); //numRows = 0
+ Assert.assertEquals(1L, row1[9]); //is_published
+ Assert.assertEquals(0L, row1[10]); //is_available
+ Assert.assertEquals(0L, row1[11]); //is_realtime
+
+
+ Object[] row2 = rows.get(2);
+ Assert.assertEquals(
+ "wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
+ row2[0]
+ );
+ Assert.assertEquals("wikipedia3", row2[1]);
+ Assert.assertEquals("2018-08-07T23:00:00.000Z", row2[2]);
+ Assert.assertEquals("2018-08-08T00:00:00.000Z", row2[3]);
+ Assert.assertEquals(53527L, row2[4]);
+ Assert.assertEquals("2018-08-07T23:00:00.059Z", row2[5]);
+ Assert.assertEquals(0L, row2[6]); //partition_num
+ Assert.assertEquals(0L, row2[7]); //num_replicas
+ Assert.assertEquals(0L, row2[8]); //numRows = 0
+ Assert.assertEquals(1L, row2[9]); //is_published
+ Assert.assertEquals(0L, row2[10]); //is_available
+ Assert.assertEquals(0L, row2[11]); //is_realtime
+
+ Object[] row3 = rows.get(3);
+ Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row3[0]);
+ Assert.assertEquals("test1", row3[1]);
+ Assert.assertEquals("2010-01-01T00:00:00.000Z", row3[2]);
+ Assert.assertEquals("2011-01-01T00:00:00.000Z", row3[3]);
+ Assert.assertEquals(100L, row3[4]);
+ Assert.assertEquals("version1", row3[5]);
+ Assert.assertEquals(0L, row3[6]); //partition_num
+ Assert.assertEquals(1L, row3[7]); //num_replicas
+ Assert.assertEquals(3L, row3[8]); //numRows = 3
+ Assert.assertEquals(1L, row3[9]); //is_published
+ Assert.assertEquals(1L, row3[10]); //is_available
+ Assert.assertEquals(0L, row3[11]); //is_realtime
Object[] row4 = rows.get(4);
- //segment 2 is published and has 2 replicas
- Assert.assertEquals(1L, row4[9]);
- Assert.assertEquals(2L, row4[7]);
- Assert.assertEquals(3L, row4[8]); //numRows = 3
+ Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row4[0]);
+ Assert.assertEquals("test2", row4[1]);
+ Assert.assertEquals("2011-01-01T00:00:00.000Z", row4[2]);
+ Assert.assertEquals("2012-01-01T00:00:00.000Z", row4[3]);
+ Assert.assertEquals(100L, row4[4]);
+ Assert.assertEquals("version2", row4[5]);
+ Assert.assertEquals(0L, row4[6]); //partition_num
+ Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2
+ Assert.assertEquals(3L, row4[8]); //numRows = 3
+ Assert.assertEquals(1L, row4[9]); //is_published
+ Assert.assertEquals(1L, row4[10]); //is_available
+ Assert.assertEquals(0L, row4[11]); //is_realtime
+
+ Object[] row5 = rows.get(5);
+ //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
+ Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row5[0]);
+ Assert.assertEquals("test3", row5[1]);
+ Assert.assertEquals("2012-01-01T00:00:00.000Z", row5[2]);
+ Assert.assertEquals("2013-01-01T00:00:00.000Z", row5[3]);
+ Assert.assertEquals(100L, row5[4]);
+ Assert.assertEquals("version3", row5[5]);
+ Assert.assertEquals(2L, row5[6]); //partition_num = 2
+ Assert.assertEquals(1L, row5[7]); //num_replicas
+ Assert.assertEquals(3L, row5[8]); //numRows = 3
+ Assert.assertEquals(0L, row5[9]); //is_published
+ Assert.assertEquals(1L, row5[10]); //is_available
+ Assert.assertEquals(0L, row5[11]); //is_realtime
+
+ Object[] row6 = rows.get(6);
+ Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row6[0]);
+ Assert.assertEquals("test5", row6[1]);
+ Assert.assertEquals("2017-01-01T00:00:00.000Z", row6[2]);
+ Assert.assertEquals("2018-01-01T00:00:00.000Z", row6[3]);
+ Assert.assertEquals(100L, row6[4]);
+ Assert.assertEquals("version5", row6[5]);
+ Assert.assertEquals(0L, row6[6]); //partition_num
+ Assert.assertEquals(1L, row6[7]); //num_replicas
+ Assert.assertEquals(0L, row6[8]); //numRows = 0
+ Assert.assertEquals(0L, row6[9]); //is_published
+ Assert.assertEquals(1L, row6[10]); //is_available
+ Assert.assertEquals(1L, row6[11]); //is_realtime
+
+ Object[] row7 = rows.get(7);
+ Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row7[0]);
+ Assert.assertEquals("test4", row7[1]);
+ Assert.assertEquals("2017-01-01T00:00:00.000Z", row7[2]);
+ Assert.assertEquals("2018-01-01T00:00:00.000Z", row7[3]);
+ Assert.assertEquals(100L, row7[4]);
+ Assert.assertEquals("version4", row7[5]);
+ Assert.assertEquals(0L, row7[6]); //partition_num
+ Assert.assertEquals(1L, row7[7]); //num_replicas
+ Assert.assertEquals(0L, row7[8]); //numRows
+ Assert.assertEquals(0L, row7[9]); //is_published
+ Assert.assertEquals(1L, row7[10]); //is_available
+ Assert.assertEquals(1L, row7[11]); //is_realtime
// Verify value types.
verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
@@ -576,7 +720,7 @@ public Object get(String name)
Object[] row2 = rows.get(2);
Assert.assertEquals("server2:1234", row2[0]);
- Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]);
+ Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row2[1]);
Object[] row3 = rows.get(3);
Assert.assertEquals("server2:1234", row3[0]);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 363d595f38d..1b52ba982fa 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -32,6 +32,7 @@
import org.apache.druid.timeline.TimelineLookup;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
@@ -49,13 +50,29 @@
"dummy",
0
);
+ private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata(
+ "dummy",
+ "dummy",
+ null,
+ 0,
+ ServerType.REALTIME,
+ "dummy",
+ 0
+ );
private final List<DataSegment> segments;
+ private List<DataSegment> realtimeSegments = new ArrayList<>();
public TestServerInventoryView(List<DataSegment> segments)
{
this.segments = ImmutableList.copyOf(segments);
}
+ public TestServerInventoryView(List<DataSegment> segments, List<DataSegment> realtimeSegments)
+ {
+ this.segments = ImmutableList.copyOf(segments);
+ this.realtimeSegments = ImmutableList.copyOf(realtimeSegments);
+ }
+
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
{
@@ -75,7 +92,9 @@ public void registerSegmentCallback(Executor exec, final SegmentCallback callbac
for (final DataSegment segment : segments) {
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
}
-
+ for (final DataSegment segment : realtimeSegments) {
+ exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
+ }
exec.execute(callback::segmentViewInitialized);
}
@@ -85,7 +104,9 @@ public void registerTimelineCallback(final Executor exec, final TimelineCallback
for (DataSegment segment : segments) {
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
}
-
+ for (final DataSegment segment : realtimeSegments) {
+ exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
+ }
exec.execute(callback::timelineInitialized);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org