You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2021/08/06 18:42:57 UTC
[druid] branch master updated: Add unit test for config
`druid.broker.segment.watchedTiers` (#11555)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 39a3db7 Add unit test for config `druid.broker.segment.watchedTiers` (#11555)
39a3db7 is described below
commit 39a3db79437a4d331bd68da6c627ad3c83019858
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Sat Aug 7 00:12:40 2021 +0530
Add unit test for config `druid.broker.segment.watchedTiers` (#11555)
---
.../apache/druid/client/BrokerServerViewTest.java | 134 +++++++++++++++------
1 file changed, 97 insertions(+), 37 deletions(-)
diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index 9558133..d864d32 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelector;
@@ -58,7 +59,9 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -97,18 +100,7 @@ public class BrokerServerViewTest extends CuratorTestBase
setupViews();
- final DruidServer druidServer = new DruidServer(
- "localhost:1234",
- "localhost:1234",
- null,
- 10000000L,
- ServerType.HISTORICAL,
- "default_tier",
- 0
- );
-
- setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
-
+ final DruidServer druidServer = setupHistoricalServer("default_tier", "localhost:1234", 0);
final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
final int partition = segment.getShardSpec().getPartitionNum();
final Interval intervals = Intervals.of("2014-10-20T00:00:00Z/P1D");
@@ -159,21 +151,9 @@ public class BrokerServerViewTest extends CuratorTestBase
final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
- input -> new DruidServer(
- input,
- input,
- null,
- 10000000L,
- ServerType.HISTORICAL,
- "default_tier",
- 0
- )
+ hostname -> setupHistoricalServer("default_tier", hostname, 0)
);
- for (DruidServer druidServer : druidServers) {
- setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
- }
-
final List<DataSegment> segments = Lists.transform(
ImmutableList.of(
Pair.of("2011-04-01/2011-04-03", "v1"),
@@ -268,21 +248,10 @@ public class BrokerServerViewTest extends CuratorTestBase
final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
- input -> new DruidServer(
- input,
- input,
- null,
- 10000000L,
- ServerType.HISTORICAL,
- "default_tier",
- 0
- )
+ hostname -> setupHistoricalServer("default_tier", hostname, 0)
);
setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
- for (DruidServer druidServer : druidServers) {
- setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
- }
final List<DataSegment> segments = Lists.transform(
ImmutableList.of(
@@ -352,6 +321,85 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
}
+ @Test
+ public void testMultipleTiers() throws Exception
+ {
+ segmentViewInitLatch = new CountDownLatch(1);
+ segmentAddedLatch = new CountDownLatch(4);
+ segmentRemovedLatch = new CountDownLatch(0);
+
+ // Setup a Broker that watches only Tier 2
+ final String tier1 = "tier1";
+ final String tier2 = "tier2";
+ setupViews(Sets.newHashSet(tier2));
+
+ // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3
+ final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
+ final DruidServer server21 = setupHistoricalServer(tier2, "localhost:2", 1);
+
+ final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1");
+ announceSegmentForServer(server11, segment1, zkPathsConfig, jsonMapper);
+
+ final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1");
+ announceSegmentForServer(server11, segment2, zkPathsConfig, jsonMapper);
+ announceSegmentForServer(server21, segment2, zkPathsConfig, jsonMapper);
+
+ final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1");
+ announceSegmentForServer(server21, segment3, zkPathsConfig, jsonMapper);
+
+ // Wait for the segments to be added
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+ // Get the timeline for the datasource
+ TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
+ DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
+ ).get();
+
+ // Verify that the timeline has no entry for the interval of segment 1
+ Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty());
+
+ // Verify that there is one entry for the interval of segment 2
+ List<TimelineObjectHolder<String, ServerSelector>> timelineHolders =
+ timeline.lookup(segment2.getInterval());
+ Assert.assertEquals(1, timelineHolders.size());
+
+ TimelineObjectHolder<String, ServerSelector> timelineHolder = timelineHolders.get(0);
+ Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval());
+ Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion());
+
+ PartitionHolder<ServerSelector> partitionHolder = timelineHolder.getObject();
+ Assert.assertTrue(partitionHolder.isComplete());
+ Assert.assertEquals(1, Iterables.size(partitionHolder));
+
+ ServerSelector selector = (partitionHolder.iterator().next()).getObject();
+ Assert.assertFalse(selector.isEmpty());
+ Assert.assertEquals(segment2, selector.getSegment());
+
+ // Verify that the ServerSelector always picks Tier 1
+ for (int i = 0; i < 5; ++i) {
+ Assert.assertEquals(server21, selector.pick(null).getServer());
+ }
+ Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
+ }
+
+ /**
+ * Creates a DruidServer of type HISTORICAL and sets up a ZNode for it.
+ */
+ private DruidServer setupHistoricalServer(String tier, String name, int priority)
+ {
+ final DruidServer historical = new DruidServer(
+ name,
+ name,
+ null,
+ 1000000,
+ ServerType.HISTORICAL,
+ tier,
+ priority
+ );
+ setupZNodeForServer(historical, zkPathsConfig, jsonMapper);
+ return historical;
+ }
private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
String intervalStr,
@@ -390,6 +438,11 @@ public class BrokerServerViewTest extends CuratorTestBase
private void setupViews() throws Exception
{
+ setupViews(null);
+ }
+
+ private void setupViews(Set<String> watchedTiers) throws Exception
+ {
baseView = new BatchServerInventoryView(
zkPathsConfig,
curator,
@@ -441,6 +494,13 @@ public class BrokerServerViewTest extends CuratorTestBase
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
new NoopServiceEmitter(),
new BrokerSegmentWatcherConfig()
+ {
+ @Override
+ public Set<String> getWatchedTiers()
+ {
+ return watchedTiers;
+ }
+ }
);
baseView.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org