You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2018/12/04 04:38:39 UTC

[incubator-pinot] branch segment-selector created (now 1a2200a)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a change to branch segment-selector
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 1a2200a  Add segment selector for merged segments Once the merge lineage information is written to the property store, broker will read the lineage and pick only segments that are required for answering the query. This PR adds the segment selector for merged segments. 1. Add segment selector interface 2. Add merged segment selector implementation 3. Add unit tests

This branch includes the following new commits:

     new 1a2200a  Add segment selector for merged segments Once the merge lineage information is written to the property store, broker will read the lineage and pick only segments that are required for answering the query. This PR adds the segment selector for merged segments. 1. Add segment selector interface 2. Add merged segment selector implementation 3. Add unit tests

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add segment selector for merged segments Once the merge lineage information is written to the property store, broker will read the lineage and pick only segments that are required for answering the query. This PR adds the segment selector for merged segments. 1. Add segment selector interface 2. Add merged segment selector implementation 3. Add unit tests

Posted by sn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch segment-selector
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1a2200ad1ab6bf14fedf6477894c130017895b27
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Mon Dec 3 20:36:00 2018 -0800

    Add segment selector for merged segments
    Once the merge lineage information is written to the property store,
    broker will read the lineage and pick only segments that are required
    for answering the query. This PR adds the segment selector for merged
    segments.
    1. Add segment selector interface
    2. Add merged segment selector implementation
    3. Add unit tests
---
 .../routing/HelixExternalViewBasedRouting.java     |  25 ++++-
 .../routing/builder/BaseRoutingTableBuilder.java   |  11 ++-
 .../builder/DefaultOfflineRoutingTableBuilder.java |   8 +-
 .../DefaultRealtimeRoutingTableBuilder.java        |  11 ++-
 .../routing/builder/RoutingTableBuilder.java       |   5 +-
 .../routing/selector/MergedSegmentSelector.java    |  92 +++++++++++++++++++
 .../broker/routing/selector/SegmentSelector.java   |  46 ++++++++++
 .../routing/selector/SegmentSelectorFactory.java   |  44 +++++++++
 .../pinot/broker/routing/RoutingTableTest.java     |   1 +
 .../BalancedRandomRoutingTableBuilderTest.java     |   2 +-
 .../HighLevelConsumerRoutingTableBuilderTest.java  |   2 +-
 ...rtitionAwareOfflineRoutingTableBuilderTest.java |   9 +-
 ...titionAwareRealtimeRoutingTableBuilderTest.java |  10 +-
 .../routing/selector/MergeSegmentSelectorTest.java | 102 +++++++++++++++++++++
 .../{routing => util}/FakePropertyStore.java       |   2 +-
 15 files changed, 338 insertions(+), 32 deletions(-)

diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
index c3db57d..a11af77 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
@@ -17,6 +17,8 @@ package com.linkedin.pinot.broker.routing;
 
 import com.google.common.collect.Sets;
 import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelectorFactory;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -67,6 +69,7 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
   private final Map<String, Map<String, InstanceConfig>> _lastKnownInstanceConfigsForTable = new ConcurrentHashMap<>();
   private final Map<String, InstanceConfig> _lastKnownInstanceConfigs = new ConcurrentHashMap<>();
   private final Map<String, Set<String>> _tablesForInstance = new ConcurrentHashMap<>();
+  private final Map<String, SegmentSelector> _segmentSelectorMap = new ConcurrentHashMap<>();
 
   private final HelixExternalViewBasedTimeBoundaryService _timeBoundaryService;
   private final HelixManager _helixManager;
@@ -76,25 +79,25 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
 
   private Configuration _configuration;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-
   private RoutingTableBuilderFactory _routingTableBuilderFactory;
+  private SegmentSelectorFactory _segmentSelectorFactory;
+
 
   public HelixExternalViewBasedRouting(ZkHelixPropertyStore<ZNRecord> propertyStore, HelixManager helixManager,
       Configuration configuration) {
-    _propertyStore = propertyStore;
     _configuration = configuration;
     _timeBoundaryService = new HelixExternalViewBasedTimeBoundaryService(propertyStore);
     _routingTableBuilderMap = new HashMap<>();
     _helixManager = helixManager;
     _routingTableBuilderFactory = new RoutingTableBuilderFactory(_configuration, propertyStore);
+    _segmentSelectorFactory = new SegmentSelectorFactory(propertyStore);
   }
 
   @Override
   public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
     String tableName = request.getTableName();
     RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableName);
-    return routingTableBuilder.getRoutingTable(request);
+    return routingTableBuilder.getRoutingTable(request, _segmentSelectorMap.get(tableName));
   }
 
   @Override
@@ -116,6 +119,13 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
         tableName);
     _routingTableBuilderMap.put(tableName, routingTableBuilder);
 
+    // Initialize segment selector
+    SegmentSelector segmentSelector = _segmentSelectorFactory.getSegmentSelector(tableConfig);
+    if (segmentSelector != null) {
+      LOGGER.info("Initialized segmentSelector: {} for table {}", segmentSelector.getClass().getName(), tableName);
+      _segmentSelectorMap.put(tableName, segmentSelector);
+    }
+
     // Build the routing table
     if (externalView == null) {
       // It is possible for us to get a request to serve a table for which there is no external view. In this case, just
@@ -224,6 +234,7 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
     _lastKnownExternalViewVersionMap.put(tableNameWithType, externalViewRecordVersion);
 
     RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableNameWithType);
+    SegmentSelector segmentSelector = _segmentSelectorMap.get(tableNameWithType);
     if (routingTableBuilder == null) {
       //TODO: warn
       return;
@@ -236,8 +247,14 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
     try {
       Map<String, InstanceConfig> relevantInstanceConfigs = new HashMap<>();
 
+      // Update routing table builder
       routingTableBuilder.computeOnExternalViewChange(tableNameWithType, externalView, instanceConfigs);
 
+      // Update segment selector
+      if (segmentSelector != null) {
+        segmentSelector.computeOnExternalViewChange();
+      }
+
       // Keep track of the instance configs that are used in that routing table
       updateInstanceConfigsMapFromExternalView(relevantInstanceConfigs, instanceConfigs, externalView);
 
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
index c00d30d..39c764b 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.RoutingConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -123,16 +124,16 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
     }
   }
 
-  @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
     if (_enableDynamicComputing) {
       // Copy the pointer for snapshot since the pointer for segment to servers map can change at anytime
       Map<String, List<String>> segmentToServersMap = _segmentToServersMap;
 
-      // Get all existing segments
+      // Selecting only required segments for processing a query
       Set<String> segmentsToQuery = segmentToServersMap.keySet();
-
-      // TODO: add the selection logic here
+      if (segmentSelector != null) {
+        segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery);
+      }
 
       // Compute the final routing table
       return computeDynamicRoutingTable(segmentToServersMap, segmentsToQuery);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
index 07cea6b..2ed57c4 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import java.util.HashSet;
@@ -47,7 +48,8 @@ public class DefaultOfflineRoutingTableBuilder implements RoutingTableBuilder {
   private int _minReplicaCountForLargeCluster = 4;
 
   @Override
-  public void init(Configuration configuration, TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics) {
+  public void init(Configuration configuration, TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      BrokerMetrics brokerMetrics) {
     _largeClusterRoutingTableBuilder = new LargeClusterRoutingTableBuilder();
     _smallClusterRoutingTableBuilder = new BalancedRandomRoutingTableBuilder();
     if (configuration.containsKey("minServerCountForLargeCluster")) {
@@ -121,8 +123,8 @@ public class DefaultOfflineRoutingTableBuilder implements RoutingTableBuilder {
   }
 
   @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
-    return _routingTableBuilder.getRoutingTable(request);
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
+    return _routingTableBuilder.getRoutingTable(request, segmentSelector);
   }
 
   @Override
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
index d78de04..72a9a79 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import com.linkedin.pinot.common.utils.SegmentName;
@@ -68,7 +69,7 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
   }
 
   @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
     boolean forceLLC = false;
     boolean forceHLC = false;
     for (String routingOption : request.getRoutingOptions()) {
@@ -85,14 +86,14 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
     }
 
     if (forceLLC) {
-      return _realtimeLLCRoutingTableBuilder.getRoutingTable(request);
+      return _realtimeLLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
     } else if (forceHLC) {
-      return _realtimeHLCRoutingTableBuilder.getRoutingTable(request);
+      return _realtimeHLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
     } else {
       if (_hasLLC) {
-        return _realtimeLLCRoutingTableBuilder.getRoutingTable(request);
+        return _realtimeLLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
       } else if (_hasHLC) {
-        return _realtimeHLCRoutingTableBuilder.getRoutingTable(request);
+        return _realtimeHLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
       } else {
         return Collections.emptyMap();
       }
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
index 5a9c320..d17dacf 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import java.util.List;
@@ -45,9 +46,9 @@ public interface RoutingTableBuilder {
   void computeOnExternalViewChange(String tableName, ExternalView externalView, List<InstanceConfig> instanceConfigs);
 
   /**
-   * Get the routing table based on the given lookup request.
+   * Get the routing table based on the given lookup request and segment selector
    */
-  Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request);
+  Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector);
 
   /**
    * Get all pre-computed routing tables.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java
new file mode 100644
index 0000000..7d74146
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.lineage.SegmentGroup;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Segment selector for merged segments
+ */
+public class MergedSegmentSelector implements SegmentSelector {
+  private String _tableNameWithType;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private volatile SegmentGroup _rootSegmentGroup;
+
+  @Override
+  public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _tableNameWithType = tableConfig.getTableName();
+    _propertyStore = propertyStore;
+  }
+
+  @Override
+  public void computeOnExternalViewChange() {
+    SegmentMergeLineage segmentMergeLineage = SegmentMergeLineageAccessHelper.getSegmentMergeLineage(_propertyStore,
+        _tableNameWithType);
+    _rootSegmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+  }
+
+  @Override
+  public Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segmentsToQuery) {
+    Set<String> selectedSegments = new HashSet<>(segmentsToQuery);
+    for (SegmentGroup segmentGroup : _rootSegmentGroup.getChildrenGroups()) {
+      computeSelectionProcessForSegmentGroup(segmentGroup, selectedSegments, segmentsToQuery);
+    }
+    return selectedSegments;
+  }
+
+  private void computeSelectionProcessForSegmentGroup(SegmentGroup segmentGroup, Set<String> selectedSegments,
+      Set<String> availableSegments) {
+    Set<String> segmentsForGroup = segmentGroup.getSegments();
+
+    if (availableSegments.containsAll(segmentsForGroup)) {
+      // If we pick the current group node, we delete all segments covered by children groups
+      if (segmentGroup.getChildrenGroups() == null || segmentGroup.getChildrenGroups().isEmpty()) {
+        return;
+      }
+      for (SegmentGroup child: segmentGroup.getChildrenGroups()) {
+        removeSegmentsForSegmentGroup(child, selectedSegments);
+      }
+    } else {
+      // If the current group is not picked, we compute the selection recursively for children nodes
+      selectedSegments.removeAll(segmentsForGroup);
+      for (SegmentGroup child: segmentGroup.getChildrenGroups()) {
+        computeSelectionProcessForSegmentGroup(child, selectedSegments, availableSegments);
+      }
+    }
+  }
+
+  private void removeSegmentsForSegmentGroup(SegmentGroup segmentGroup, Set<String> selectedSegments) {
+    Set<String> segmentsForGroup = segmentGroup.getSegments();
+    selectedSegments.removeAll(segmentsForGroup);
+
+    if (segmentGroup.getChildrenGroups() == null || segmentGroup.getChildrenGroups().isEmpty()) {
+      return;
+    }
+
+    for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+      removeSegmentsForSegmentGroup(child, selectedSegments);
+    }
+  }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java
new file mode 100644
index 0000000..226af40
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Interface for segment selector
+ */
+public interface SegmentSelector {
+
+  /**
+   * Initiate the segment selector
+   */
+  void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore);
+
+  /**
+   * Compute and update the information required for a selector
+   * <p>Should be called whenever there is an external view change
+   */
+  void computeOnExternalViewChange();
+
+  /**
+   * Compute selection algorithm.
+   * <p>May end up with modifying the input set of segments to query.
+   */
+  Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segmentsToQuery);
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java
new file mode 100644
index 0000000..de05ad0
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Factory for segment selector
+ */
+public class SegmentSelectorFactory {
+
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  public SegmentSelectorFactory(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _propertyStore = propertyStore;
+  }
+
+  /**
+   * Get a segment selector based on table config. This method returns null when no segment selector is required
+   *
+   * @param tableConfig a table config
+   * @return a segment selector or returns null if no selector is required
+   */
+  public SegmentSelector getSegmentSelector(TableConfig tableConfig) {
+    // TODO: add the support for merged segment selector once merge config is updated.
+    return null;
+  }
+}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
index 25600d3..3413711 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
@@ -17,6 +17,7 @@ package com.linkedin.pinot.broker.routing;
 
 import com.linkedin.pinot.broker.routing.builder.HighLevelConsumerBasedRoutingTableBuilder;
 import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableConfig.Builder;
 import com.linkedin.pinot.common.config.TableNameBuilder;
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
index 7bae71d..46ca40c 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
@@ -79,7 +79,7 @@ public class BalancedRandomRoutingTableBuilderTest {
     // Build routing table
     routingTableBuilder.computeOnExternalViewChange("dummy", externalView, instanceConfigList);
     RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
-    Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request);
+    Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
 
     Set<String> segmentsInRoutingTable = new HashSet<>();
     for (List<String> segments : routingTable.values()) {
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
index 74f53c3..c2ef4e4 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
@@ -95,7 +95,7 @@ public class HighLevelConsumerRoutingTableBuilderTest {
       // Check if the routing table result is correct
       for (int run = 0; run < MAX_NUM_GROUPS * 10; run++) {
         RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
-        Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request);
+        Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
         Set<String> coveredSegments = new HashSet<>();
         for (List<String> segmentsForServer : routingTable.values()) {
           coveredSegments.addAll(segmentsForServer);
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
index c110b6e..5a44a28 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
@@ -15,7 +15,7 @@
  */
 package com.linkedin.pinot.broker.routing.builder;
 
-import com.linkedin.pinot.broker.routing.FakePropertyStore;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.common.config.ReplicaGroupStrategyConfig;
 import com.linkedin.pinot.common.config.RoutingConfig;
@@ -50,7 +50,6 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
   private static final String PARTITION_COLUMN = "memberId";
 
   private static final Pql2Compiler COMPILER = new Pql2Compiler();
-
   private static final Random RANDOM = new Random();
 
   private int NUM_REPLICA;
@@ -114,7 +113,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
       // Check the query that requires to scan all segment.
       String countStarQuery = "select count(*) from myTable";
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
       // Check that the number of servers picked are always equal or less than the number of servers
       // from a single replica group.
@@ -134,7 +133,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
       for (int queryPartition = 0; queryPartition < 100; queryPartition++) {
         String filterQuery = "select count(*) from myTable where " + PARTITION_COLUMN + " = " + queryPartition;
         routingTable =
-            routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery));
+            routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery), null);
 
         // Check that the number of servers picked are always equal or less than the number of servers
         // in a single replica group.
@@ -218,7 +217,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
     for (int i = 0; i < 100; i++) {
       String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME;
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
       Assert.assertEquals(routingTable.keySet().size(), 1);
       servers.add(routingTable.keySet().iterator().next());
     }
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
index 7f349f8..7890b1b 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
@@ -15,7 +15,7 @@
  */
 package com.linkedin.pinot.broker.routing.builder;
 
-import com.linkedin.pinot.broker.routing.FakePropertyStore;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.common.config.ColumnPartitionConfig;
 import com.linkedin.pinot.common.config.RoutingConfig;
@@ -102,7 +102,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
       // Check the query that requires to scan all segment.
       String countStarQuery = "select count(*) from myTable";
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
       // Check that all segments are covered exactly for once.
       Set<String> assignedSegments = new HashSet<>();
@@ -117,7 +117,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
       // Check the broker side server and segment pruning.
       for (int queryPartition = 0; queryPartition < 100; queryPartition++) {
         String filterQuery = "select count(*) from myTable where " + PARTITION_COLUMN + " = " + queryPartition;
-        routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery));
+        routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery), null);
 
         int partition = queryPartition % NUM_PARTITION;
         assignedSegments = new HashSet<>();
@@ -178,7 +178,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
     // Check the query that requires to scan all segment.
     String countStarQuery = "select count(*) from myTable";
     Map<String, List<String>> routingTable =
-        routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+        routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
     // Check that all segments are covered exactly for once.
     Set<String> assignedSegments = new HashSet<>();
@@ -253,7 +253,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
     for (int i = 0; i < 100; i++) {
       String countStarQuery = "select count(*) from " + REALTIME_TABLE_NAME;
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
       Assert.assertEquals(routingTable.keySet().size(), 1);
       servers.addAll(routingTable.keySet());
     }
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/selector/MergeSegmentSelectorTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/selector/MergeSegmentSelectorTest.java
new file mode 100644
index 0000000..2d5114e
--- /dev/null
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/selector/MergeSegmentSelectorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.util.FakePropertyStore;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MergeSegmentSelectorTest {
+  private static final String TEST_TABLE_NAME = "test_OFFLINE";
+
+  @Test
+  public void testMergeSegmentSelector() throws Exception {
+    MergedSegmentSelector segmentSelector = new MergedSegmentSelector();
+
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_NAME).build();
+    ZkHelixPropertyStore<ZNRecord> fakePropertyStore = new FakePropertyStore();
+    segmentSelector.init(tableConfig, fakePropertyStore);
+
+    // Add G0 (segment0), G1 (segment1), G2 (segment2) that represent 3 base segments
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage(TEST_TABLE_NAME);
+    List<String> childrenGroups = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      String groupId = "G" + i;
+      segmentMergeLineage.addSegmentGroup(groupId, Arrays.asList(new String[]{"segment" + i}), null);
+      childrenGroups.add(groupId);
+    }
+
+    // Update segment merge lineage in the property store and update segment selector
+    SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(fakePropertyStore, segmentMergeLineage, 0);
+    segmentSelector.computeOnExternalViewChange();
+
+    // Test the result of segment selection
+    Set<String> segmentsToQuery = new HashSet<>(Arrays.asList(
+        new String[]{"segment0", "segment1", "segment2", "segment3"}));
+    Set<String> selectedSegments = segmentSelector.selectSegments(null, segmentsToQuery);
+    Assert.assertEquals(selectedSegments,
+        new HashSet<>(Arrays.asList(new String[]{"segment0", "segment1", "segment2", "segment3"})));
+
+    // Add G3 (merged_0, merged_1) that was merged from G0, G1, G2
+    String mergedGroupId = "G3";
+    segmentMergeLineage.addSegmentGroup(mergedGroupId,
+        Arrays.asList(new String[]{"merged0", "merged1"}), childrenGroups);
+
+    // Update segment merge lineage in the property store and update segment selector
+    SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(fakePropertyStore, segmentMergeLineage, 0);
+    segmentSelector.computeOnExternalViewChange();
+
+    // Test the result of segment selection
+    segmentsToQuery = new HashSet<>(Arrays.asList(
+        new String[]{"segment0", "segment1", "segment2", "merged0", "merged1"}));
+    selectedSegments = segmentSelector.selectSegments(null, segmentsToQuery);
+    Assert.assertEquals(selectedSegments,
+        new HashSet<>(Arrays.asList(new String[]{"merged0", "merged1"})));
+
+    // Test the result of segment selection when the external view does not have one of merged segments
+    segmentsToQuery = new HashSet<>(Arrays.asList(
+        new String[]{"segment0", "segment1", "segment2", "merged0"}));
+    selectedSegments = segmentSelector.selectSegments(null, segmentsToQuery);
+    Assert.assertEquals(selectedSegments,
+        new HashSet<>(Arrays.asList(new String[]{"segment0", "segment1", "segment2"})));
+
+    // Add G4 (segment4)
+    segmentMergeLineage.addSegmentGroup("G4", Arrays.asList(new String[]{"segment3"}), null);
+    // Update segment merge lineage in the property store and update segment selector
+    SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(fakePropertyStore, segmentMergeLineage, 0);
+    segmentSelector.computeOnExternalViewChange();
+
+    // Test the result of segment selection
+    segmentsToQuery = new HashSet<>(Arrays.asList(
+        new String[]{"segment0", "segment1", "segment2", "segment3", "merged0", "merged1"}));
+    selectedSegments = segmentSelector.selectSegments(null, segmentsToQuery);
+    Assert.assertEquals(selectedSegments,
+        new HashSet<>(Arrays.asList(new String[]{"merged0", "merged1", "segment3"})));
+  }
+}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
similarity index 97%
rename from pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java
rename to pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
index 6bb8322..364e609 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.linkedin.pinot.broker.routing;
+package com.linkedin.pinot.broker.util;
 
 import java.util.HashMap;
 import java.util.Map;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org