You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/02/12 00:21:27 UTC
[incubator-druid] branch master updated: Fix num_rows in sys.segments (#6888)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 02ef14f Fix num_rows in sys.segments (#6888)
02ef14f is described below
commit 02ef14f262c9ddceb3c8c9a4fcc44da078b67727
Author: Surekha <su...@imply.io>
AuthorDate: Mon Feb 11 16:21:19 2019 -0800
Fix num_rows in sys.segments (#6888)
* Fix the bug with num_rows in sys.segments
* Fix segmentMetadataInfo update in DruidSchema
* Add numRows to SegmentMetadataHolder builder's constructor, so it's not overwritten
* Rename setSegmentSignature to setSegmentMetadataHolder and fix it so nested map is appended instead of recreated
* Replace Map<String, Set<String>> segmentServerMap with Set<String> for num_replica
* Remove unnecessary code and update test
* Add unit test for num_rows
* PR comments
* change access modifier to default package level
* minor changes to comments
* PR comments
---
.../druid/sql/calcite/schema/DruidSchema.java | 53 ++++++++-------
.../sql/calcite/schema/SegmentMetadataHolder.java | 45 ++++++++-----
.../druid/sql/calcite/schema/SystemSchema.java | 2 +-
.../druid/sql/calcite/schema/DruidSchemaTest.java | 77 ++++++++++++++++++++--
.../druid/sql/calcite/schema/SystemSchemaTest.java | 22 ++++++-
.../sql/calcite/util/TestServerInventoryView.java | 12 +++-
6 files changed, 162 insertions(+), 49 deletions(-)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 2929229..4aaa95e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema;
+import com.amazonaws.annotation.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
@@ -63,7 +64,6 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.sql.calcite.view.DruidViewMacro;
import org.apache.druid.sql.calcite.view.ViewManager;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
import java.io.IOException;
import java.util.Comparator;
@@ -95,8 +95,9 @@ public class DruidSchema extends AbstractSchema
private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
- private static final long IS_PUBLISHED = 0;
- private static final long IS_AVAILABLE = 1;
+ private static final long DEFAULT_IS_PUBLISHED = 0;
+ private static final long DEFAULT_IS_AVAILABLE = 1;
+ private static final long DEFAULT_NUM_ROWS = 0;
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerConfig config;
@@ -107,12 +108,12 @@ public class DruidSchema extends AbstractSchema
// For awaitInitialization.
private final CountDownLatch initialized = new CountDownLatch(1);
- // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
+ // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized, segmentMetadata
private final Object lock = new Object();
// DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment.
// Use TreeMap for segments so they are merged in deterministic order, from older to newer.
- // This data structure need to be accessed in a thread-safe way since SystemSchema accesses it
+ @GuardedBy("lock")
private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new HashMap<>();
private int totalSegments = 0;
@@ -351,7 +352,8 @@ public class DruidSchema extends AbstractSchema
return builder.build();
}
- private void addSegment(final DruidServerMetadata server, final DataSegment segment)
+ @VisibleForTesting
+ void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
synchronized (lock) {
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
@@ -360,16 +362,18 @@ public class DruidSchema extends AbstractSchema
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
- final Map<SegmentId, Set<String>> serverSegmentMap = ImmutableMap.of(
+ final Set<String> servers = ImmutableSet.of(server.getName());
+ holder = SegmentMetadataHolder.builder(
segment.getId(),
- ImmutableSet.of(server.getName())
- );
-
- holder = SegmentMetadataHolder
- .builder(segment.getId(), IS_PUBLISHED, IS_AVAILABLE, isRealtime, serverSegmentMap)
- .build();
+ DEFAULT_IS_PUBLISHED,
+ DEFAULT_IS_AVAILABLE,
+ isRealtime,
+ servers,
+ null,
+ DEFAULT_NUM_ROWS
+ ).build();
// Unknown segment.
- setSegmentSignature(segment, holder);
+ setSegmentMetadataHolder(segment, holder);
segmentsNeedingRefresh.add(segment);
if (!server.segmentReplicatable()) {
log.debug("Added new mutable segment[%s].", segment.getId());
@@ -378,14 +382,14 @@ public class DruidSchema extends AbstractSchema
log.debug("Added new immutable segment[%s].", segment.getId());
}
} else {
- final Map<SegmentId, Set<String>> segmentServerMap = holder.getReplicas();
+ final Set<String> segmentServers = holder.getReplicas();
final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
- .addAll(segmentServerMap.get(segment.getId()))
+ .addAll(segmentServers)
.add(server.getName())
.build();
final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
.from(holder)
- .withReplicas(ImmutableMap.of(segment.getId(), servers))
+ .withReplicas(servers)
.build();
knownSegments.put(segment, holderWithNumReplicas);
if (server.segmentReplicatable()) {
@@ -404,7 +408,7 @@ public class DruidSchema extends AbstractSchema
}
@VisibleForTesting
- protected void removeSegment(final DataSegment segment)
+ void removeSegment(final DataSegment segment)
{
synchronized (lock) {
log.debug("Segment[%s] is gone.", segment.getId());
@@ -435,13 +439,13 @@ public class DruidSchema extends AbstractSchema
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final SegmentMetadataHolder holder = knownSegments.get(segment);
- final Map<SegmentId, Set<String>> segmentServerMap = holder.getReplicas();
- final ImmutableSet<String> servers = FluentIterable.from(segmentServerMap.get(segment.getId()))
+ final Set<String> segmentServers = holder.getReplicas();
+ final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
.filter(Predicates.not(Predicates.equalTo(server.getName())))
.toSet();
final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
.from(holder)
- .withReplicas(ImmutableMap.of(segment.getId(), servers))
+ .withReplicas(servers)
.build();
knownSegments.put(segment, holderWithNumReplicas);
lock.notifyAll();
@@ -453,7 +457,7 @@ public class DruidSchema extends AbstractSchema
* which may be a subset of the asked-for set.
*/
@VisibleForTesting
- protected Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
+ Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
{
final Set<DataSegment> retVal = new HashSet<>();
@@ -525,7 +529,7 @@ public class DruidSchema extends AbstractSchema
.withNumRows(analysis.getNumRows())
.build();
dataSourceSegments.put(segment, updatedHolder);
- setSegmentSignature(segment, updatedHolder);
+ setSegmentMetadataHolder(segment, updatedHolder);
retVal.add(segment);
}
}
@@ -550,7 +554,8 @@ public class DruidSchema extends AbstractSchema
return retVal;
}
- private void setSegmentSignature(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
+ @VisibleForTesting
+ void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
{
synchronized (lock) {
TreeMap<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
index f2d5ab3..38ff928 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -23,7 +23,6 @@ import org.apache.druid.sql.calcite.table.RowSignature;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
-import java.util.Map;
import java.util.Set;
/**
@@ -36,15 +35,25 @@ public class SegmentMetadataHolder
long isPublished,
long isAvailable,
long isRealtime,
- Map<SegmentId, Set<String>> segmentServerMap
+ Set<String> segmentServers,
+ RowSignature rowSignature,
+ long numRows
)
{
- return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServerMap);
+ return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServers, rowSignature, numRows);
}
public static Builder from(SegmentMetadataHolder h)
{
- return new Builder(h.getSegmentId(), h.isPublished(), h.isAvailable(), h.isRealtime(), h.getReplicas());
+ return new Builder(
+ h.getSegmentId(),
+ h.isPublished(),
+ h.isAvailable(),
+ h.isRealtime(),
+ h.getReplicas(),
+ h.getRowSignature(),
+ h.getNumRows()
+ );
}
private final SegmentId segmentId;
@@ -54,8 +63,8 @@ public class SegmentMetadataHolder
private final long isPublished;
private final long isAvailable;
private final long isRealtime;
- //segmentId -> set of servers that contain the segment
- private final Map<SegmentId, Set<String>> segmentServerMap;
+ // set of servers that contain the segment
+ private final Set<String> segmentServers;
private final long numRows;
@Nullable
private final RowSignature rowSignature;
@@ -66,7 +75,7 @@ public class SegmentMetadataHolder
this.isPublished = builder.isPublished;
this.isAvailable = builder.isAvailable;
this.isRealtime = builder.isRealtime;
- this.segmentServerMap = builder.segmentServerMap;
+ this.segmentServers = builder.segmentServers;
this.numRows = builder.numRows;
this.segmentId = builder.segmentId;
}
@@ -91,14 +100,14 @@ public class SegmentMetadataHolder
return segmentId;
}
- public Map<SegmentId, Set<String>> getReplicas()
+ public Set<String> getReplicas()
{
- return segmentServerMap;
+ return segmentServers;
}
- public long getNumReplicas(SegmentId segmentId)
+ public long getNumReplicas()
{
- return segmentServerMap.get(segmentId).size();
+ return segmentServers.size();
}
public long getNumRows()
@@ -119,7 +128,7 @@ public class SegmentMetadataHolder
private final long isAvailable;
private final long isRealtime;
- private Map<SegmentId, Set<String>> segmentServerMap;
+ private Set<String> segmentServers;
@Nullable
private RowSignature rowSignature;
private long numRows;
@@ -129,14 +138,18 @@ public class SegmentMetadataHolder
long isPublished,
long isAvailable,
long isRealtime,
- Map<SegmentId, Set<String>> segmentServerMap
+ Set<String> servers,
+ RowSignature rowSignature,
+ long numRows
)
{
this.segmentId = segmentId;
this.isPublished = isPublished;
this.isAvailable = isAvailable;
this.isRealtime = isRealtime;
- this.segmentServerMap = segmentServerMap;
+ this.segmentServers = servers;
+ this.rowSignature = rowSignature;
+ this.numRows = numRows;
}
public Builder withRowSignature(RowSignature rowSignature)
@@ -151,9 +164,9 @@ public class SegmentMetadataHolder
return this;
}
- public Builder withReplicas(Map<SegmentId, Set<String>> segmentServerMap)
+ public Builder withReplicas(Set<String> servers)
{
- this.segmentServerMap = segmentServerMap;
+ this.segmentServers = servers;
return this;
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index d0599f8..e895113 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -224,7 +224,7 @@ public class SystemSchema extends AbstractSchema
Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
for (SegmentMetadataHolder h : availableSegmentMetadata.values()) {
PartialSegmentData partialSegmentData =
- new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows());
+ new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows());
partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 30b99e5..e707c40 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -27,6 +27,8 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.TimelineServerView;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
@@ -40,6 +42,7 @@ import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.DruidTable;
@@ -84,6 +87,8 @@ public class DruidSchemaTest extends CalciteTestBase
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
+ private List<ImmutableDruidServer> druidServers;
+
@BeforeClass
public static void setUpClass()
{
@@ -163,10 +168,12 @@ public class DruidSchemaTest extends CalciteTestBase
index2
);
+ final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments());
+ druidServers = serverView.getDruidServers();
schema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
- new TestServerInventoryView(walker.getSegments()),
+ serverView,
PLANNER_CONFIG_DEFAULT,
new NoopViewManager(),
new NoopEscalator()
@@ -239,6 +246,62 @@ public class DruidSchemaTest extends CalciteTestBase
Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName());
}
+ /**
+ * This tests that {@link SegmentMetadataHolder#getNumRows()} is correct in case
+ * of multiple replicas i.e. when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)}
+ * is called more than once for same segment
+ */
+ @Test
+ public void testSegmentMetadataHolderNumRows()
+ {
+ Map<DataSegment, SegmentMetadataHolder> segmentsMetadata = schema.getSegmentMetadata();
+ final Set<DataSegment> segments = segmentsMetadata.keySet();
+ Assert.assertEquals(3, segments.size());
+ // find the only segment with datasource "foo2"
+ final DataSegment existingSegment = segments.stream()
+ .filter(segment -> segment.getDataSource().equals("foo2"))
+ .findFirst()
+ .orElse(null);
+ Assert.assertNotNull(existingSegment);
+ final SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment);
+ // update SegmentMetadataHolder of existingSegment with numRows=5
+ SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build();
+ schema.setSegmentMetadataHolder(existingSegment, updatedHolder);
+ // find a druidServer holding existingSegment
+ final Pair<ImmutableDruidServer, DataSegment> pair = druidServers.stream()
+ .flatMap(druidServer -> druidServer.getSegments()
+ .stream()
+ .filter(segment -> segment
+ .equals(
+ existingSegment))
+ .map(segment -> Pair
+ .of(
+ druidServer,
+ segment
+ )))
+ .findAny()
+ .orElse(null);
+ Assert.assertNotNull(pair);
+ final ImmutableDruidServer server = pair.lhs;
+ Assert.assertNotNull(server);
+ final DruidServerMetadata druidServerMetadata = server.getMetadata();
+ // invoke DruidSchema#addSegment on existingSegment
+ schema.addSegment(druidServerMetadata, existingSegment);
+ segmentsMetadata = schema.getSegmentMetadata();
+ // get the only segment with datasource "foo2"
+ final DataSegment currentSegment = segments.stream()
+ .filter(segment -> segment.getDataSource().equals("foo2"))
+ .findFirst()
+ .orElse(null);
+ final SegmentMetadataHolder currentHolder = segmentsMetadata.get(currentSegment);
+ Assert.assertEquals(updatedHolder.getSegmentId(), currentHolder.getSegmentId());
+ Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows());
+ // numreplicas do not change here since we addSegment with the same server which was serving existingSegment before
+ Assert.assertEquals(updatedHolder.getNumReplicas(), currentHolder.getNumReplicas());
+ Assert.assertEquals(updatedHolder.isAvailable(), currentHolder.isAvailable());
+ Assert.assertEquals(updatedHolder.isPublished(), currentHolder.isPublished());
+ }
+
@Test
public void testNullDatasource() throws IOException
{
@@ -247,7 +310,10 @@ public class DruidSchemaTest extends CalciteTestBase
Assert.assertEquals(segments.size(), 3);
// segments contains two segments with datasource "foo" and one with datasource "foo2"
// let's remove the only segment with datasource "foo2"
- final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo2")).findFirst().orElse(null);
+ final DataSegment segmentToRemove = segments.stream()
+ .filter(segment -> segment.getDataSource().equals("foo2"))
+ .findFirst()
+ .orElse(null);
Assert.assertFalse(segmentToRemove == null);
schema.removeSegment(segmentToRemove);
schema.refreshSegments(segments); // can cause NPE without dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource
@@ -262,8 +328,11 @@ public class DruidSchemaTest extends CalciteTestBase
Map<DataSegment, SegmentMetadataHolder> segmentMetadatas = schema.getSegmentMetadata();
Set<DataSegment> segments = segmentMetadatas.keySet();
Assert.assertEquals(segments.size(), 3);
- //remove one of the segments with datasource "foo"
- final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo")).findFirst().orElse(null);
+ // remove one of the segments with datasource "foo"
+ final DataSegment segmentToRemove = segments.stream()
+ .filter(segment -> segment.getDataSource().equals("foo"))
+ .findFirst()
+ .orElse(null);
Assert.assertFalse(segmentToRemove == null);
schema.removeSegment(segmentToRemove);
schema.refreshSegments(segments); // can cause NPE without holder null check in SegmentMetadataHolder#from
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 7d8cdaa..d354f14 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -118,6 +118,11 @@ public class SystemSchemaTest extends CalciteTestBase
CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0"))
);
+ private static final List<InputRow> ROWS3 = ImmutableList.of(
+ CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "7.0", "dim3", ImmutableList.of("x"))),
+ CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "8.0", "dim3", ImmutableList.of("xyz")))
+ );
+
private SystemSchema schema;
private SpecificSegmentsQuerySegmentWalker walker;
private DruidLeaderClient client;
@@ -204,11 +209,22 @@ public class SystemSchemaTest extends CalciteTestBase
)
.rows(ROWS2)
.buildMMappedIndex();
+ final QueryableIndex index3 = IndexBuilder.create()
+ .tmpDir(new File(tmpDir, "3"))
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(new LongSumAggregatorFactory("m1", "m1"))
+ .withRollup(false)
+ .build()
+ )
+ .rows(ROWS3)
+ .buildMMappedIndex();
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate)
.add(segment1, index1)
.add(segment2, index2)
- .add(segment3, index2);
+ .add(segment3, index3);
druidSchema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
@@ -469,7 +485,7 @@ public class SystemSchemaTest extends CalciteTestBase
100L,
2L, //partition_num
1L, //num_replicas
- 3L, //numRows
+ 2L, //numRows
0L, //is_published
1L, //is_available
0L //is_realtime
@@ -481,7 +497,7 @@ public class SystemSchemaTest extends CalciteTestBase
100L,
0L, //partition_num
1L, //num_replicas
- 0L, //numRows = 3
+ 0L, //numRows
0L, //is_published
1L, //is_available
1L //is_realtime
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 6718b1b..2dcc569 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -20,7 +20,9 @@
package org.apache.druid.sql.calcite.util;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.selector.ServerSelector;
@@ -33,6 +35,7 @@ import org.apache.druid.timeline.TimelineLookup;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
@@ -83,7 +86,14 @@ public class TestServerInventoryView implements TimelineServerView
@Override
public List<ImmutableDruidServer> getDruidServers()
{
- throw new UnsupportedOperationException();
+ final ImmutableDruidDataSource dataSource = new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments);
+ final ImmutableDruidServer server = new ImmutableDruidServer(
+ DUMMY_SERVER,
+ 0L,
+ ImmutableMap.of("src", dataSource),
+ 1
+ );
+ return ImmutableList.of(server);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org