You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/01/15 16:31:36 UTC
[incubator-druid] branch master updated: Fix num_replicas count in sys.segments table (#6804)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new f72f33f Fix num_replicas count in sys.segments table (#6804)
f72f33f is described below
commit f72f33f84aadfbf505ee9313ec96a62cf6ef8b74
Author: Surekha <su...@imply.io>
AuthorDate: Tue Jan 15 08:31:29 2019 -0800
Fix num_replicas count in sys.segments table (#6804)
* Fix num_replicas count from sys.segments
* Adjust unit test for num_replica > 1
* Pass named arguments instead of passing boolean constants
* Address PR comments
* PR comments
---
.../org/apache/druid/client/BrokerServerView.java | 2 +
.../apache/druid/client/TimelineServerView.java | 11 ++++
.../druid/sql/calcite/schema/DruidSchema.java | 60 +++++++++++++++++++---
.../sql/calcite/schema/SegmentMetadataHolder.java | 26 ++++++----
.../druid/sql/calcite/schema/SystemSchema.java | 2 +-
.../druid/sql/calcite/schema/SystemSchemaTest.java | 6 +--
.../sql/calcite/util/TestServerInventoryView.java | 4 +-
7 files changed, 90 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 0b28a1b..3747a9a 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -277,6 +277,8 @@ public class BrokerServerView implements TimelineServerView
server,
segmentId
);
+ } else {
+ runTimelineCallbacks(callback -> callback.serverSegmentRemoved(server, segment));
}
if (selector.isEmpty()) {
diff --git a/server/src/main/java/org/apache/druid/client/TimelineServerView.java b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
index 89042c6..ed1d4df 100644
--- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java
+++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
@@ -81,5 +81,16 @@ public interface TimelineServerView extends ServerView
* @return continue or unregister
*/
CallbackAction segmentRemoved(DataSegment segment);
+
+ /**
+ * Called when a segment is removed from a server. Note that the timeline can still have the segment, even though it's removed from given server.
+ * {@link #segmentRemoved(DataSegment)} is the authority on when segment is removed from the timeline.
+ *
+ * @param server The server that removed a segment
+ * @param segment The segment that was removed
+ *
+ * @return continue or unregister
+ */
+ CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment);
}
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 4e1bb09..b2b2b67 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -20,8 +20,11 @@
package org.apache.druid.sql.calcite.schema;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -88,6 +91,8 @@ public class DruidSchema extends AbstractSchema
private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
+ private static final long IS_PUBLISHED = 0;
+ private static final long IS_AVAILABLE = 1;
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerConfig config;
@@ -168,6 +173,16 @@ public class DruidSchema extends AbstractSchema
removeSegment(segment);
return ServerView.CallbackAction.CONTINUE;
}
+
+ @Override
+ public ServerView.CallbackAction serverSegmentRemoved(
+ final DruidServerMetadata server,
+ final DataSegment segment
+ )
+ {
+ removeServerSegment(server, segment);
+ return ServerView.CallbackAction.CONTINUE;
+ }
}
);
}
@@ -338,12 +353,18 @@ public class DruidSchema extends AbstractSchema
if (knownSegments == null || !knownSegments.containsKey(segment)) {
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
+
+ final Map<String, Set<String>> serverSegmentMap = ImmutableMap.of(
+ segment.getIdentifier(),
+ ImmutableSet.of(server.getName())
+ );
+
final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
segment.getIdentifier(),
- 0,
- 1,
+ IS_PUBLISHED,
+ IS_AVAILABLE,
isRealtime,
- 1
+ serverSegmentMap
).build();
// Unknown segment.
setSegmentSignature(segment, holder);
@@ -357,13 +378,18 @@ public class DruidSchema extends AbstractSchema
} else {
if (knownSegments.containsKey(segment)) {
final SegmentMetadataHolder holder = knownSegments.get(segment);
+ final Map<String, Set<String>> segmentServerMap = holder.getReplicas();
+ final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
+ .addAll(segmentServerMap.get(segment.getIdentifier()))
+ .add(server.getName())
+ .build();
final SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder(
holder.getSegmentId(),
holder.isPublished(),
holder.isAvailable(),
holder.isRealtime(),
- holder.getNumReplicas()
- ).withNumReplicas(holder.getNumReplicas() + 1).build();
+ holder.getReplicas()
+ ).withReplicas(ImmutableMap.of(segment.getIdentifier(), servers)).build();
knownSegments.put(segment, holderWithNumReplicas);
}
if (server.segmentReplicatable()) {
@@ -403,6 +429,28 @@ public class DruidSchema extends AbstractSchema
}
}
+ private void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
+ {
+ synchronized (lock) {
+ log.debug("Segment[%s] is gone from server[%s]", segment.getIdentifier(), server.getName());
+ final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
+ final SegmentMetadataHolder holder = knownSegments.get(segment);
+ final Map<String, Set<String>> segmentServerMap = holder.getReplicas();
+ final ImmutableSet<String> servers = FluentIterable.from(segmentServerMap.get(segment.getIdentifier()))
+ .filter(Predicates.not(Predicates.equalTo(server.getName())))
+ .toSet();
+ final SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder(
+ holder.getSegmentId(),
+ holder.isPublished(),
+ holder.isAvailable(),
+ holder.isRealtime(),
+ holder.getReplicas()
+ ).withReplicas(ImmutableMap.of(segment.getIdentifier(), servers)).build();
+ knownSegments.put(segment, holderWithNumReplicas);
+ lock.notifyAll();
+ }
+ }
+
/**
* Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed,
* which may be a subset of the asked-for set.
@@ -475,7 +523,7 @@ public class DruidSchema extends AbstractSchema
holder.isPublished(),
holder.isAvailable(),
holder.isRealtime(),
- holder.getNumReplicas()
+ holder.getReplicas()
).withRowSignature(rowSignature).withNumRows(analysis.getNumRows()).build();
dataSourceSegments.put(segment, updatedHolder);
setSegmentSignature(segment, updatedHolder);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
index b2666e1..34a5af7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -22,6 +22,8 @@ package org.apache.druid.sql.calcite.schema;
import org.apache.druid.sql.calcite.table.RowSignature;
import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Set;
/**
* Immutable representation of RowSignature and other segment attributes needed by {@link SystemSchema.SegmentsTable}
@@ -36,7 +38,8 @@ public class SegmentMetadataHolder
private final long isAvailable;
private final long isRealtime;
private final String segmentId;
- private final long numReplicas;
+ //segmentId -> set of servers that contain the segment
+ private final Map<String, Set<String>> segmentServerMap;
private final long numRows;
@Nullable
private final RowSignature rowSignature;
@@ -47,7 +50,7 @@ public class SegmentMetadataHolder
this.isPublished = builder.isPublished;
this.isAvailable = builder.isAvailable;
this.isRealtime = builder.isRealtime;
- this.numReplicas = builder.numReplicas;
+ this.segmentServerMap = builder.segmentServerMap;
this.numRows = builder.numRows;
this.segmentId = builder.segmentId;
}
@@ -72,9 +75,14 @@ public class SegmentMetadataHolder
return segmentId;
}
- public long getNumReplicas()
+ public Map<String, Set<String>> getReplicas()
{
- return numReplicas;
+ return segmentServerMap;
+ }
+
+ public long getNumReplicas(String segmentId)
+ {
+ return segmentServerMap.get(segmentId).size();
}
public long getNumRows()
@@ -95,7 +103,7 @@ public class SegmentMetadataHolder
private final long isAvailable;
private final long isRealtime;
- private long numReplicas;
+ private Map<String, Set<String>> segmentServerMap;
@Nullable
private RowSignature rowSignature;
private long numRows;
@@ -105,14 +113,14 @@ public class SegmentMetadataHolder
long isPublished,
long isAvailable,
long isRealtime,
- long numReplicas
+ Map<String, Set<String>> segmentServerMap
)
{
this.segmentId = segmentId;
this.isPublished = isPublished;
this.isAvailable = isAvailable;
this.isRealtime = isRealtime;
- this.numReplicas = numReplicas;
+ this.segmentServerMap = segmentServerMap;
}
public Builder withRowSignature(RowSignature rowSignature)
@@ -127,9 +135,9 @@ public class SegmentMetadataHolder
return this;
}
- public Builder withNumReplicas(long numReplicas)
+ public Builder withReplicas(Map<String, Set<String>> segmentServerMap)
{
- this.numReplicas = numReplicas;
+ this.segmentServerMap = segmentServerMap;
return this;
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 57e8825..7a62a0b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -229,7 +229,7 @@ public class SystemSchema extends AbstractSchema
final Map<String, PartialSegmentData> partialSegmentDataMap = availableSegmentMetadata.values().stream().collect(
Collectors.toMap(
SegmentMetadataHolder::getSegmentId,
- h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows())
+ h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows())
));
//get published segments from coordinator
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 4020a88..840fed8 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -199,7 +199,6 @@ public class SystemSchemaTest extends CalciteTestBase
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate)
.add(segment1, index1)
.add(segment2, index2)
- .add(segment2, index2)
.add(segment3, index2);
druidSchema = new DruidSchema(
@@ -282,7 +281,7 @@ public class SystemSchemaTest extends CalciteTestBase
DataSegment.PruneLoadSpecHolder.DEFAULT
);
- final List<DataSegment> realtimeSegments = ImmutableList.of(segment4, segment5);
+ final List<DataSegment> realtimeSegments = ImmutableList.of(segment2, segment4, segment5);
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
@@ -478,6 +477,7 @@ public class SystemSchemaTest extends CalciteTestBase
// segments test1, test2 are published and available
// segment test3 is served by historical but unpublished or unused
// segments test4, test5 are not published but available (realtime segments)
+ // segment test2 is both published and served by a realtime server.
Assert.assertEquals(8, rows.size());
@@ -556,7 +556,7 @@ public class SystemSchemaTest extends CalciteTestBase
Assert.assertEquals(100L, row4[4]);
Assert.assertEquals("version2", row4[5]);
Assert.assertEquals(0L, row4[6]); //partition_num
- Assert.assertEquals(2L, row4[7]); //segment test2 is served by 2 servers, so num_replicas=2
+ Assert.assertEquals(2L, row4[7]); //segment test2 is served by historical and realtime servers
Assert.assertEquals(3L, row4[8]); //numRows = 3
Assert.assertEquals(1L, row4[9]); //is_published
Assert.assertEquals(1L, row4[10]); //is_available
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 1b52ba9..6718b1b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -51,8 +51,8 @@ public class TestServerInventoryView implements TimelineServerView
0
);
private static final DruidServerMetadata DUMMY_SERVER_REALTIME = new DruidServerMetadata(
- "dummy",
- "dummy",
+ "dummy2",
+ "dummy2",
null,
0,
ServerType.REALTIME,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org