You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/06/07 20:29:06 UTC
[incubator-druid] branch 0.15.0-incubating updated: Optimize
overshadowed segments computation (#7595) (#7850)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.15.0-incubating by this push:
new ae57304 Optimize overshadowed segments computation (#7595) (#7850)
ae57304 is described below
commit ae573044241d8c2de29cc4f471c75713580c14e4
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Jun 7 13:29:00 2019 -0700
Optimize overshadowed segments computation (#7595) (#7850)
* Move the overshadowed segment computation to SQLMetadataSegmentManager's poll
* rename method in MetadataSegmentManager
* Fix tests
* PR comments
* PR comments
* PR comments
* fix indentation
* fix tests
* fix test
* add test for SegmentWithOvershadowedStatus serde format
* PR comments
* PR comments
* fix test
* remove snapshot updates outside poll
* PR comments
* PR comments
* PR comments
* removed unused import
---
.../timeline/SegmentWithOvershadowedStatus.java | 7 +
.../org/apache/druid/utils/CollectionUtils.java | 19 ++-
.../SegmentWithOvershadowedStatusTest.java | 180 +++++++++++++++++++++
.../MaterializedViewSupervisor.java | 2 +-
.../common/actions/SegmentListActionsTest.java | 2 +-
.../results/auth_test_sys_schema_segments.json | 2 +-
.../apache/druid/client/DataSourcesSnapshot.java | 114 +++++++++++++
.../druid/client/ImmutableDruidDataSource.java | 40 -----
.../druid/metadata/MetadataSegmentManager.java | 27 +++-
.../druid/metadata/SQLMetadataSegmentManager.java | 132 +++++++--------
.../druid/server/coordinator/DruidCoordinator.java | 35 +++-
.../coordinator/DruidCoordinatorRuntimeParams.java | 22 ++-
.../helper/DruidCoordinatorRuleRunner.java | 16 +-
.../druid/server/http/DataSourcesResource.java | 2 +-
.../apache/druid/server/http/MetadataResource.java | 25 ++-
.../metadata/SQLMetadataSegmentManagerTest.java | 2 +-
.../coordinator/CuratorDruidCoordinatorTest.java | 6 +
.../DruidCoordinatorRuleRunnerTest.java | 19 ++-
.../server/coordinator/DruidCoordinatorTest.java | 22 +--
.../druid/sql/calcite/schema/SystemSchema.java | 13 +-
20 files changed, 514 insertions(+), 173 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
index e86daea..3f2972f 100644
--- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
@@ -21,6 +21,7 @@ package org.apache.druid.timeline;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
/**
* DataSegment object plus the overshadowed status for the segment. An immutable object.
@@ -31,6 +32,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
{
private final boolean overshadowed;
+ /**
+ * dataSegment is serialized "unwrapped", i.e. its properties are included as properties of the
+ * enclosing class. If, in the future, {@code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
+ * there would be no change in the serialized format.
+ */
+ @JsonUnwrapped
private final DataSegment dataSegment;
@JsonCreator
diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
index 6b4d3cc..af3cf07 100644
--- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
+++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -19,10 +19,14 @@
package org.apache.druid.utils;
+import com.google.common.collect.Maps;
+
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Map;
import java.util.Spliterator;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -68,5 +72,18 @@ public final class CollectionUtils
};
}
- private CollectionUtils() {}
+  /**
+   * Returns a new mutable map with the same keys as {@code map}, where each value is the result of applying
+   * {@code valueMapper} to the corresponding value of {@code map}. The input map is not modified.
+   */
+  public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<? super V, ? extends V2> valueMapper)
+  {
+    final Map<K, V2> result = Maps.newHashMapWithExpectedSize(map.size());
+    map.forEach((k, v) -> result.put(k, valueMapper.apply(v)));
+    return result;
+  }
+
+  private CollectionUtils()
+  {
+  }
}
diff --git a/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java
new file mode 100644
index 0000000..050f9e0
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.TestObjectMapper;
+import org.apache.druid.jackson.CommaListJoinDeserializer;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SegmentWithOvershadowedStatusTest
+{
+  private static final ObjectMapper mapper = new TestObjectMapper();
+  private static final int TEST_VERSION = 0x9;
+
+  @Before
+  public void setUp()
+  {
+    InjectableValues.Std injectableValues = new InjectableValues.Std();
+    injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
+    mapper.setInjectableValues(injectableValues);
+  }
+
+  /**
+   * Checks that DataSegment properties are serialized unwrapped and can be read back into a flat DataSegment subclass.
+   */
+  @Test
+  public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws Exception
+  {
+    final Interval interval = Intervals.of("2011-10-01/2011-10-02");
+    final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", "or_other");
+
+    final DataSegment dataSegment = new DataSegment(
+        "something",
+        interval,
+        "1",
+        loadSpec,
+        Arrays.asList("dim1", "dim2"),
+        Arrays.asList("met1", "met2"),
+        NoneShardSpec.instance(),
+        TEST_VERSION,
+        1
+    );
+
+    final SegmentWithOvershadowedStatus segment = new SegmentWithOvershadowedStatus(dataSegment, false);
+    // Serialize once; the same JSON is checked as a generic map and then read back as a typed object.
+    final String json = mapper.writeValueAsString(segment);
+    final Map<String, Object> objectMap = mapper.readValue(json, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
+
+    Assert.assertEquals(11, objectMap.size());
+    Assert.assertEquals("something", objectMap.get("dataSource"));
+    Assert.assertEquals(interval.toString(), objectMap.get("interval"));
+    Assert.assertEquals("1", objectMap.get("version"));
+    Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
+    Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
+    Assert.assertEquals("met1,met2", objectMap.get("metrics"));
+    Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec"));
+    Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
+    Assert.assertEquals(1, objectMap.get("size"));
+    Assert.assertEquals(false, objectMap.get("overshadowed"));
+
+    final TestSegmentWithOvershadowedStatus deserializedSegment = mapper.readValue(
+        json,
+        TestSegmentWithOvershadowedStatus.class
+    );
+
+    Assert.assertEquals(segment.getDataSegment().getDataSource(), deserializedSegment.getDataSource());
+    Assert.assertEquals(segment.getDataSegment().getInterval(), deserializedSegment.getInterval());
+    Assert.assertEquals(segment.getDataSegment().getVersion(), deserializedSegment.getVersion());
+    Assert.assertEquals(segment.getDataSegment().getLoadSpec(), deserializedSegment.getLoadSpec());
+    Assert.assertEquals(segment.getDataSegment().getDimensions(), deserializedSegment.getDimensions());
+    Assert.assertEquals(segment.getDataSegment().getMetrics(), deserializedSegment.getMetrics());
+    Assert.assertEquals(segment.getDataSegment().getShardSpec(), deserializedSegment.getShardSpec());
+    Assert.assertEquals(segment.getDataSegment().getSize(), deserializedSegment.getSize());
+    Assert.assertEquals(segment.getDataSegment().getId(), deserializedSegment.getId());
+    Assert.assertEquals(segment.getDataSegment().getBinaryVersion(), deserializedSegment.getBinaryVersion());
+    Assert.assertEquals(segment.isOvershadowed(), deserializedSegment.isOvershadowed());
+  }
+}
+
+/**
+ * Test-only DataSegment subclass with an extra "overshadowed" property, used to read back unwrapped JSON.
+ */
+class TestSegmentWithOvershadowedStatus extends DataSegment
+{
+  private final boolean overshadowed;
+
+  @JsonCreator
+  public TestSegmentWithOvershadowedStatus(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("version") String version,
+      @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
+      @JsonProperty("dimensions")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+          List<String> dimensions,
+      @JsonProperty("metrics")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+          List<String> metrics,
+      @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
+      @JsonProperty("binaryVersion") Integer binaryVersion,
+      @JsonProperty("size") long size,
+      @JsonProperty("overshadowed") boolean overshadowed
+  )
+  {
+    super(
+        dataSource,
+        interval,
+        version,
+        loadSpec,
+        dimensions,
+        metrics,
+        shardSpec,
+        binaryVersion,
+        size
+    );
+    this.overshadowed = overshadowed;
+  }
+
+  @JsonProperty
+  public boolean isOvershadowed()
+  {
+    return overshadowed;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TestSegmentWithOvershadowedStatus) || !super.equals(o)) {
+      return false;
+    }
+    return overshadowed == ((TestSegmentWithOvershadowedStatus) o).overshadowed;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // Must agree with equals(): combine the DataSegment hash with the extra flag.
+    return 31 * super.hashCode() + Boolean.hashCode(overshadowed);
+  }
+
+}
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 105afdf..efbdcbe 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -365,7 +365,7 @@ public class MaterializedViewSupervisor implements Supervisor
// drop derivative segments which interval equals the interval in toDeleteBaseSegments
for (Interval interval : toDropInterval.keySet()) {
for (DataSegment segment : derivativeSegments.get(interval)) {
- segmentManager.removeSegment(segment.getId());
+ segmentManager.removeSegment(segment.getId().toString());
}
}
// data of the latest interval will be built firstly.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
index 7d5c64c..fa30dde 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
@@ -73,7 +73,7 @@ public class SegmentListActionsTest
expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));
- expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId()));
+ expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId().toString()));
}
private DataSegment createSegment(Interval interval, String version)
diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
index f2046de..4437e72 100644
--- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
+++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
@@ -13,6 +13,6 @@
"is_available": 1,
"is_realtime": 0,
"is_overshadowed": 0,
- "payload": "{\"dataSegment\":{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted, [...]
+ "payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist, [...]
}
]
diff --git a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
new file mode 100644
index 0000000..8417662
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Ordering;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable snapshot of fields from {@link org.apache.druid.metadata.SQLMetadataSegmentManager} (dataSources and
+ * overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot
+ * to return dataSources and overshadowedSegments. The dataSources map is copied on construction, so the snapshot is
+ * not affected by later changes to the caller's map.
+ */
+public class DataSourcesSnapshot
+{
+  private final ImmutableMap<String, ImmutableDruidDataSource> dataSources;
+  private final ImmutableSet<SegmentId> overshadowedSegments;
+
+  public DataSourcesSnapshot(
+      Map<String, ImmutableDruidDataSource> dataSources
+  )
+  {
+    // Defensive copy: the caller may pass (and keep mutating) a plain mutable map.
+    this.dataSources = ImmutableMap.copyOf(dataSources);
+    this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
+  }
+
+  public Collection<ImmutableDruidDataSource> getDataSources()
+  {
+    return dataSources.values();
+  }
+
+  public Map<String, ImmutableDruidDataSource> getDataSourcesMap()
+  {
+    return dataSources;
+  }
+
+  @Nullable
+  public ImmutableDruidDataSource getDataSource(String dataSourceName)
+  {
+    return dataSources.get(dataSourceName);
+  }
+
+  public ImmutableSet<SegmentId> getOvershadowedSegments()
+  {
+    return overshadowedSegments;
+  }
+
+  /** Returns all segments in this snapshot; never null, since the constructor requires a non-null map. */
+  public Iterable<DataSegment> iterateAllSegmentsInSnapshot()
+  {
+    return () -> dataSources.values().stream()
+                            .flatMap(dataSource -> dataSource.getSegments().stream())
+                            .iterator();
+  }
+
+  /**
+   * This method builds timelines from all dataSources and finds the overshadowed segments list
+   *
+   * @return overshadowed segment Ids list
+   */
+  private List<SegmentId> determineOvershadowedSegments()
+  {
+    final List<DataSegment> segments = dataSources.values().stream()
+                                                  .flatMap(ds -> ds.getSegments().stream())
+                                                  .collect(Collectors.toList());
+    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
+    segments.forEach(segment -> timelines
+        .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
+        .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
+
+    // It's fine to add all overshadowed segments to a single collection because only
+    // a small fraction of the segments in the cluster are expected to be overshadowed,
+    // so building this collection shouldn't generate a lot of garbage.
+    final List<SegmentId> overshadowedSegments = new ArrayList<>();
+    for (DataSegment dataSegment : segments) {
+      final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
+      if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
+        overshadowedSegments.add(dataSegment.getId());
+      }
+    }
+    return overshadowedSegments;
+  }
+
+}
diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
index 841b716..5953944 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
@@ -25,17 +25,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Ordering;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
/**
* An immutable collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source.
@@ -114,41 +109,6 @@ public class ImmutableDruidDataSource
return totalSizeOfSegments;
}
- /**
- * This method finds the overshadowed segments from the given segments
- *
- * @return set of overshadowed segments
- */
- public static Set<DataSegment> determineOvershadowedSegments(Iterable<DataSegment> segments)
- {
- final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = buildTimelines(segments);
-
- final Set<DataSegment> overshadowedSegments = new HashSet<>();
- for (DataSegment dataSegment : segments) {
- final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
- if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
- overshadowedSegments.add(dataSegment);
- }
- }
- return overshadowedSegments;
- }
-
- /**
- * Builds a timeline from given segments
- *
- * @return map of datasource to VersionedIntervalTimeline of segments
- */
- private static Map<String, VersionedIntervalTimeline<String, DataSegment>> buildTimelines(
- Iterable<DataSegment> segments
- )
- {
- final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
- segments.forEach(segment -> timelines
- .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
- .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
- return timelines;
- }
-
@Override
public String toString()
{
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
index 4fc5177..db1fbef 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
@@ -20,6 +20,7 @@
package org.apache.druid.metadata;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -28,6 +29,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
/**
*/
@@ -57,14 +59,9 @@ public interface MetadataSegmentManager
boolean removeDataSource(String dataSource);
/**
- * Prefer {@link #removeSegment(SegmentId)} to this method when possible.
- *
- * This method is not removed because {@link org.apache.druid.server.http.DataSourcesResource#deleteDatasourceSegment}
- * uses it and if it migrates to {@link #removeSegment(SegmentId)} the performance will be worse.
+ * Removes the given segmentId from metadata store. Returns true if one or more rows were affected.
*/
- boolean removeSegment(String dataSource, String segmentId);
-
- boolean removeSegment(SegmentId segmentId);
+ boolean removeSegment(String segmentId);
long disableSegments(String dataSource, Collection<String> segmentIds);
@@ -99,6 +96,22 @@ public interface MetadataSegmentManager
Collection<String> getAllDataSourceNames();
/**
+ * Returns a set of overshadowed segment Ids
+ *
+ * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
+ * not yet been polled.)
+ */
+ @Nullable
+ Set<SegmentId> getOvershadowedSegments();
+
+ /**
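+ * Returns a snapshot of DruidDataSources and overshadowed segments, or null if no valid snapshot exists yet
+ * (perhaps the underlying metadata store has not yet been polled).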
+ */
+ @Nullable
+ DataSourcesSnapshot getDataSourcesSnapshot();
+
+ /**
* Returns top N unused segment intervals in given interval when ordered by segment start time, end time.
*/
List<Interval> getUnusedSegmentIntervals(String dataSource, Interval interval, int limit);
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index 35843c9..bb85142 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -24,6 +24,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.guice.ManageLifecycle;
@@ -40,6 +41,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.skife.jdbi.v2.BaseResultSetMapper;
@@ -65,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -103,11 +106,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final SQLMetadataConnector connector;
// Volatile since this reference is reassigned in "poll" and then read from in other threads.
- // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map).
+ // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty dataSources map and
+ // empty overshadowedSegments set).
// Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between
// null and nonnull multiple times as stop() and start() are called.
@Nullable
- private volatile ConcurrentHashMap<String, DruidDataSource> dataSources = null;
+ private volatile DataSourcesSnapshot dataSourcesSnapshot = null;
/**
* The number of times this SQLMetadataSegmentManager was started.
@@ -206,8 +210,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
if (!isStarted()) {
return;
}
-
- dataSources = null;
+ dataSourcesSnapshot = null;
currentStartOrder = -1;
exec.shutdownNow();
exec = null;
@@ -449,8 +452,6 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
).bind("dataSource", dataSource).execute()
);
- Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource));
-
if (removed == 0) {
return false;
}
@@ -463,58 +464,15 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return true;
}
+ /**
+ * This method does not update {@code dataSourcesSnapshot}; see the comments in {@code doPoll()} about
+ * snapshot updates. The segment removal will be reflected after the next poll cycle runs.
+ */
@Override
- public boolean removeSegment(String dataSourceName, final String segmentId)
- {
- try {
- final boolean removed = removeSegmentFromTable(segmentId);
-
- // Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a
- // potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast.
- List<SegmentId> possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId);
- Optional.ofNullable(dataSources).ifPresent(
- m ->
- m.computeIfPresent(
- dataSourceName,
- (dsName, dataSource) -> {
- for (SegmentId possibleSegmentId : possibleSegmentIds) {
- if (dataSource.removeSegment(possibleSegmentId) != null) {
- break;
- }
- }
- // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
- //noinspection ReturnOfNull
- return dataSource.isEmpty() ? null : dataSource;
- }
- )
- );
-
- return removed;
- }
- catch (Exception e) {
- log.error(e, e.toString());
- return false;
- }
- }
-
- @Override
- public boolean removeSegment(SegmentId segmentId)
+ public boolean removeSegment(String segmentId)
{
try {
- final boolean removed = removeSegmentFromTable(segmentId.toString());
- Optional.ofNullable(dataSources).ifPresent(
- m ->
- m.computeIfPresent(
- segmentId.getDataSource(),
- (dsName, dataSource) -> {
- dataSource.removeSegment(segmentId);
- // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
- //noinspection ReturnOfNull
- return dataSource.isEmpty() ? null : dataSource;
- }
- )
- );
- return removed;
+ return removeSegmentFromTable(segmentId);
}
catch (Exception e) {
log.error(e, e.toString());
@@ -607,37 +565,47 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
@Nullable
public ImmutableDruidDataSource getDataSource(String dataSourceName)
{
- final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null);
- return dataSource == null ? null : dataSource.toImmutableDruidDataSource();
+ final ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
+ .map(m -> m.getDataSourcesMap().get(dataSourceName))
+ .orElse(null);
+ return dataSource == null ? null : dataSource;
}
@Override
@Nullable
public Collection<ImmutableDruidDataSource> getDataSources()
{
- return Optional.ofNullable(dataSources)
- .map(m ->
- m.values()
- .stream()
- .map(DruidDataSource::toImmutableDruidDataSource)
- .collect(Collectors.toList())
- )
- .orElse(null);
+ return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getDataSources()).orElse(null);
}
@Override
@Nullable
public Iterable<DataSegment> iterateAllSegments()
{
- final ConcurrentHashMap<String, DruidDataSource> dataSourcesSnapshot = dataSources;
- if (dataSourcesSnapshot == null) {
+ final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
+ .map(m -> m.getDataSources())
+ .orElse(null);
+ if (dataSources == null) {
return null;
}
- return () -> dataSourcesSnapshot.values()
- .stream()
- .flatMap(dataSource -> dataSource.getSegments().stream())
- .iterator();
+ return () -> dataSources.stream()
+ .flatMap(dataSource -> dataSource.getSegments().stream())
+ .iterator();
+ }
+
+  @Override
+  @Nullable
+  public Set<SegmentId> getOvershadowedSegments()
+  {
+    return Optional.ofNullable(dataSourcesSnapshot).map(DataSourcesSnapshot::getOvershadowedSegments).orElse(null);
+  }
+
+  @Nullable
+  @Override
+  public DataSourcesSnapshot getDataSourcesSnapshot()
+  {
+    return dataSourcesSnapshot;
+  }
@Override
@@ -742,14 +710,26 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
.addSegmentIfAbsent(segment);
});
- // Replace "dataSources" atomically.
- dataSources = newDataSources;
+ // dataSourcesSnapshot is updated only here, please note that if datasources or segments are enabled or disabled
+ // outside of poll, the dataSourcesSnapshot can become invalid until the next poll cycle.
+ // DataSourcesSnapshot computes the overshadowed segments, which makes it an expensive operation if the
+ // snapshot is invalidated on each segment removal, especially if a user issues a lot of single segment remove
+ // calls in rapid succession. So the snapshot update is not done outside of poll at this time.
+ // Updates outside of poll() were primarily for the user experience, so that users would immediately see the effect of
+ // a segment remove call reflected in MetadataResource API calls. These updates outside of scheduled poll may be
+ // added back in removeDataSource and removeSegment methods after the on-demand polling changes from
+ // https://github.com/apache/incubator-druid/pull/7653 are in.
+ final Map<String, ImmutableDruidDataSource> updatedDataSources = CollectionUtils.mapValues(
+ newDataSources,
+ v -> v.toImmutableDruidDataSource()
+ );
+ dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources);
}
/**
* For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough
* (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all
- * existing segments each time, and then replace them in {@link #dataSources}. This method allows to use already
+ * existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use already
* existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link
* com.google.common.collect.Interner}, aiming to make the majority of {@link DataSegment} objects garbage soon after
* they are deserialized and to die in young generation. It allows to avoid fragmentation of the old generation and
@@ -757,7 +737,9 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
*/
private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
{
- DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null);
+ ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
+ .map(m -> m.getDataSourcesMap().get(segment.getDataSource()))
+ .orElse(null);
if (dataSource == null) {
return segment;
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 27bee2f..ba42968 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
@@ -29,6 +30,7 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
@@ -82,6 +84,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -145,6 +148,10 @@ public class DruidCoordinator
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
+ /**
+ * set in {@link CoordinatorRunnable#run()} at start of every coordinator run
+ */
+ private volatile DataSourcesSnapshot dataSourcesSnapshot = null;
@Inject
public DruidCoordinator(
@@ -316,7 +323,9 @@ public class DruidCoordinator
public Map<String, Double> getLoadStatus()
{
final Map<String, Double> loadStatus = new HashMap<>();
- final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
+ final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
+ .map(m -> m.getDataSources())
+ .orElse(null);
if (dataSources == null) {
return loadStatus;
@@ -365,7 +374,7 @@ public class DruidCoordinator
public void removeSegment(DataSegment segment)
{
log.info("Removing Segment[%s]", segment.getId());
- metadataSegmentManager.removeSegment(segment.getId());
+ metadataSegmentManager.removeSegment(segment.getId().toString());
}
public String getCurrentLeader()
@@ -373,6 +382,12 @@ public class DruidCoordinator
return coordLeaderSelector.getCurrentLeader();
}
+ @VisibleForTesting
+ void setDataSourcesSnapshotForTest(DataSourcesSnapshot snapshot)
+ {
+ dataSourcesSnapshot = snapshot;
+ }
+
public void moveSegment(
ImmutableDruidServer fromServer,
ImmutableDruidServer toServer,
@@ -393,7 +408,9 @@ public class DruidCoordinator
throw new IAE("Cannot move [%s] to and from the same server [%s]", segmentId, fromServer.getName());
}
- ImmutableDruidDataSource dataSource = metadataSegmentManager.getDataSource(segment.getDataSource());
+ ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
+ .map(m -> m.getDataSource(segment.getDataSource()))
+ .orElse(null);
if (dataSource == null) {
throw new IAE("Unable to find dataSource for segment [%s] in metadata", segmentId);
}
@@ -483,7 +500,10 @@ public class DruidCoordinator
@Nullable
public Iterable<DataSegment> iterateAvailableDataSegments()
{
- return metadataSegmentManager.iterateAllSegments();
+ final Iterable<DataSegment> dataSources = Optional.ofNullable(dataSourcesSnapshot)
+ .map(m -> m.iterateAllSegmentsInSnapshot())
+ .orElse(null);
+ return dataSources == null ? null : dataSources;
}
@LifecycleStart
@@ -670,7 +690,11 @@ public class DruidCoordinator
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);
// Do coordinator stuff.
- final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
+ dataSourcesSnapshot = metadataSegmentManager.getDataSourcesSnapshot();
+ final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
+ .map(m -> m.getDataSources())
+ .orElse(null);
+
if (dataSources == null) {
log.info("Metadata store not polled yet, skipping this run.");
return;
@@ -684,6 +708,7 @@ public class DruidCoordinator
.withCompactionConfig(getCompactionConfig())
.withEmitter(emitter)
.withBalancerStrategy(balancerStrategy)
+ .withDataSourcesSnapshot(dataSourcesSnapshot)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 655d6bd..de75bce 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -66,6 +67,7 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategy balancerStrategy;
+ private final DataSourcesSnapshot dataSourcesSnapshot;
private DruidCoordinatorRuntimeParams(
long startTime,
@@ -81,7 +83,8 @@ public class DruidCoordinatorRuntimeParams
CoordinatorCompactionConfig coordinatorCompactionConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
- BalancerStrategy balancerStrategy
+ BalancerStrategy balancerStrategy,
+ DataSourcesSnapshot dataSourcesSnapshot
)
{
this.startTime = startTime;
@@ -98,6 +101,7 @@ public class DruidCoordinatorRuntimeParams
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.balancerStrategy = balancerStrategy;
+ this.dataSourcesSnapshot = dataSourcesSnapshot;
}
public long getStartTime()
@@ -171,6 +175,11 @@ public class DruidCoordinatorRuntimeParams
return balancerStrategy;
}
+ public DataSourcesSnapshot getDataSourcesSnapshot()
+ {
+ return dataSourcesSnapshot;
+ }
+
public boolean hasDeletionWaitTimeElapsed()
{
return (System.currentTimeMillis() - getStartTime() > coordinatorDynamicConfig.getMillisToWaitBeforeDeleting());
@@ -237,6 +246,7 @@ public class DruidCoordinatorRuntimeParams
private CoordinatorStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategy balancerStrategy;
+ private DataSourcesSnapshot dataSourcesSnapshot;
Builder()
{
@@ -253,6 +263,7 @@ public class DruidCoordinatorRuntimeParams
this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty();
this.balancerReferenceTimestamp = DateTimes.nowUtc();
+ this.dataSourcesSnapshot = null;
}
Builder(
@@ -304,7 +315,8 @@ public class DruidCoordinatorRuntimeParams
coordinatorCompactionConfig,
stats,
balancerReferenceTimestamp,
- balancerStrategy
+ balancerStrategy,
+ dataSourcesSnapshot
);
}
@@ -434,5 +446,11 @@ public class DruidCoordinatorRuntimeParams
this.balancerStrategy = balancerStrategy;
return this;
}
+
+ public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot)
+ {
+ this.dataSourcesSnapshot = snapshot;
+ return this;
+ }
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
index bbceaaf..7570c81 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
@@ -19,8 +19,9 @@
package org.apache.druid.server.coordinator.helper;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataRuleManager;
@@ -35,6 +36,7 @@ import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
/**
@@ -84,8 +86,14 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
// find available segments which are not overshadowed by other segments in DB
// only those would need to be loaded/dropped
// anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
- final Set<DataSegment> overshadowed = ImmutableDruidDataSource
- .determineOvershadowedSegments(params.getAvailableSegments());
+ // If the metadata store hasn't been polled yet, use an empty overshadowed set
+ final DataSourcesSnapshot dataSourcesSnapshot = params.getDataSourcesSnapshot();
+ Set<SegmentId> overshadowed = ImmutableSet.of();
+ if (dataSourcesSnapshot != null) {
+ overshadowed = Optional
+ .ofNullable(dataSourcesSnapshot.getOvershadowedSegments())
+ .orElse(ImmutableSet.of());
+ }
for (String tier : cluster.getTierNames()) {
replicatorThrottler.updateReplicationState(tier);
@@ -103,7 +111,7 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
int missingRules = 0;
for (DataSegment segment : params.getAvailableSegments()) {
- if (overshadowed.contains(segment)) {
+ if (overshadowed.contains(segment.getId())) {
// Skipping overshadowed segments
continue;
}
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 5904d52..3787cc6 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -434,7 +434,7 @@ public class DataSourcesResource
@PathParam("segmentId") String segmentId
)
{
- if (databaseSegmentManager.removeSegment(dataSourceName, segmentId)) {
+ if (databaseSegmentManager.removeSegment(segmentId)) {
return Response.ok().build();
}
return Response.noContent().build();
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index 3c7e8ac..556ed3d 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -52,7 +52,6 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -163,11 +162,11 @@ public class MetadataResource
final Stream<DataSegment> metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream());
if (includeOvershadowedStatus != null) {
- final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus(
- req,
- druidDataSources,
- metadataSegments
- );
+ final Iterable<SegmentWithOvershadowedStatus> authorizedSegments =
+ findAuthorizedSegmentWithOvershadowedStatus(
+ req,
+ metadataSegments
+ );
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
return builder.entity(authorizedSegments).build();
} else {
@@ -189,22 +188,18 @@ public class MetadataResource
private Iterable<SegmentWithOvershadowedStatus> findAuthorizedSegmentWithOvershadowedStatus(
HttpServletRequest req,
- Collection<ImmutableDruidDataSource> druidDataSources,
Stream<DataSegment> metadataSegments
)
{
- // It's fine to add all overshadowed segments to a single collection because only
- // a small fraction of the segments in the cluster are expected to be overshadowed,
- // so building this collection shouldn't generate a lot of garbage.
- final Set<DataSegment> overshadowedSegments = new HashSet<>();
- for (ImmutableDruidDataSource dataSource : druidDataSources) {
- overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments()));
- }
+ // If the metadata store hasn't been polled yet, use an empty overshadowed set
+ final Set<SegmentId> overshadowedSegments = Optional
+ .ofNullable(metadataSegmentManager.getOvershadowedSegments())
+ .orElse(Collections.emptySet());
final Stream<SegmentWithOvershadowedStatus> segmentsWithOvershadowedStatus = metadataSegments
.map(segment -> new SegmentWithOvershadowedStatus(
segment,
- overshadowedSegments.contains(segment)
+ overshadowedSegments.contains(segment.getId())
));
final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
index e796448..a5d436b 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
@@ -294,7 +294,7 @@ public class SQLMetadataSegmentManagerTest
publisher.publishSegment(newSegment);
Assert.assertNull(manager.getDataSource(newDataSource));
- Assert.assertTrue(manager.removeSegment(newSegment.getId()));
+ Assert.assertTrue(manager.removeSegment(newSegment.getId().toString()));
}
@Test
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 3b22235..9cd0206 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
import org.apache.druid.client.CoordinatorServerView;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.common.config.JacksonConfigManager;
@@ -96,6 +97,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
private ObjectMapper objectMapper;
private JacksonConfigManager configManager;
private DruidNode druidNode;
+ private DataSourcesSnapshot dataSourcesSnapshot;
private static final String SEGPATH = "/druid/segments";
private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
@@ -127,6 +129,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+ dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@@ -368,6 +371,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource);
EasyMock.replay(databaseSegmentManager);
+ coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot);
+ EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes();
+ EasyMock.replay(dataSourcesSnapshot);
coordinator.moveSegment(
source.toImmutableDruidServer(),
dest.toImmutableDruidServer(),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 51a7930..7d38fa0 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -20,9 +20,11 @@
package org.apache.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@@ -30,6 +32,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.metadata.MetadataSegmentManager;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
@@ -67,6 +70,8 @@ public class DruidCoordinatorRuleRunnerTest
private DruidCoordinatorRuleRunner ruleRunner;
private ServiceEmitter emitter;
private MetadataRuleManager databaseRuleManager;
+ private MetadataSegmentManager databaseSegmentManager;
+ private DataSourcesSnapshot dataSourcesSnapshot;
@Before
public void setUp()
@@ -76,6 +81,8 @@ public class DruidCoordinatorRuleRunnerTest
emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
+ databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
+ dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
DateTime start = DateTimes.of("2012-01-01");
availableSegments = new ArrayList<>();
@@ -989,7 +996,9 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testReplicantThrottle()
{
- mockCoordinator();
+ EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes();
+ EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
+ EasyMock.replay(coordinator, databaseSegmentManager, dataSourcesSnapshot);
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
@@ -1114,6 +1123,8 @@ public class DruidCoordinatorRuleRunnerTest
.build()
)
.atLeastOnce();
+ EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes();
+ EasyMock.replay(dataSourcesSnapshot);
coordinator.removeSegment(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
@@ -1330,8 +1341,9 @@ public class DruidCoordinatorRuleRunnerTest
);
availableSegments.add(v1);
availableSegments.add(v2);
-
- mockCoordinator();
+ EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1.getId())).anyTimes();
+ EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
+ EasyMock.replay(coordinator, dataSourcesSnapshot);
mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject());
EasyMock.expectLastCall().once();
mockEmptyPeon();
@@ -1375,6 +1387,7 @@ public class DruidCoordinatorRuleRunnerTest
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+ .withDataSourcesSnapshot(dataSourcesSnapshot)
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 793bd28..bb44a6c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -29,6 +29,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
@@ -99,6 +100,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
private ObjectMapper objectMapper;
private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
+ private DataSourcesSnapshot dataSourcesSnapshot;
@Before
public void setUp() throws Exception
@@ -106,8 +108,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
druidServer = EasyMock.createMock(DruidServer.class);
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
+ dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+ EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes();
+ EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
+ EasyMock.replay(databaseSegmentManager);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@@ -248,8 +254,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(segment);
EasyMock.replay(druidDataSource);
- EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource);
- EasyMock.replay(databaseSegmentManager);
+ coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot);
+ EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes();
+ EasyMock.replay(dataSourcesSnapshot);
scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(scheduledExecutorFactory);
EasyMock.replay(metadataRuleManager);
@@ -533,20 +540,15 @@ public class DruidCoordinatorTest extends CuratorTestBase
private void setupMetadataSegmentManagerMock(DruidDataSource dataSource)
{
- EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock
- .expect(databaseSegmentManager.iterateAllSegments())
+ .expect(dataSourcesSnapshot.iterateAllSegmentsInSnapshot())
.andReturn(dataSource.getSegments())
.anyTimes();
EasyMock
- .expect(databaseSegmentManager.getDataSources())
+ .expect(dataSourcesSnapshot.getDataSources())
.andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
.anyTimes();
- EasyMock
- .expect(databaseSegmentManager.getAllDataSourceNames())
- .andReturn(Collections.singleton(dataSource.getName()))
- .anyTimes();
- EasyMock.replay(databaseSegmentManager);
+ EasyMock.replay(dataSourcesSnapshot);
}
@Nullable
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index e5cfa91..0289788 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -348,12 +348,13 @@ public class SystemSchema extends AbstractSchema
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
- final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
- authenticationResult,
- () -> it,
- SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR,
- authorizerMapper
- );
+ final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils
+ .filterAuthorizedResources(
+ authenticationResult,
+ () -> it,
+ SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR,
+ authorizerMapper
+ );
return authorizedSegments.iterator();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org