You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/12/20 21:29:06 UTC
[incubator-druid] branch master updated: Set is_available to false
by default for published segment (#6757)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5e5aad4 Set is_available to false by default for published segment (#6757)
5e5aad4 is described below
commit 5e5aad49e639457f4d82862e92a901f7fa9282f5
Author: Surekha <su...@imply.io>
AuthorDate: Thu Dec 20 13:29:00 2018 -0800
Set is_available to false by default for published segment (#6757)
* Set is_available to false by default for published segment
* Address comments
Fix the is_published value for segments not in metadata store
* Remove unused import
* Use non-null shardSpec for a segment in test
* Fix checkstyle
* Modify comment
---
.../druid/sql/calcite/schema/DruidSchema.java | 4 +-
.../druid/sql/calcite/schema/SystemSchema.java | 2 +-
.../druid/sql/calcite/schema/SystemSchemaTest.java | 184 ++++++++++++++++++---
.../sql/calcite/util/TestServerInventoryView.java | 25 ++-
4 files changed, 189 insertions(+), 26 deletions(-)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 0e7e973..87d97ef 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -50,7 +50,6 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -339,10 +338,9 @@ public class DruidSchema extends AbstractSchema
if (knownSegments == null || !knownSegments.containsKey(segment)) {
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
- final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 : 0;
final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
segment.getIdentifier(),
- isPublished,
+ 0,
1,
isRealtime,
1
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 431f4b1..57e8825 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -250,7 +250,7 @@ public class SystemSchema extends AbstractSchema
try {
segmentsAlreadySeen.add(val.getIdentifier());
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier());
- long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 1L;
+ long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
if (partialSegmentData != null) {
numReplicas = partialSegmentData.getNumReplicas();
numRows = partialSegmentData.getNumRows();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 9dc5a0e..4020a88 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -73,6 +73,7 @@ import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -203,7 +204,7 @@ public class SystemSchemaTest extends CalciteTestBase
druidSchema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
- new TestServerInventoryView(walker.getSegments()),
+ new TestServerInventoryView(walker.getSegments(), realtimeSegments),
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
@@ -251,7 +252,7 @@ public class SystemSchemaTest extends CalciteTestBase
null,
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
- null,
+ new NumberedShardSpec(2, 3),
1,
100L,
DataSegment.PruneLoadSpecHolder.DEFAULT
@@ -281,6 +282,8 @@ public class SystemSchemaTest extends CalciteTestBase
DataSegment.PruneLoadSpecHolder.DEFAULT
);
+ final List<DataSegment> realtimeSegments = ImmutableList.of(segment4, segment5);
+
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
1L,
@@ -329,11 +332,6 @@ public class SystemSchemaTest extends CalciteTestBase
@Test
public void testSegmentsTable() throws Exception
{
- // total segments = 6
- // segments 1,2,3 are published and available
- // segments 4,5,6 are published but unavailable
- // segment 3 is published but not served
- // segment 2 is served by 2 servers, so num_replicas=2
final SystemSchema.SegmentsTable segmentsTable = EasyMock
.createMockBuilder(SystemSchema.SegmentsTable.class)
@@ -356,9 +354,9 @@ public class SystemSchemaTest extends CalciteTestBase
.anyTimes();
AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
- //published but unavailable segments
+ //segments in metadata store : wikipedia1, wikipedia2, wikipedia3, test1, test2
final String json = "[{\n"
- + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ + "\t\"dataSource\": \"wikipedia1\",\n"
+ "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
+ "\t\"loadSpec\": {\n"
@@ -376,7 +374,7 @@ public class SystemSchemaTest extends CalciteTestBase
+ "\t\"size\": 47406,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n"
+ "}, {\n"
- + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ + "\t\"dataSource\": \"wikipedia2\",\n"
+ "\t\"interval\": \"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n"
+ "\t\"loadSpec\": {\n"
@@ -394,7 +392,7 @@ public class SystemSchemaTest extends CalciteTestBase
+ "\t\"size\": 83846,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n"
+ "}, {\n"
- + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+ + "\t\"dataSource\": \"wikipedia3\",\n"
+ "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
+ "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
+ "\t\"loadSpec\": {\n"
@@ -411,6 +409,34 @@ public class SystemSchemaTest extends CalciteTestBase
+ "\t\"binaryVersion\": 9,\n"
+ "\t\"size\": 53527,\n"
+ "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n"
+ + "}, {\n"
+ + "\t\"dataSource\": \"test1\",\n"
+ + "\t\"interval\": \"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n"
+ + "\t\"version\": \"version1\",\n"
+ + "\t\"loadSpec\": null,\n"
+ + "\t\"dimensions\": \"dim1,dim2\",\n"
+ + "\t\"metrics\": \"met1,met2\",\n"
+ + "\t\"shardSpec\": {\n"
+ + "\t\t\"type\": \"none\",\n"
+ + "\t\t\"domainDimensions\": []\n"
+ + "\t},\n"
+ + "\t\"binaryVersion\": 1,\n"
+ + "\t\"size\": 100,\n"
+ + "\t\"identifier\": \"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n"
+ + "}, {\n"
+ + "\t\"dataSource\": \"test2\",\n"
+ + "\t\"interval\": \"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n"
+ + "\t\"version\": \"version2\",\n"
+ + "\t\"loadSpec\": null,\n"
+ + "\t\"dimensions\": \"dim1,dim2\",\n"
+ + "\t\"metrics\": \"met1,met2\",\n"
+ + "\t\"shardSpec\": {\n"
+ + "\t\t\"type\": \"none\",\n"
+ + "\t\t\"domainDimensions\": []\n"
+ + "\t},\n"
+ + "\t\"binaryVersion\": 1,\n"
+ + "\t\"size\": 100,\n"
+ + "\t\"identifier\": \"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n"
+ "}]";
byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
in.add(bytesToWrite);
@@ -447,19 +473,137 @@ public class SystemSchemaTest extends CalciteTestBase
final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
- Assert.assertEquals(6, rows.size());
+ // total segments = 8
+ // segment wikipedia1, wikipedia2, wikipedia3 are published but unavailable
+ // segments test1, test2 are published and available
+ // segment test3 is served by historical but unpublished or unused
+ // segments test4, test5 are not published but available (realtime segments)
+
+ Assert.assertEquals(8, rows.size());
Object[] row0 = rows.get(0);
- //segment 6 is published and unavailable, num_replicas is 0
- Assert.assertEquals(1L, row0[9]);
- Assert.assertEquals(0L, row0[7]);
+ //segment 0 is published and unavailable, num_replicas is 0
+ Assert.assertEquals(
+ "wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
+ row0[0]
+ );
+ Assert.assertEquals("wikipedia1", row0[1]);
+ Assert.assertEquals("2018-08-07T23:00:00.000Z", row0[2]);
+ Assert.assertEquals("2018-08-08T00:00:00.000Z", row0[3]);
+ Assert.assertEquals(47406L, row0[4]);
+ Assert.assertEquals("2018-08-07T23:00:00.059Z", row0[5]);
+ Assert.assertEquals(0L, row0[6]); //partition_num
+ Assert.assertEquals(0L, row0[7]); //num_replicas
Assert.assertEquals(0L, row0[8]); //numRows = 0
+ Assert.assertEquals(1L, row0[9]); //is_published
+ Assert.assertEquals(0L, row0[10]); //is_available
+ Assert.assertEquals(0L, row0[11]); //is_realtime
+
+ Object[] row1 = rows.get(1);
+ Assert.assertEquals(
+ "wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z",
+ row1[0]
+ );
+ Assert.assertEquals("wikipedia2", row1[1]);
+ Assert.assertEquals("2018-08-07T18:00:00.000Z", row1[2]);
+ Assert.assertEquals("2018-08-07T19:00:00.000Z", row1[3]);
+ Assert.assertEquals(83846L, row1[4]);
+ Assert.assertEquals("2018-08-07T18:00:00.117Z", row1[5]);
+ Assert.assertEquals(0L, row1[6]); //partition_num
+ Assert.assertEquals(0L, row1[7]); //num_replicas
+ Assert.assertEquals(0L, row1[8]); //numRows = 0
+ Assert.assertEquals(1L, row1[9]); //is_published
+ Assert.assertEquals(0L, row1[10]); //is_available
+ Assert.assertEquals(0L, row1[11]); //is_realtime
+
+
+ Object[] row2 = rows.get(2);
+ Assert.assertEquals(
+ "wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
+ row2[0]
+ );
+ Assert.assertEquals("wikipedia3", row2[1]);
+ Assert.assertEquals("2018-08-07T23:00:00.000Z", row2[2]);
+ Assert.assertEquals("2018-08-08T00:00:00.000Z", row2[3]);
+ Assert.assertEquals(53527L, row2[4]);
+ Assert.assertEquals("2018-08-07T23:00:00.059Z", row2[5]);
+ Assert.assertEquals(0L, row2[6]); //partition_num
+ Assert.assertEquals(0L, row2[7]); //num_replicas
+ Assert.assertEquals(0L, row2[8]); //numRows = 0
+ Assert.assertEquals(1L, row2[9]); //is_published
+ Assert.assertEquals(0L, row2[10]); //is_available
+ Assert.assertEquals(0L, row2[11]); //is_realtime
+
+ Object[] row3 = rows.get(3);
+ Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row3[0]);
+ Assert.assertEquals("test1", row3[1]);
+ Assert.assertEquals("2010-01-01T00:00:00.000Z", row3[2]);
+ Assert.assertEquals("2011-01-01T00:00:00.000Z", row3[3]);
+ Assert.assertEquals(100L, row3[4]);
+ Assert.assertEquals("version1", row3[5]);
+ Assert.assertEquals(0L, row3[6]); //partition_num
+ Assert.assertEquals(1L, row3[7]); //num_replicas
+ Assert.assertEquals(3L, row3[8]); //numRows = 3
+ Assert.assertEquals(1L, row3[9]); //is_published
+ Assert.assertEquals(1L, row3[10]); //is_available
+ Assert.assertEquals(0L, row3[11]); //is_realtime
Object[] row4 = rows.get(4);
- //segment 2 is published and has 2 replicas
- Assert.assertEquals(1L, row4[9]);
- Assert.assertEquals(2L, row4[7]);
- Assert.assertEquals(3L, row4[8]); //numRows = 3
+ Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row4[0]);
+ Assert.assertEquals("test2", row4[1]);
+ Assert.assertEquals("2011-01-01T00:00:00.000Z", row4[2]);
+ Assert.assertEquals("2012-01-01T00:00:00.000Z", row4[3]);
+ Assert.assertEquals(100L, row4[4]);
+ Assert.assertEquals("version2", row4[5]);
+ Assert.assertEquals(0L, row4[6]); //partition_num
+ Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2
+ Assert.assertEquals(3L, row4[8]); //numRows = 3
+ Assert.assertEquals(1L, row4[9]); //is_published
+ Assert.assertEquals(1L, row4[10]); //is_available
+ Assert.assertEquals(0L, row4[11]); //is_realtime
+
+ Object[] row5 = rows.get(5);
+ //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
+ Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row5[0]);
+ Assert.assertEquals("test3", row5[1]);
+ Assert.assertEquals("2012-01-01T00:00:00.000Z", row5[2]);
+ Assert.assertEquals("2013-01-01T00:00:00.000Z", row5[3]);
+ Assert.assertEquals(100L, row5[4]);
+ Assert.assertEquals("version3", row5[5]);
+ Assert.assertEquals(2L, row5[6]); //partition_num = 2
+ Assert.assertEquals(1L, row5[7]); //num_replicas
+ Assert.assertEquals(3L, row5[8]); //numRows = 3
+ Assert.assertEquals(0L, row5[9]); //is_published
+ Assert.assertEquals(1L, row5[10]); //is_available
+ Assert.assertEquals(0L, row5[11]); //is_realtime
+
+ Object[] row6 = rows.get(6);
+ Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row6[0]);
+ Assert.assertEquals("test5", row6[1]);
+ Assert.assertEquals("2017-01-01T00:00:00.000Z", row6[2]);
+ Assert.assertEquals("2018-01-01T00:00:00.000Z", row6[3]);
+ Assert.assertEquals(100L, row6[4]);
+ Assert.assertEquals("version5", row6[5]);
+ Assert.assertEquals(0L, row6[6]); //partition_num
+ Assert.assertEquals(1L, row6[7]); //num_replicas
+ Assert.assertEquals(0L, row6[8]); //numRows = 0
+ Assert.assertEquals(0L, row6[9]); //is_published
+ Assert.assertEquals(1L, row6[10]); //is_available
+ Assert.assertEquals(1L, row6[11]); //is_realtime
+
+ Object[] row7 = rows.get(7);
+ Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row7[0]);
+ Assert.assertEquals("test4", row7[1]);
+ Assert.assertEquals("2017-01-01T00:00:00.000Z", row7[2]);
+ Assert.assertEquals("2018-01-01T00:00:00.000Z", row7[3]);
+ Assert.assertEquals(100L, row7[4]);
+ Assert.assertEquals("version4", row7[5]);
+ Assert.assertEquals(0L, row7[6]); //partition_num
+ Assert.assertEquals(1L, row7[7]); //num_replicas
+ Assert.assertEquals(0L, row7[8]); //numRows
+ Assert.assertEquals(0L, row7[9]); //is_published
+ Assert.assertEquals(1L, row7[10]); //is_available
+ Assert.assertEquals(1L, row7[11]); //is_realtime
// Verify value types.
verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
@@ -576,7 +720,7 @@ public class SystemSchemaTest extends CalciteTestBase
Object[] row2 = rows.get(2);
Assert.assertEquals("server2:1234", row2[0]);
- Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]);
+ Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", row2[1]);
Object[] row3 = rows.get(3);
Assert.assertEquals("server2:1234", row3[0]);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 363d595..1b52ba9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -32,6 +32,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
@@ -49,13 +50,29 @@ public class TestServerInventoryView implements TimelineServerView
"dummy",
0
);
+ private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata(
+ "dummy",
+ "dummy",
+ null,
+ 0,
+ ServerType.REALTIME,
+ "dummy",
+ 0
+ );
private final List<DataSegment> segments;
+ private List<DataSegment> realtimeSegments = new ArrayList<>();
public TestServerInventoryView(List<DataSegment> segments)
{
this.segments = ImmutableList.copyOf(segments);
}
+ public TestServerInventoryView(List<DataSegment> segments, List<DataSegment> realtimeSegments)
+ {
+ this.segments = ImmutableList.copyOf(segments);
+ this.realtimeSegments = ImmutableList.copyOf(realtimeSegments);
+ }
+
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
{
@@ -75,7 +92,9 @@ public class TestServerInventoryView implements TimelineServerView
for (final DataSegment segment : segments) {
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
}
-
+ for (final DataSegment segment : realtimeSegments) {
+ exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
+ }
exec.execute(callback::segmentViewInitialized);
}
@@ -85,7 +104,9 @@ public class TestServerInventoryView implements TimelineServerView
for (DataSegment segment : segments) {
exec.execute(() -> callback.segmentAdded(DUMMY_SERVER, segment));
}
-
+ for (final DataSegment segment : realtimeSegments) {
+ exec.execute(() -> callback.segmentAdded(DUMMY_SERVER_REALTIME, segment));
+ }
exec.execute(callback::timelineInitialized);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org