You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2021/08/06 18:42:57 UTC

[druid] branch master updated: Add unit test for config `druid.broker.segment.watchedTiers` (#11555)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 39a3db7  Add unit test for config `druid.broker.segment.watchedTiers` (#11555)
39a3db7 is described below

commit 39a3db79437a4d331bd68da6c627ad3c83019858
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Sat Aug 7 00:12:40 2021 +0530

    Add unit test for config `druid.broker.segment.watchedTiers` (#11555)
---
 .../apache/druid/client/BrokerServerViewTest.java  | 134 +++++++++++++++------
 1 file changed, 97 insertions(+), 37 deletions(-)

diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index 9558133..d864d32 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
 import org.apache.druid.client.selector.RandomServerSelectorStrategy;
 import org.apache.druid.client.selector.ServerSelector;
@@ -58,7 +59,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
@@ -97,18 +100,7 @@ public class BrokerServerViewTest extends CuratorTestBase
 
     setupViews();
 
-    final DruidServer druidServer = new DruidServer(
-        "localhost:1234",
-        "localhost:1234",
-        null,
-        10000000L,
-        ServerType.HISTORICAL,
-        "default_tier",
-        0
-    );
-
-    setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
-
+    final DruidServer druidServer = setupHistoricalServer("default_tier", "localhost:1234", 0);
     final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
     final int partition = segment.getShardSpec().getPartitionNum();
     final Interval intervals = Intervals.of("2014-10-20T00:00:00Z/P1D");
@@ -159,21 +151,9 @@ public class BrokerServerViewTest extends CuratorTestBase
 
     final List<DruidServer> druidServers = Lists.transform(
         ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
-        input -> new DruidServer(
-            input,
-            input,
-            null,
-            10000000L,
-            ServerType.HISTORICAL,
-            "default_tier",
-            0
-        )
+        hostname -> setupHistoricalServer("default_tier", hostname, 0)
     );
 
-    for (DruidServer druidServer : druidServers) {
-      setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
-    }
-
     final List<DataSegment> segments = Lists.transform(
         ImmutableList.of(
             Pair.of("2011-04-01/2011-04-03", "v1"),
@@ -268,21 +248,10 @@ public class BrokerServerViewTest extends CuratorTestBase
 
     final List<DruidServer> druidServers = Lists.transform(
         ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
-        input -> new DruidServer(
-            input,
-            input,
-            null,
-            10000000L,
-            ServerType.HISTORICAL,
-            "default_tier",
-            0
-        )
+        hostname -> setupHistoricalServer("default_tier", hostname, 0)
     );
 
     setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
-    for (DruidServer druidServer : druidServers) {
-      setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
-    }
 
     final List<DataSegment> segments = Lists.transform(
         ImmutableList.of(
@@ -352,6 +321,85 @@ public class BrokerServerViewTest extends CuratorTestBase
     Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
   }
 
+  @Test
+  public void testMultipleTiers() throws Exception
+  {
+    segmentViewInitLatch = new CountDownLatch(1);
+    segmentAddedLatch = new CountDownLatch(4); // four announceSegmentForServer calls below; presumably one count each - confirm
+    segmentRemovedLatch = new CountDownLatch(0);
+
+    // Set up a Broker that watches only tier2; a segment served solely by tier1 must stay out of its timeline
+    final String tier1 = "tier1";
+    final String tier2 = "tier2";
+    setupViews(Sets.newHashSet(tier2));
+
+    // One historical per tier: the tier1 server gets segments 1 and 2, the tier2 server gets segments 2 and 3
+    final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
+    final DruidServer server21 = setupHistoricalServer(tier2, "localhost:2", 1);
+
+    final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1");
+    announceSegmentForServer(server11, segment1, zkPathsConfig, jsonMapper);
+
+    final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1");
+    announceSegmentForServer(server11, segment2, zkPathsConfig, jsonMapper);
+    announceSegmentForServer(server21, segment2, zkPathsConfig, jsonMapper);
+
+    final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1");
+    announceSegmentForServer(server21, segment3, zkPathsConfig, jsonMapper);
+
+    // Wait for the view to initialize and for the segments to be added
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+    // Get the timeline for the datasource
+    TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
+        DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
+    ).get();
+
+    // Verify that the timeline has no entry for the interval of segment 1 (announced only on the tier1 server)
+    Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty());
+
+    // Verify that there is exactly one entry for the interval of segment 2 (announced on both servers)
+    List<TimelineObjectHolder<String, ServerSelector>> timelineHolders =
+        timeline.lookup(segment2.getInterval());
+    Assert.assertEquals(1, timelineHolders.size());
+
+    TimelineObjectHolder<String, ServerSelector> timelineHolder = timelineHolders.get(0);
+    Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval());
+    Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion());
+
+    PartitionHolder<ServerSelector> partitionHolder = timelineHolder.getObject();
+    Assert.assertTrue(partitionHolder.isComplete());
+    Assert.assertEquals(1, Iterables.size(partitionHolder));
+
+    ServerSelector selector = (partitionHolder.iterator().next()).getObject();
+    Assert.assertFalse(selector.isEmpty());
+    Assert.assertEquals(segment2, selector.getSegment());
+
+    // Verify that the ServerSelector always picks the tier2 server (server21), never the unwatched tier1 server
+    for (int i = 0; i < 5; ++i) {
+      Assert.assertEquals(server21, selector.pick(null).getServer());
+    }
+    Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
+  }
+
+  /**
+   * Creates a HISTORICAL DruidServer named {@code name} (also used as its host) in {@code tier} with {@code priority}, sets up a ZNode for it via setupZNodeForServer, and returns it.
+   */
+  private DruidServer setupHistoricalServer(String tier, String name, int priority)
+  {
+    final DruidServer historical = new DruidServer(
+        name,
+        name,
+        null,
+        10000000L, // max size; same value the inline DruidServer constructions used before this helper replaced them
+        ServerType.HISTORICAL,
+        tier,
+        priority
+    );
+    setupZNodeForServer(historical, zkPathsConfig, jsonMapper);
+    return historical;
+  }
 
   private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
       String intervalStr,
@@ -390,6 +438,11 @@ public class BrokerServerViewTest extends CuratorTestBase
 
   private void setupViews() throws Exception
   {
+    setupViews(null); // delegates with a null watched-tier set; NOTE(review): presumably same as the un-overridden config default - confirm
+  }
+
+  private void setupViews(Set<String> watchedTiers) throws Exception
+  {
     baseView = new BatchServerInventoryView(
         zkPathsConfig,
         curator,
@@ -441,6 +494,13 @@ public class BrokerServerViewTest extends CuratorTestBase
         new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
         new NoopServiceEmitter(),
         new BrokerSegmentWatcherConfig()
+        {
+          @Override
+          public Set<String> getWatchedTiers()
+          {
+            return watchedTiers;
+          }
+        }
     );
 
     baseView.start();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org