You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2018/12/01 00:28:20 UTC

[incubator-pinot] branch improve-merge-command created (now 5a8ef50)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a change to branch improve-merge-command
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 5a8ef50  Improve the logging for segment merge command 1. add min start / max end time for input segments and merged segment 2. added the total number of documents for input and merged segment

This branch includes the following new commits:

     new cc3b86a  initial commit
     new 5a8ef50  Improve the logging for segment merge command 1. add min start / max end time for input segments and merged segment 2. added the total number of documents for input and merged segment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/02: initial commit

Posted by sn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch improve-merge-command
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit cc3b86a7e97e0face9fe828fff3e049c4cc43a8e
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Mon Nov 26 16:24:48 2018 -0800

    initial commit
---
 .../routing/HelixExternalViewBasedRouting.java     |  22 +-
 .../BasePartitionAwareRoutingTableBuilder.java     |   6 +-
 .../routing/builder/BaseRoutingTableBuilder.java   |  11 +-
 .../builder/DefaultOfflineRoutingTableBuilder.java |   5 +-
 .../DefaultRealtimeRoutingTableBuilder.java        |  11 +-
 .../routing/builder/RoutingTableBuilder.java       |   3 +-
 .../routing/selector/DefaultSegmentSelector.java   |  42 +++
 .../routing/selector/MergedSegmentSelector.java    |  92 ++++++
 .../broker/routing/selector/SegmentSelector.java   |  46 +++
 .../routing/selector/SegmentSelectorFactory.java   |  40 +++
 .../broker/routing/RoutingTableBenchmark.java      | 122 ++++++++
 .../pinot/broker/routing/RoutingTableTest.java     |   1 +
 .../BalancedRandomRoutingTableBuilderTest.java     |   2 +-
 .../HighLevelConsumerRoutingTableBuilderTest.java  |   2 +-
 ...rtitionAwareOfflineRoutingTableBuilderTest.java |   9 +-
 ...titionAwareRealtimeRoutingTableBuilderTest.java |  11 +-
 .../broker/selector/MergeSegmentSelectorTest.java  |  77 +++++
 .../{routing => util}/FakePropertyStore.java       |  13 +-
 pinot-common/pom.xml                               |   4 +
 .../common/config/MultiLevelRollupSetting.java     |  77 +++++
 .../linkedin/pinot/common/config/RollupConfig.java |  59 ++++
 .../pinot/common/config/SegmentMergeConfig.java    | 137 +++++++++
 .../linkedin/pinot/common/config/TableConfig.java  |  84 ++++--
 .../pinot/common/lineage/SegmentGroup.java         |  72 +++++
 .../pinot/common/lineage/SegmentMergeLineage.java  | 333 +++++++++++++++++++++
 .../lineage/SegmentMergeLineageAccessHelper.java   |  82 +++++
 .../pinot/common/metadata/ZKMetadataProvider.java  |   5 +
 .../common/utils/FileUploadDownloadClient.java     |  51 +++-
 .../com/linkedin/pinot/common/utils/UUIDUtils.java |  29 ++
 .../common/lineage/SegmentMergeLineageTest.java    | 106 +++++++
 .../PinotSegmentUploadRestletResource.java         |   9 +-
 .../api/resources/PinotTableRestletResource.java   |  48 +++
 .../pinot/controller/api/upload/ZKOperator.java    |  53 +++-
 .../helix/ControllerRequestURLBuilder.java         |   5 +
 .../helix/core/PinotHelixResourceManager.java      |  68 ++++-
 .../helix/core/minion/ClusterInfoProvider.java     |   6 +
 .../generator/SegmentMergeRollupTaskGenerator.java | 313 +++++++++++++++++++
 .../minion/generator/TaskGeneratorRegistry.java    |   1 +
 .../pinot/core/common/MinionConstants.java         |   8 +
 .../data/readers/PinotSegmentRecordReader.java     |   3 +
 .../tests/SimpleMinionClusterIntegrationTest.java  |   2 +
 .../BaseMultipleSegmentsConversionExecutor.java    |  13 +-
 .../minion/executor/SegmentConversionUtils.java    |  50 ++++
 .../executor/SegmentMergeRollupTaskExecutor.java   | 105 +++++++
 .../SegmentMergeRollupTaskExecutorFactory.java     |  23 ++
 .../executor/TaskExecutorFactoryRegistry.java      |   2 +
 pom.xml                                            |   5 +
 47 files changed, 2189 insertions(+), 79 deletions(-)

diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
index c3db57d..287eeea 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
@@ -17,6 +17,8 @@ package com.linkedin.pinot.broker.routing;
 
 import com.google.common.collect.Sets;
 import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelectorFactory;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -67,6 +69,7 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
   private final Map<String, Map<String, InstanceConfig>> _lastKnownInstanceConfigsForTable = new ConcurrentHashMap<>();
   private final Map<String, InstanceConfig> _lastKnownInstanceConfigs = new ConcurrentHashMap<>();
   private final Map<String, Set<String>> _tablesForInstance = new ConcurrentHashMap<>();
+  private final Map<String, SegmentSelector> _segmentSelectorMap = new ConcurrentHashMap<>();
 
   private final HelixExternalViewBasedTimeBoundaryService _timeBoundaryService;
   private final HelixManager _helixManager;
@@ -76,25 +79,25 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
 
   private Configuration _configuration;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-
   private RoutingTableBuilderFactory _routingTableBuilderFactory;
+  private SegmentSelectorFactory _segmentSelectorFactory;
+
 
   public HelixExternalViewBasedRouting(ZkHelixPropertyStore<ZNRecord> propertyStore, HelixManager helixManager,
       Configuration configuration) {
-    _propertyStore = propertyStore;
     _configuration = configuration;
     _timeBoundaryService = new HelixExternalViewBasedTimeBoundaryService(propertyStore);
     _routingTableBuilderMap = new HashMap<>();
     _helixManager = helixManager;
     _routingTableBuilderFactory = new RoutingTableBuilderFactory(_configuration, propertyStore);
+    _segmentSelectorFactory = new SegmentSelectorFactory(propertyStore);
   }
 
   @Override
   public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
     String tableName = request.getTableName();
     RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableName);
-    return routingTableBuilder.getRoutingTable(request);
+    return routingTableBuilder.getRoutingTable(request, _segmentSelectorMap.get(tableName));
   }
 
   @Override
@@ -116,6 +119,11 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
         tableName);
     _routingTableBuilderMap.put(tableName, routingTableBuilder);
 
+    // Initialize segment selector
+    SegmentSelector segmentSelector = _segmentSelectorFactory.createSegmentSelector(tableConfig);
+    LOGGER.info("Initialized segmentSelector: {} for table {}", segmentSelector.getClass().getName(), tableName);
+    _segmentSelectorMap.put(tableName, segmentSelector);
+
     // Build the routing table
     if (externalView == null) {
       // It is possible for us to get a request to serve a table for which there is no external view. In this case, just
@@ -223,8 +231,11 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
     int externalViewRecordVersion = externalView.getRecord().getVersion();
     _lastKnownExternalViewVersionMap.put(tableNameWithType, externalViewRecordVersion);
 
+
+
     RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableNameWithType);
-    if (routingTableBuilder == null) {
+    SegmentSelector segmentSelector = _segmentSelectorMap.get(tableNameWithType);
+    if (routingTableBuilder == null || segmentSelector == null) {
       //TODO: warn
       return;
     }
@@ -237,6 +248,7 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
       Map<String, InstanceConfig> relevantInstanceConfigs = new HashMap<>();
 
       routingTableBuilder.computeOnExternalViewChange(tableNameWithType, externalView, instanceConfigs);
+      segmentSelector.computeOnExternalViewChange();
 
       // Keep track of the instance configs that are used in that routing table
       updateInstanceConfigsMapFromExternalView(relevantInstanceConfigs, instanceConfigs, externalView);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
index 8d1f568..732c636 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.broker.routing.builder;
 import com.linkedin.pinot.broker.pruner.SegmentPrunerContext;
 import com.linkedin.pinot.broker.pruner.SegmentZKMetadataPrunerService;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -82,7 +83,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
   }
 
   @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
     // Copy the reference for the current segment to replica to server mapping for snapshot
     Map<String, Map<Integer, String>> segmentToReplicaToServerMap = _segmentToReplicaToServerMap;
 
@@ -90,6 +91,9 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
     Set<String> segmentsToQuery = segmentToReplicaToServerMap.keySet();
 
     // TODO: add the selection logic here
+    if (segmentSelector != null) {
+      segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery);
+    }
 
     Map<String, List<String>> routingTable = new HashMap<>();
     SegmentPrunerContext prunerContext = new SegmentPrunerContext(request.getBrokerRequest());
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
index c00d30d..1d59e89 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.RoutingConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -123,16 +124,16 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
     }
   }
 
-  @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
     if (_enableDynamicComputing) {
       // Copy the pointer for snapshot since the pointer for segment to servers map can change at anytime
       Map<String, List<String>> segmentToServersMap = _segmentToServersMap;
 
-      // Get all existing segments
-      Set<String> segmentsToQuery = segmentToServersMap.keySet();
-
       // TODO: add the selection logic here
+      Set<String> segmentsToQuery = segmentToServersMap.keySet();
+      if (segmentSelector != null) {
+        segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery);
+      }
 
       // Compute the final routing table
       return computeDynamicRoutingTable(segmentToServersMap, segmentsToQuery);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
index 07cea6b..64ba82f 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import java.util.HashSet;
@@ -121,8 +122,8 @@ public class DefaultOfflineRoutingTableBuilder implements RoutingTableBuilder {
   }
 
   @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
-    return _routingTableBuilder.getRoutingTable(request);
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
+    return _routingTableBuilder.getRoutingTable(request, segmentSelector);
   }
 
   @Override
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
index d78de04..72a9a79 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import com.linkedin.pinot.common.utils.SegmentName;
@@ -68,7 +69,7 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
   }
 
   @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
     boolean forceLLC = false;
     boolean forceHLC = false;
     for (String routingOption : request.getRoutingOptions()) {
@@ -85,14 +86,14 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
     }
 
     if (forceLLC) {
-      return _realtimeLLCRoutingTableBuilder.getRoutingTable(request);
+      return _realtimeLLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
     } else if (forceHLC) {
-      return _realtimeHLCRoutingTableBuilder.getRoutingTable(request);
+      return _realtimeHLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
     } else {
       if (_hasLLC) {
-        return _realtimeLLCRoutingTableBuilder.getRoutingTable(request);
+        return _realtimeLLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
       } else if (_hasHLC) {
-        return _realtimeHLCRoutingTableBuilder.getRoutingTable(request);
+        return _realtimeHLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
       } else {
         return Collections.emptyMap();
       }
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
index 5a9c320..3fa2bee 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import java.util.List;
@@ -47,7 +48,7 @@ public interface RoutingTableBuilder {
   /**
    * Get the routing table based on the given lookup request.
    */
-  Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request);
+  Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector);
 
   /**
    * Get all pre-computed routing tables.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java
new file mode 100644
index 0000000..b8105f6
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Default Segment Selector
+ */
+public class DefaultSegmentSelector implements SegmentSelector {
+
+  @Override
+  public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  }
+
+  @Override
+  public void computeOnExternalViewChange() {
+  }
+
+  @Override
+  public Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segments) {
+    return segments;
+  }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java
new file mode 100644
index 0000000..7d74146
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.lineage.SegmentGroup;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Segment selector for merged segments
+ */
+public class MergedSegmentSelector implements SegmentSelector {
+  private String _tableNameWithType;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private volatile SegmentGroup _rootSegmentGroup;
+
+  @Override
+  public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _tableNameWithType = tableConfig.getTableName();
+    _propertyStore = propertyStore;
+  }
+
+  @Override
+  public void computeOnExternalViewChange() {
+    SegmentMergeLineage segmentMergeLineage = SegmentMergeLineageAccessHelper.getSegmentMergeLineage(_propertyStore,
+        _tableNameWithType);
+    _rootSegmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+  }
+
+  @Override
+  public Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segmentsToQuery) {
+    Set<String> selectedSegments = new HashSet<>(segmentsToQuery);
+    for (SegmentGroup segmentGroup : _rootSegmentGroup.getChildrenGroups()) {
+      computeSelectionProcessForSegmentGroup(segmentGroup, selectedSegments, segmentsToQuery);
+    }
+    return selectedSegments;
+  }
+
+  private void computeSelectionProcessForSegmentGroup(SegmentGroup segmentGroup, Set<String> selectedSegments,
+      Set<String> availableSegments) {
+    Set<String> segmentsForGroup = segmentGroup.getSegments();
+
+    if (availableSegments.containsAll(segmentsForGroup)) {
+      // If we pick the current group node, we delete all segments covered by children groups
+      if (segmentGroup.getChildrenGroups() == null || segmentGroup.getChildrenGroups().isEmpty()) {
+        return;
+      }
+      for (SegmentGroup child: segmentGroup.getChildrenGroups()) {
+        removeSegmentsForSegmentGroup(child, selectedSegments);
+      }
+    } else {
+      // If the current group is not picked, we compute the selection recursively for children nodes
+      selectedSegments.removeAll(segmentsForGroup);
+      for (SegmentGroup child: segmentGroup.getChildrenGroups()) {
+        computeSelectionProcessForSegmentGroup(child, selectedSegments, availableSegments);
+      }
+    }
+  }
+
+  private void removeSegmentsForSegmentGroup(SegmentGroup segmentGroup, Set<String> selectedSegments) {
+    Set<String> segmentsForGroup = segmentGroup.getSegments();
+    selectedSegments.removeAll(segmentsForGroup);
+
+    if (segmentGroup.getChildrenGroups() == null || segmentGroup.getChildrenGroups().isEmpty()) {
+      return;
+    }
+
+    for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+      removeSegmentsForSegmentGroup(child, selectedSegments);
+    }
+  }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java
new file mode 100644
index 0000000..f7bb19d
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Interface for segment selector
+ */
+public interface SegmentSelector {
+
+  /**
+   * Initiate the segment selector
+   */
+  void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore);
+
+  /**
+   * Compute and update the information required for a selector
+   * <p>Should be called whenever there is an external view change
+   */
+  void computeOnExternalViewChange();
+
+  /**
+   * Compute selection algorithm.
+   * <p>May end up with modifying the input set of segments.
+   */
+  Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segmentsToQuery);
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java
new file mode 100644
index 0000000..1033bc9
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Factory for segment selector
+ */
+public class SegmentSelectorFactory {
+
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  public SegmentSelectorFactory(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _propertyStore = propertyStore;
+  }
+
+  public SegmentSelector createSegmentSelector(TableConfig tableConfig) {
+    // TODO: add the support for merged segment selector once merge config is updated.
+    SegmentSelector segmentSelector = new MergedSegmentSelector();
+    segmentSelector.init(tableConfig, _propertyStore);
+    return segmentSelector;
+  }
+}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java
new file mode 100644
index 0000000..046cb69
--- /dev/null
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing;
+
+
+import com.linkedin.pinot.common.config.RoutingConfig;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.io.IOUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+
+
+public class RoutingTableBenchmark {
+  public static final int NUM_ROUNDS = 10000;
+
+//  public static void main(String[] args) throws Exception {
+////    URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("SampleExternalView.json");
+////    URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("SampleRealtimeExternalView.json");
+//    URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("AdStatistics.json");
+//
+//    Assert.assertNotNull(resourceUrl);
+//    String fileName = resourceUrl.getFile();
+//
+//    byte[] externalViewBytes = IOUtils.toByteArray(new FileInputStream(fileName));
+//    ExternalView externalView = new ExternalView((ZNRecord) new ZNRecordSerializer().deserialize(externalViewBytes));
+//    String tableName = externalView.getResourceName();
+//    List<InstanceConfig> instanceConfigs = getInstanceConfigs(externalView);
+//
+//    TableConfig preComputeTableConfig = generatePreComputeTableConfig(tableName);
+//    TableConfig dynamicTableConfig = generateDynamicTableConfig(tableName);
+//
+//    HelixExternalViewBasedRouting precomputeRouting = new HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+//    precomputeRouting.markDataResourceOnline(preComputeTableConfig, externalView, instanceConfigs);
+//
+//    HelixExternalViewBasedRouting dynamicRouting = new HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+//    dynamicRouting.markDataResourceOnline(dynamicTableConfig, externalView, instanceConfigs);
+//
+////    System.out.println("Pre-compute Routing table builder");
+////    for (int i = 0; i < NUM_ROUNDS; i++) {
+////      long start = System.nanoTime();
+////      Map<String, List<String>> routingTable = precomputeRouting.getRoutingTable(new RoutingTableLookupRequest(tableName));
+////      long end = System.nanoTime();
+////      System.out.println("round " + i + ": " + (end - start) / 1000000.0 + " ms");
+////    }
+////    System.out.println("");
+//
+//    System.out.println("Dynamic Routing table builder");
+//    for (int i = 0; i < NUM_ROUNDS; i++) {
+//      long start = System.nanoTime();
+//      Map<String, List<String>> routingTable = dynamicRouting.getRoutingTable(new RoutingTableLookupRequest(tableName));
+//      long end = System.nanoTime();
+//      System.out.println("round " + i + ": " + (end - start) / 1000000.0 + " ms");
+//    }
+//  }
+//
+//  private static List<InstanceConfig> getInstanceConfigs(ExternalView externalView) {
+//    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+//    Set<String> instances = new HashSet<>();
+//
+//    // Collect all unique instances
+//    for (String partitionName : externalView.getPartitionSet()) {
+//      for (String instance : externalView.getStateMap(partitionName).keySet()) {
+//        if (!instances.contains(instance)) {
+//          instanceConfigs.add(new InstanceConfig(instance));
+//          instances.add(instance);
+//        }
+//      }
+//    }
+//
+//    return instanceConfigs;
+//  }
+//
+//  private static TableConfig generatePreComputeTableConfig(String tableName) throws Exception {
+//    CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+//    TableConfig.Builder builder = new TableConfig.Builder(tableType);
+//    builder.setTableName(tableName);
+//    TableConfig tableConfig = builder.build();
+////    tableConfig.getRoutingConfig().setRoutingTableBuilderName("KafkaLowLevel");
+//    return tableConfig;
+//  }
+//
+//  private static TableConfig generateDynamicTableConfig(String tableName) throws Exception {
+//    CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+//    TableConfig.Builder builder = new TableConfig.Builder(tableType);
+//    builder.setTableName(tableName);
+//    TableConfig tableConfig = builder.build();
+//    Map<String, String> option = new HashMap<>();
+//    option.put(RoutingConfig.ENABLE_DYNAMIC_COMPUTING_KEY, "true");
+//    tableConfig.getRoutingConfig().setRoutingTableBuilderOptions(option);
+////    tableConfig.getRoutingConfig().setRoutingTableBuilderName("KafkaLowLevel");
+//    return tableConfig;
+//  }
+
+}
+
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
index 25600d3..3413711 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
@@ -17,6 +17,7 @@ package com.linkedin.pinot.broker.routing;
 
 import com.linkedin.pinot.broker.routing.builder.HighLevelConsumerBasedRoutingTableBuilder;
 import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableConfig.Builder;
 import com.linkedin.pinot.common.config.TableNameBuilder;
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
index 7bae71d..46ca40c 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
@@ -79,7 +79,7 @@ public class BalancedRandomRoutingTableBuilderTest {
     // Build routing table
     routingTableBuilder.computeOnExternalViewChange("dummy", externalView, instanceConfigList);
     RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
-    Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request);
+    Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
 
     Set<String> segmentsInRoutingTable = new HashSet<>();
     for (List<String> segments : routingTable.values()) {
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
index 74f53c3..c2ef4e4 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
@@ -95,7 +95,7 @@ public class HighLevelConsumerRoutingTableBuilderTest {
       // Check if the routing table result is correct
       for (int run = 0; run < MAX_NUM_GROUPS * 10; run++) {
         RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
-        Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request);
+        Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
         Set<String> coveredSegments = new HashSet<>();
         for (List<String> segmentsForServer : routingTable.values()) {
           coveredSegments.addAll(segmentsForServer);
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
index c110b6e..5a44a28 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
@@ -15,7 +15,7 @@
  */
 package com.linkedin.pinot.broker.routing.builder;
 
-import com.linkedin.pinot.broker.routing.FakePropertyStore;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.common.config.ReplicaGroupStrategyConfig;
 import com.linkedin.pinot.common.config.RoutingConfig;
@@ -50,7 +50,6 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
   private static final String PARTITION_COLUMN = "memberId";
 
   private static final Pql2Compiler COMPILER = new Pql2Compiler();
-
   private static final Random RANDOM = new Random();
 
   private int NUM_REPLICA;
@@ -114,7 +113,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
       // Check the query that requires to scan all segment.
       String countStarQuery = "select count(*) from myTable";
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
       // Check that the number of servers picked are always equal or less than the number of servers
       // from a single replica group.
@@ -134,7 +133,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
       for (int queryPartition = 0; queryPartition < 100; queryPartition++) {
         String filterQuery = "select count(*) from myTable where " + PARTITION_COLUMN + " = " + queryPartition;
         routingTable =
-            routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery));
+            routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery), null);
 
         // Check that the number of servers picked are always equal or less than the number of servers
         // in a single replica group.
@@ -218,7 +217,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
     for (int i = 0; i < 100; i++) {
       String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME;
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
       Assert.assertEquals(routingTable.keySet().size(), 1);
       servers.add(routingTable.keySet().iterator().next());
     }
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
index 7f349f8..93f2c9a 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
@@ -15,7 +15,8 @@
  */
 package com.linkedin.pinot.broker.routing.builder;
 
-import com.linkedin.pinot.broker.routing.FakePropertyStore;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.common.config.ColumnPartitionConfig;
 import com.linkedin.pinot.common.config.RoutingConfig;
@@ -102,7 +103,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
       // Check the query that requires to scan all segment.
       String countStarQuery = "select count(*) from myTable";
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
       // Check that all segments are covered exactly for once.
       Set<String> assignedSegments = new HashSet<>();
@@ -117,7 +118,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
       // Check the broker side server and segment pruning.
       for (int queryPartition = 0; queryPartition < 100; queryPartition++) {
         String filterQuery = "select count(*) from myTable where " + PARTITION_COLUMN + " = " + queryPartition;
-        routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery));
+        routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery), null);
 
         int partition = queryPartition % NUM_PARTITION;
         assignedSegments = new HashSet<>();
@@ -178,7 +179,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
     // Check the query that requires to scan all segment.
     String countStarQuery = "select count(*) from myTable";
     Map<String, List<String>> routingTable =
-        routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+        routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
     // Check that all segments are covered exactly for once.
     Set<String> assignedSegments = new HashSet<>();
@@ -253,7 +254,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
     for (int i = 0; i < 100; i++) {
       String countStarQuery = "select count(*) from " + REALTIME_TABLE_NAME;
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
       Assert.assertEquals(routingTable.keySet().size(), 1);
       servers.addAll(routingTable.keySet());
     }
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java
new file mode 100644
index 0000000..738e943
--- /dev/null
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.selector;
+
+import com.fasterxml.uuid.Generators;
+import com.linkedin.pinot.broker.routing.selector.MergedSegmentSelector;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.testng.annotations.Test;
+
+
+public class MergeSegmentSelectorTest {
+  private static final String TEST_TABLE_NAME = "test_OFFLINE";
+
+  @Test
+  public void testMergeSegmentSelector() throws Exception {
+    MergedSegmentSelector segmentSelector = new MergedSegmentSelector();
+
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_NAME).build();
+    ZkHelixPropertyStore<ZNRecord> fakePropertyStore = new FakePropertyStore();
+    segmentSelector.init(tableConfig, fakePropertyStore);
+
+    SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(fakePropertyStore, generateFakeMergeLineage(), 0);
+    segmentSelector.computeOnExternalViewChange();
+
+    Set<String> segmentsToQuery = new HashSet<>(
+        Arrays.asList(new String[]{"segment0", "segment1", "segment2", "segment3", "merged_segment0", "merged_segment1"}));
+
+    System.out.println("segments to query: " + segmentsToQuery);
+    System.out.println("segments after filter: " + segmentSelector.selectSegments(null, segmentsToQuery));
+
+  }
+
+  private SegmentMergeLineage generateFakeMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage(TEST_TABLE_NAME);
+
+    List<String> childrenGroups = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      String groupId = Generators.timeBasedGenerator().generate().toString();
+      segmentMergeLineage.addSegmentGroup(groupId, Arrays.asList(new String[]{"segment" + i}), null);
+      childrenGroups.add(groupId);
+    }
+
+    String mergedGroupId = Generators.timeBasedGenerator().generate().toString();
+    segmentMergeLineage.addSegmentGroup(mergedGroupId,
+        Arrays.asList(new String[]{"merged_segment0", "merged_segment1"}), childrenGroups);
+
+    String unmergedSegmentGroupId = Generators.timeBasedGenerator().generate().toString();
+    segmentMergeLineage.addSegmentGroup(unmergedSegmentGroupId, Arrays.asList(new String[]{"segment3"}), null);
+
+    return segmentMergeLineage;
+  }
+}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
similarity index 88%
rename from pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java
rename to pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
index 6bb8322..e18c9e2 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.linkedin.pinot.broker.routing;
+package com.linkedin.pinot.broker.util;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -42,6 +42,17 @@ public class FakePropertyStore extends ZkHelixPropertyStore<ZNRecord> {
   }
 
   @Override
+  public boolean set(String path, ZNRecord stat, int expectedVersion, int options) {
+    try {
+      setContents(path, stat);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+
+  @Override
   public boolean set(String path, ZNRecord stat, int options) {
     try {
       setContents(path, stat);
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 95e4fc8..6f59e95 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -228,6 +228,10 @@
       <artifactId>jackson-annotations</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.uuid</groupId>
+      <artifactId>java-uuid-generator</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java
new file mode 100644
index 0000000..d9da264
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MultiLevelRollupSetting {
+  @ConfigKey("timeInputFormat")
+  public String _timeInputFormat;
+
+  @ConfigKey("timeOutputFormat")
+  public String _timeOutputFormat;
+
+  @ConfigKey("timeOutputGranularity")
+  public String _timeOutputGranularity;
+
+  @ConfigKey("minNumSegments")
+  public int _minNumSegments;
+
+  @ConfigKey("minNumTotalDocs")
+  public int _minNumTotalDocs;
+
+  public String getTimeInputFormat() {
+    return _timeInputFormat;
+  }
+
+  public void setTimeInputFormat(String timeInputFormat) {
+    _timeInputFormat = timeInputFormat;
+  }
+
+  public String getTimeOutputFormat() {
+    return _timeOutputFormat;
+  }
+
+  public void setTimeOutputFormat(String timeOutputFormat) {
+    _timeOutputFormat = timeOutputFormat;
+  }
+
+  public String getTimeOutputGranularity() {
+    return _timeOutputGranularity;
+  }
+
+  public void setTimeOutputGranularity(String timeOutputGranularity) {
+    _timeOutputGranularity = timeOutputGranularity;
+  }
+
+  public int getMinNumSegments() {
+    return _minNumSegments;
+  }
+
+  public void setMinNumSegments(int minNumSegments) {
+    _minNumSegments = minNumSegments;
+  }
+
+  public int getMinNumTotalDocs() {
+    return _minNumTotalDocs;
+  }
+
+  public void setMinNumTotalDocs(int minNumTotalDocs) {
+    _minNumTotalDocs = minNumTotalDocs;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java
new file mode 100644
index 0000000..c86099f
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import java.util.List;
+import java.util.Map;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RollupConfig {
+
+  @ConfigKey("rollupType")
+  String rollupType;
+
+  @ConfigKey("preAggregateType")
+  @UseChildKeyHandler(SimpleMapChildKeyHandler.class)
+  Map<String, String> _preAggregateType;
+
+  @ConfigKey("multiLevelRollupSettings")
+  List<MultiLevelRollupSetting> _multiLevelRollupSettings;
+
+  public String getRollupType() {
+    return rollupType;
+  }
+
+  public void setRollupType(String rollupType) {
+    this.rollupType = rollupType;
+  }
+
+  public Map<String, String> getPreAggregateType() {
+    return _preAggregateType;
+  }
+
+  public void setPreAggregateType(Map<String, String> preAggregateType) {
+    _preAggregateType = preAggregateType;
+  }
+
+  public List<MultiLevelRollupSetting> getMultiLevelRollupSettings() {
+    return _multiLevelRollupSettings;
+  }
+
+  public void setMultiLevelRollupSettings(List<MultiLevelRollupSetting> multiLevelRollupSettings) {
+    _multiLevelRollupSettings = multiLevelRollupSettings;
+  }
+
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java
new file mode 100644
index 0000000..d96952f
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import javax.annotation.Nonnull;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SegmentMergeConfig {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RollupConfig.class);
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private static final String MERGE_TYPE = "mergeType";
+  private static final String MERGE_STRATEGY = "mergeStrategy";
+  private static final String MIN_NUM_SEGMENTS = "minNumSegments";
+  private static final String MIN_NUM_TOTAL_DOCS = "minNumTotalDocs";
+  private static final String ROLLUP_CONFIG = "rollupConfig";
+  private static final String OLD_SEGMENT_CLEANUP = "oldSegmentCleanup";
+
+  @ConfigKey(value = "mergeType")
+  private String _mergeType;
+
+  @ConfigKey(value = "mergeStrategy")
+  private String _mergeStrategy;
+
+  @ConfigKey(value = "rollupConfig")
+  private RollupConfig _rollupConfig;
+
+  @ConfigKey(value = "minNumSegments")
+  private int _minNumSegments;
+
+  @ConfigKey(value = "minNumTotalDocs")
+  private long _minNumTotalDocs;
+
+  @ConfigKey(value = "oldSegmentCleanup")
+  private boolean _oldSegmentCleanup;
+
+  public String getMergeType() {
+    return _mergeType;
+  }
+
+  public void setMergeType(String mergeType) {
+    _mergeType = mergeType;
+  }
+
+  public String getMergeStrategy() {
+    return _mergeStrategy;
+  }
+
+  public void setMergeStrategy(String mergeStrategy) {
+    _mergeStrategy = mergeStrategy;
+  }
+
+  public int getMinNumSegments() {
+    return _minNumSegments;
+  }
+
+  public void setMinNumSegments(int minNumSegments) {
+    _minNumSegments = minNumSegments;
+  }
+
+  public long getMinNumTotalDocs() {
+    return _minNumTotalDocs;
+  }
+
+  public void setMinNumTotalDocs(long minNumTotalDocs) {
+    _minNumTotalDocs = minNumTotalDocs;
+  }
+
+  public boolean getOldSegmentCleanup() {
+    return _oldSegmentCleanup;
+  }
+
+  public void setOldSegmentCleanup(boolean oldSegmentCleanup) {
+    _oldSegmentCleanup = oldSegmentCleanup;
+  }
+
+  public RollupConfig getRollupConfig() {
+    return _rollupConfig;
+  }
+
+  public void setRollupConfig(RollupConfig rollupConfig) {
+    _rollupConfig = rollupConfig;
+  }
+
+  @Nonnull
+  public JSONObject toJSON() {
+    JSONObject mergeRollupConfigJsonObject = new JSONObject();
+    try {
+      mergeRollupConfigJsonObject.put(MERGE_TYPE, _mergeType);
+      mergeRollupConfigJsonObject.put(MERGE_STRATEGY, _mergeStrategy);
+      mergeRollupConfigJsonObject.put(ROLLUP_CONFIG, _rollupConfig);
+      mergeRollupConfigJsonObject.put(MIN_NUM_SEGMENTS, _minNumSegments);
+      mergeRollupConfigJsonObject.put(MIN_NUM_TOTAL_DOCS, _minNumTotalDocs);
+      mergeRollupConfigJsonObject.put(OLD_SEGMENT_CLEANUP, _oldSegmentCleanup);
+    } catch (Exception e) {
+      LOGGER.error("Failed to convert rollup config to json", e);
+    }
+
+    return mergeRollupConfigJsonObject;
+  }
+
+  public String toJSONString() {
+    try {
+      return toJSON().toString(2);
+    } catch (Exception e) {
+      return e.toString();
+    }
+  }
+
+  public static void main(String[] args) {
+    SegmentMergeConfig config = new SegmentMergeConfig();
+    config.setMergeType("CONCATENATE");
+    System.out.println(config.toJSONString());
+    System.out.println(config.toJSON());
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
index 960c7f0..b4fa08a 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
@@ -47,6 +47,7 @@ public class TableConfig {
   private static final String QUOTA_CONFIG_KEY = "quota";
   private static final String TASK_CONFIG_KEY = "task";
   private static final String ROUTING_CONFIG_KEY = "routing";
+  private static final String SEGMENT_MERGE_CONFIG_KEY = "segmentMergeConfig";
 
   @ConfigKey("name")
   @ConfigDoc(value = "The name for the table.", mandatory = true, exampleValue = "myTable")
@@ -78,6 +79,9 @@ public class TableConfig {
   @NestedConfig
   private RoutingConfig _routingConfig;
 
+  @NestedConfig
+  private SegmentMergeConfig _segmentMergeConfig;
+
   public TableConfig() {
     // TODO: currently these 2 fields are annotated as non-null. Revisit to see whether that's necessary
     _tenantConfig = new TenantConfig();
@@ -87,7 +91,8 @@ public class TableConfig {
   private TableConfig(@Nonnull String tableName, @Nonnull TableType tableType,
       @Nonnull SegmentsValidationAndRetentionConfig validationConfig, @Nonnull TenantConfig tenantConfig,
       @Nonnull IndexingConfig indexingConfig, @Nonnull TableCustomConfig customConfig,
-      @Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig) {
+      @Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig,
+      @Nullable SegmentMergeConfig segmentMergeConfig) {
     _tableName = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
     _tableType = tableType;
     _validationConfig = validationConfig;
@@ -97,19 +102,18 @@ public class TableConfig {
     _quotaConfig = quotaConfig;
     _taskConfig = taskConfig;
     _routingConfig = routingConfig;
+    _segmentMergeConfig = segmentMergeConfig;
   }
 
   // For backward compatible
   @Deprecated
   @Nonnull
-  public static TableConfig init(@Nonnull String jsonConfigString)
-      throws IOException, JSONException {
+  public static TableConfig init(@Nonnull String jsonConfigString) throws IOException, JSONException {
     return fromJSONConfig(new JSONObject(jsonConfigString));
   }
 
   @Nonnull
-  public static TableConfig fromJSONConfig(@Nonnull JSONObject jsonConfig)
-      throws IOException, JSONException {
+  public static TableConfig fromJSONConfig(@Nonnull JSONObject jsonConfig) throws IOException, JSONException {
     TableType tableType = TableType.valueOf(jsonConfig.getString(TABLE_TYPE_KEY).toUpperCase());
     String tableName = TableNameBuilder.forType(tableType).tableNameWithType(jsonConfig.getString(TABLE_NAME_KEY));
     SegmentsValidationAndRetentionConfig validationConfig =
@@ -136,13 +140,17 @@ public class TableConfig {
           OBJECT_MAPPER.readValue(jsonConfig.getJSONObject(ROUTING_CONFIG_KEY).toString(), RoutingConfig.class);
     }
 
+    SegmentMergeConfig segmentMergeConfig = null;
+    if (jsonConfig.has(SEGMENT_MERGE_CONFIG_KEY)) {
+      segmentMergeConfig = OBJECT_MAPPER.readValue(jsonConfig.getJSONObject(SEGMENT_MERGE_CONFIG_KEY).toString(), SegmentMergeConfig.class);
+    }
+
     return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
-        quotaConfig, taskConfig, routingConfig);
+        quotaConfig, taskConfig, routingConfig, segmentMergeConfig);
   }
 
   @Nonnull
-  public static JSONObject toJSONConfig(@Nonnull TableConfig tableConfig)
-      throws IOException, JSONException {
+  public static JSONObject toJSONConfig(@Nonnull TableConfig tableConfig) throws IOException, JSONException {
     JSONObject jsonConfig = new JSONObject();
     jsonConfig.put(TABLE_NAME_KEY, tableConfig._tableName);
     jsonConfig.put(TABLE_TYPE_KEY, tableConfig._tableType.toString());
@@ -160,12 +168,14 @@ public class TableConfig {
     if (tableConfig._routingConfig != null) {
       jsonConfig.put(ROUTING_CONFIG_KEY, new JSONObject(OBJECT_MAPPER.writeValueAsString(tableConfig._routingConfig)));
     }
+    if (tableConfig._segmentMergeConfig != null) {
+      jsonConfig.put(SEGMENT_MERGE_CONFIG_KEY, new JSONObject(OBJECT_MAPPER.writeValueAsString(tableConfig._segmentMergeConfig)));
+    }
     return jsonConfig;
   }
 
   @Nonnull
-  public static TableConfig fromZnRecord(@Nonnull ZNRecord znRecord)
-      throws IOException, JSONException {
+  public static TableConfig fromZnRecord(@Nonnull ZNRecord znRecord) throws IOException, JSONException {
     Map<String, String> simpleFields = znRecord.getSimpleFields();
     TableType tableType = TableType.valueOf(simpleFields.get(TABLE_TYPE_KEY).toUpperCase());
     String tableName = TableNameBuilder.forType(tableType).tableNameWithType(simpleFields.get(TABLE_NAME_KEY));
@@ -194,8 +204,14 @@ public class TableConfig {
       routingConfig = OBJECT_MAPPER.readValue(routingConfigString, RoutingConfig.class);
     }
 
+    String segmentMergeConfigString = simpleFields.get(SEGMENT_MERGE_CONFIG_KEY);
+    SegmentMergeConfig segmentMergeConfig = null;
+    if (segmentMergeConfigString != null) {
+      segmentMergeConfig = OBJECT_MAPPER.readValue(segmentMergeConfigString, SegmentMergeConfig.class);
+    }
+
     return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
-        quotaConfig, taskConfig, routingConfig);
+        quotaConfig, taskConfig, routingConfig, segmentMergeConfig);
   }
 
   @Nonnull
@@ -218,6 +234,9 @@ public class TableConfig {
       if (tableConfig._routingConfig != null) {
         simpleFields.put(ROUTING_CONFIG_KEY, OBJECT_MAPPER.writeValueAsString(tableConfig._routingConfig));
       }
+      if (tableConfig._segmentMergeConfig != null) {
+        simpleFields.put(SEGMENT_MERGE_CONFIG_KEY, OBJECT_MAPPER.writeValueAsString(tableConfig._segmentMergeConfig));
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -305,9 +324,17 @@ public class TableConfig {
     _routingConfig = routingConfig;
   }
 
+  @Nullable
+  public SegmentMergeConfig getSegmentMergeConfig() {
+    return _segmentMergeConfig;
+  }
+
+  public void setSegmentMergeConfig(SegmentMergeConfig segmentMergeConfig) {
+    _segmentMergeConfig = segmentMergeConfig;
+  }
+
   @Nonnull
-  public String toJSONConfigString()
-      throws IOException, JSONException {
+  public String toJSONConfigString() throws IOException, JSONException {
     return toJSONConfig(this).toString();
   }
 
@@ -332,18 +359,14 @@ public class TableConfig {
 
     TableConfig that = (TableConfig) o;
 
-    return EqualityUtils.isEqual(_tableName, that._tableName) &&
-        EqualityUtils.isEqual(_tableType, that._tableType) &&
-        EqualityUtils.isEqual(_validationConfig, that._validationConfig) &&
-        EqualityUtils.isEqual(_tenantConfig, that._tenantConfig) &&
-        EqualityUtils.isEqual(_indexingConfig, that._indexingConfig) &&
-        EqualityUtils.isEqual(_customConfig, that._customConfig) &&
-        EqualityUtils.isEqual(_quotaConfig, that._quotaConfig) &&
-        EqualityUtils.isEqual(_taskConfig, that._taskConfig) &&
-        EqualityUtils.isEqual(_routingConfig, that._routingConfig);
+    return EqualityUtils.isEqual(_tableName, that._tableName) && EqualityUtils.isEqual(_tableType, that._tableType)
+        && EqualityUtils.isEqual(_validationConfig, that._validationConfig) && EqualityUtils.isEqual(_tenantConfig,
+        that._tenantConfig) && EqualityUtils.isEqual(_indexingConfig, that._indexingConfig) && EqualityUtils.isEqual(
+        _customConfig, that._customConfig) && EqualityUtils.isEqual(_quotaConfig, that._quotaConfig)
+        && EqualityUtils.isEqual(_taskConfig, that._taskConfig) && EqualityUtils.isEqual(_routingConfig,
+        that._routingConfig);
   }
 
-
   @Override
   public int hashCode() {
     int result = EqualityUtils.hashCodeOf(_tableName);
@@ -401,9 +424,11 @@ public class TableConfig {
     private QuotaConfig _quotaConfig;
     private TableTaskConfig _taskConfig;
     private RoutingConfig _routingConfig;
+    private SegmentMergeConfig _segmentMergeConfig;
     private HllConfig _hllConfig;
     private StarTreeIndexSpec _starTreeIndexSpec;
 
+
     public Builder(TableType tableType) {
       _tableType = tableType;
     }
@@ -549,8 +574,15 @@ public class TableConfig {
       return this;
     }
 
-    public TableConfig build()
-        throws IOException, JSONException {
    /**
     * Returns the segment merge config set on this builder, or {@code null} if none was set.
     */
    public SegmentMergeConfig getSegmentMergeConfig() {
      return _segmentMergeConfig;
    }

    // NOTE(review): the other Builder mutators follow the fluent "set...(...) { ...; return this; }"
    // convention (see the sibling setter returning `this` just above) — consider making this one
    // fluent as well for consistency before merging.
    public void setSegmentMergeConfig(SegmentMergeConfig segmentMergeConfig) {
      _segmentMergeConfig = segmentMergeConfig;
    }
+
+    public TableConfig build() throws IOException, JSONException {
       // Validation config
       SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
       validationConfig.setTimeColumnName(_timeColumnName);
@@ -598,7 +630,7 @@ public class TableConfig {
       }
 
       return new TableConfig(_tableName, _tableType, validationConfig, tenantConfig, indexingConfig, _customConfig,
-          _quotaConfig, _taskConfig, _routingConfig);
+          _quotaConfig, _taskConfig, _routingConfig, _segmentMergeConfig);
     }
   }
 }
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
new file mode 100644
index 0000000..fb28ac1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import java.util.List;
+import java.util.Set;
+
+
/**
 * Node in the segment merge lineage tree.
 *
 * A segment group has an identifier, a level in the merge hierarchy, the set of segment names it
 * contains, and optional links to its parent group and children groups. Accessors store and return
 * the supplied references directly (no defensive copies).
 */
public class SegmentGroup {

  private String _groupId;
  private int _groupLevel;
  private SegmentGroup _parentGroup;
  private List<SegmentGroup> _childrenGroups;
  private Set<String> _segments;

  /** Returns the group identifier. */
  public String getGroupId() {
    return _groupId;
  }

  /** Sets the group identifier. */
  public void setGroupId(String id) {
    _groupId = id;
  }

  /** Returns the level of this group in the merge hierarchy. */
  public int getGroupLevel() {
    return _groupLevel;
  }

  /** Sets the level of this group in the merge hierarchy. */
  public void setGroupLevel(int level) {
    _groupLevel = level;
  }

  /** Returns the segment names belonging to this group. */
  public Set<String> getSegments() {
    return _segments;
  }

  /** Sets the segment names belonging to this group. */
  public void setSegments(Set<String> segmentNames) {
    _segments = segmentNames;
  }

  /** Returns the parent group, or {@code null} if none has been linked. */
  public SegmentGroup getParentGroup() {
    return _parentGroup;
  }

  /** Links this group to its parent group. */
  public void setParentGroup(SegmentGroup parent) {
    _parentGroup = parent;
  }

  /** Returns the children groups, or {@code null} if none have been linked. */
  public List<SegmentGroup> getChildrenGroups() {
    return _childrenGroups;
  }

  /** Links this group to its children groups. */
  public void setChildrenGroups(List<SegmentGroup> children) {
    _childrenGroups = children;
  }
}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
new file mode 100644
index 0000000..f4a5bdf
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
@@ -0,0 +1,333 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import com.linkedin.pinot.common.utils.EqualityUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to represent segment merge lineage information.
+ *
+ * Segment merge lineage information is serialized into a znode and stored in a helix property store (zookeeper). This
+ * information will be used by the broker, segment merge task generator, and retention manager.
+ *
+ * For each segment group, we are storing the following information:
+ * 1. group id
+ *    - group identifier (will be stored in time based uuid format)
+ * 2. group level
+ *    - segment level allows us to have a hierarchical representation of the segment lineage. When we assign the merge
+ *      task, we will only merge/roll-up segments with the same level.
+ *      (e.g. If hourly segment groups are in level 0, daily segment groups will belong to level 1)
+ * 3. segments
+ *    - segments that belong to a particular segment group
+ * 4. lineage information
+ *    - If a segment group is created by merging multiple children segment groups, we write the lineage information
+ *      (e.g. segment group C is merged from segment group A, B)
+ */
+public class SegmentMergeLineage {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeLineage.class);
+
+  private static final String LEVEL_KEY_PREFIX = "level_";
+  private static final String ROOT_NODE_GROUP_ID = "root";
+  private static final String SEGMENT_DELIMITER = ",";
+  private static final int DEFAULT_GROUP_LEVEL = 0;
+
+  private String _tableNameWithType;
+
+  // Mapping of group id to children group ids
+  private Map<String, List<String>> _parentGroupToChildrenGroupsMap;
+
+  // Mapping of group level to group id to segments that belong to a group
+  // Segment level represents
+  private Map<Integer, Map<String, List<String>>> _levelToGroupToSegmentsMap;
+
+  public SegmentMergeLineage(String tableNameWithType) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = new HashMap<>();
+    _levelToGroupToSegmentsMap = new HashMap<>();
+  }
+
+  public SegmentMergeLineage(String tableNameWithType, Map<String, List<String>> segmentGroupLineageMap,
+      Map<Integer, Map<String, List<String>>> levelToGroupToSegmentMap) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = segmentGroupLineageMap;
+    _levelToGroupToSegmentsMap = levelToGroupToSegmentMap;
+  }
+
+  public String getTableName() {
+    return _tableNameWithType;
+  }
+
+  public static SegmentMergeLineage fromZNRecord(ZNRecord record) {
+    String tableNameWithType = record.getId();
+    Map<String, List<String>> segmentGroupLineageMap = record.getListFields();
+
+    Map<Integer, Map<String, List<String>>> groupToSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : record.getMapFields().entrySet()) {
+      String levelKey = entry.getKey();
+      Integer level = Integer.parseInt(levelKey.substring(LEVEL_KEY_PREFIX.length()));
+      Map<String, List<String>> groupToSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, String> groupEntry : entry.getValue().entrySet()) {
+        String groupId = groupEntry.getKey();
+        String segmentsString = groupEntry.getValue();
+        List<String> segments = Arrays.asList(segmentsString.split(SEGMENT_DELIMITER));
+        groupToSegmentsForLevel.put(groupId, new ArrayList<>(segments));
+      }
+      groupToSegmentsMap.put(level, groupToSegmentsForLevel);
+    }
+    return new SegmentMergeLineage(tableNameWithType, segmentGroupLineageMap, groupToSegmentsMap);
+  }
+
+  public ZNRecord toZNRecord() {
+    ZNRecord record = new ZNRecord(_tableNameWithType);
+    record.setListFields(_parentGroupToChildrenGroupsMap);
+    Map<String, Map<String, String>> groupToSegmentsMap = new HashMap<>();
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      String key = LEVEL_KEY_PREFIX + entry.getKey();
+      Map<String, String> groupSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, List<String>> groupEntry : entry.getValue().entrySet()) {
+        String groupId = groupEntry.getKey();
+        String segments = String.join(SEGMENT_DELIMITER, groupEntry.getValue());
+        groupSegmentsForLevel.put(groupId, segments);
+      }
+      groupToSegmentsMap.put(key, groupSegmentsForLevel);
+    }
+    record.setMapFields(groupToSegmentsMap);
+
+    return record;
+  }
+
+  /**
+   * Add segment merge lineage information
+   *
+   * @param groupId a group id
+   * @param currentGroupSegments a list of segments that belongs to the group
+   * @param childrenGroups a list of children groups that the current group covers
+   */
+  public void addSegmentGroup(String groupId, List<String> currentGroupSegments, List<String> childrenGroups)
+      throws InvalidConfigException {
+    // Get group level
+    Integer groupLevel = getGroupLevel(childrenGroups);
+
+    // Update group to segments map
+    Map<String, List<String>> groupToSegmentMap =
+        _levelToGroupToSegmentsMap.computeIfAbsent(groupLevel, k -> new HashMap<>());
+    if (groupToSegmentMap.containsKey(groupId)) {
+      throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+    }
+    groupToSegmentMap.put(groupId, new ArrayList<>(currentGroupSegments));
+    _levelToGroupToSegmentsMap.put(groupLevel, groupToSegmentMap);
+
+    // Update segment group lineage map
+    if (groupLevel > DEFAULT_GROUP_LEVEL) {
+      if (_parentGroupToChildrenGroupsMap.containsKey(groupId)) {
+        throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+      } else {
+        _parentGroupToChildrenGroupsMap.put(groupId, new ArrayList<>(childrenGroups));
+      }
+    }
+
+    LOGGER.info("New group has been added successfully to the segment lineage. (groupId: {}, currentGroupSegments: {}, "
+        + "childrenGroups: {}", groupId, currentGroupSegments, childrenGroups);
+  }
+
+  /**
+   * Remove segment merge information given a group id
+   *
+   * @param groupId a group id
+   */
+  public void removeSegmentGroup(String groupId) {
+    // Clean up the group id from parent to children group mapping
+    _parentGroupToChildrenGroupsMap.remove(groupId);
+    for (List<String> childrenGroups : _parentGroupToChildrenGroupsMap.values()) {
+      childrenGroups.remove(groupId);
+    }
+
+    // Clean up the group id from group to segments mapping
+    for (Map<String, List<String>> groupToSegments : _levelToGroupToSegmentsMap.values()) {
+      groupToSegments.remove(groupId);
+    }
+
+    LOGGER.info("Group {} has been successfully removed.", groupId);
+  }
+
+  /**
+   * Construct a lineage tree and returns the root node
+   *
+   * @return a root node for lineage tree
+   */
+  public SegmentGroup getMergeLineageRootSegmentGroup() {
+    // Create group nodes
+    Map<String, SegmentGroup> groupNodes = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, List<String>>> groupEntryForLevel : _levelToGroupToSegmentsMap.entrySet()) {
+      Integer level = groupEntryForLevel.getKey();
+      Map<String, List<String>> groupToSegmentsForLevel = groupEntryForLevel.getValue();
+      for (Map.Entry<String, List<String>> entry : groupToSegmentsForLevel.entrySet()) {
+        String groupId = entry.getKey();
+        List<String> segments = entry.getValue();
+        SegmentGroup groupNode = new SegmentGroup();
+        groupNode.setGroupId(groupId);
+        groupNode.setSegments(new HashSet<>(segments));
+        groupNode.setGroupLevel(level);
+        groupNodes.put(groupId, groupNode);
+      }
+    }
+
+    // Add edges by updating children & parent group information
+    for (Map.Entry<String, List<String>> lineageEntry : _parentGroupToChildrenGroupsMap.entrySet()) {
+      String parentGroupId = lineageEntry.getKey();
+      List<String> childrenGroupIds = lineageEntry.getValue();
+      List<SegmentGroup> childrenGroups = new ArrayList<>();
+      SegmentGroup parentNode = groupNodes.get(parentGroupId);
+      for (String groupId : childrenGroupIds) {
+        SegmentGroup childNode = groupNodes.get(groupId);
+        if (childNode != null) {
+          childrenGroups.add(childNode);
+          childNode.setParentGroup(parentNode);
+        }
+      }
+      parentNode.setChildrenGroups(childrenGroups);
+    }
+
+    // Create a root node
+    SegmentGroup root = new SegmentGroup();
+    root.setGroupId(ROOT_NODE_GROUP_ID);
+    List<SegmentGroup> childrenForRoot = new ArrayList<>();
+    for (SegmentGroup group : groupNodes.values()) {
+      if (group.getParentGroup() == null) {
+        group.setParentGroup(root);
+        childrenForRoot.add(group);
+      }
+    }
+    root.setChildrenGroups(childrenForRoot);
+
+    return root;
+  }
+
+  /**
+   * Get a list of segments for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of segments that belongs to the given group id, null if the group does not exist
+   */
+  public List<String> getSegmentsForGroup(String groupId) {
+    for (Map<String, List<String>> groupToSegmentMap : _levelToGroupToSegmentsMap.values()) {
+      List<String> segments = groupToSegmentMap.get(groupId);
+      if (segments != null) {
+        return segments;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a list of children group ids for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of children groups that are covered by the given group id, null if the group does not exist
+   */
+  public List<String> getChildrenForGroup(String groupId) {
+    return _parentGroupToChildrenGroupsMap.get(groupId);
+  }
+
+  /**
+   * Get a list of all group levels
+   *
+   * @return a list of all group levels
+   */
+  public List<Integer> getAllGroupLevels() {
+    List<Integer> groupLevels = new ArrayList<>(_levelToGroupToSegmentsMap.keySet());
+    Collections.sort(groupLevels);
+    return groupLevels;
+  }
+
+  /**
+   * Get a list of group ids for a given group level
+   *
+   * @param groupLevel a group level
+   * @return a list of group ids that belongs to the given group level, null if the group level does not exist
+   */
+  public List<String> getGroupIdsForGroupLevel(int groupLevel) {
+    Map<String, List<String>> groupToSegmentsMap = _levelToGroupToSegmentsMap.get(groupLevel);
+    if (groupToSegmentsMap != null) {
+      return new ArrayList<>(groupToSegmentsMap.keySet());
+    }
+    return null;
+  }
+
+  /**
+   * Helper function to compute group level given children groups
+   *
+   * @param childrenGroups a list of children group ids
+   * @return group level
+   */
+  private Integer getGroupLevel(List<String> childrenGroups) throws InvalidConfigException {
+    // If no children exists, the group belongs to the base level.
+    if (childrenGroups == null || childrenGroups.isEmpty()) {
+      return DEFAULT_GROUP_LEVEL;
+    }
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      Integer currentLevel = entry.getKey();
+      Map<String, List<String>> currentLevelGroupToSegmentsMap = entry.getValue();
+      if (currentLevelGroupToSegmentsMap.keySet().containsAll(childrenGroups)) {
+        return currentLevel + 1;
+      }
+    }
+
+    // At this point, not all children groups are covered, cannot add group
+    throw new InvalidConfigException("Cannot compute group level because not all children groups exist "
+        + "in the segment merge lineage, children groups: " + childrenGroups);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    SegmentMergeLineage that = (SegmentMergeLineage) o;
+
+    return EqualityUtils.isEqual(_tableNameWithType, that._tableNameWithType) && EqualityUtils.isEqual(
+        _parentGroupToChildrenGroupsMap, that._parentGroupToChildrenGroupsMap) && EqualityUtils.isEqual(
+        _levelToGroupToSegmentsMap, that._levelToGroupToSegmentsMap);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_tableNameWithType);
+    result = EqualityUtils.hashCodeOf(result, _parentGroupToChildrenGroupsMap);
+    result = EqualityUtils.hashCodeOf(result, _levelToGroupToSegmentsMap);
+    return result;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
new file mode 100644
index 0000000..00eb7c3
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.common.utils.retry.RetryPolicy;
+import java.util.List;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Class to help to read, write segment merge lineage
+ */
+public class SegmentMergeLineageAccessHelper {
+
+  /**
+   * Read the segment merge lineage ZNRecord from the property store
+   *
+   * @param propertyStore a property store
+   * @param tableNameWithType a table name with type
+   * @return a ZNRecord of segment merge lineage
+   */
+  public static ZNRecord getSegmentMergeLineageZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (segmentMergeLineageZNRecord != null) {
+      segmentMergeLineageZNRecord.setVersion(stat.getVersion());
+    }
+    return segmentMergeLineageZNRecord;
+  }
+
+  /**
+   * Read the segment merge lineage from the property store
+   *
+   * @param propertyStore  a property store
+   * @param tableNameWithType a table name with type
+   * @return a segment merge lineage
+   */
+  public static SegmentMergeLineage getSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+    return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+  }
+
+  /**
+   * Write the segment merge lineage to the property store
+   *
+   * @param propertyStore a property store
+   * @param segmentMergeLineage a segment merge lineage
+   * @return true if update is successful. false otherwise.
+   */
+  public static boolean writeSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      SegmentMergeLineage segmentMergeLineage, int expectedVersion) {
+    String tableNameWithType = segmentMergeLineage.getTableName();
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    return propertyStore.set(path, segmentMergeLineage.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
index fe55a0c..e9d6b63 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
@@ -53,6 +53,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
   private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE";
   private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
+  private static final String PROPERTYSTORE_SEGMENT_MERGE_LINEAGE = "/SEGMENT_MERGE_LINEAGE";
 
   public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
       ZNRecord znRecord) {
@@ -106,6 +107,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX, controllerConfigKey);
   }
 
  /**
   * Returns the property store path ("/SEGMENT_MERGE_LINEAGE/&lt;tableNameWithType&gt;") under which the
   * segment merge lineage ZNRecord for the given table is stored.
   */
  public static String constructPropertyStorePathForSegmentMergeLineage(String tableNameWithType) {
    return StringUtil.join("/", PROPERTYSTORE_SEGMENT_MERGE_LINEAGE, tableNameWithType);
  }
+
   public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
       String segmentName) {
     return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
index ca4b423..c8b15f2 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
@@ -52,6 +53,7 @@ import org.apache.http.entity.mime.content.InputStreamBody;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.json.JSONArray;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +72,7 @@ public class FileUploadDownloadClient implements Closeable {
     public static final String DOWNLOAD_URI = "DOWNLOAD_URI";
     public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
     public static final String CRYPTER = "CRYPTER";
+    public static final String MERGED_SEGMENT_PUSH = "MERGED_SEGMENT_PUSH";
   }
 
   public static class QueryParameters {
@@ -94,6 +97,7 @@ public class FileUploadDownloadClient implements Closeable {
   private static final String SEGMENT_PATH = "/v2/segments";
   private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
   private static final String TABLES_PATH = "/tables";
+  private static final String LINEAGE_PATH = "/lineage";
   private static final String TYPE_DELIMITER = "?type=";
   private static final String SLASH = "/";
 
@@ -137,6 +141,12 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(HTTPS, host, port, SCHEMA_PATH);
   }
 
  /**
   * Builds the controller URI used to update the segment merge lineage for a table:
   * "/tables/&lt;tableName&gt;/lineage?type=&lt;type&gt;". Note this always uses plain HTTP
   * (there is no HTTPS variant of this helper).
   *
   * @param host controller host
   * @param port controller port
   * @param tableName table name
   * @param type table type query parameter value
   */
  public static URI getUpdateSegmentMergeLineageHttpURI(String host, int port, String tableName, String type)
      throws URISyntaxException {
    String path = TABLES_PATH + SLASH + tableName + LINEAGE_PATH + TYPE_DELIMITER + type;
    return getURI(HTTP, host, port, path);
  }
+
   /**
    * This method calls the old segment upload endpoint. We will deprecate this behavior soon. Please call
    * getUploadSegmentHttpURI to construct your request.
@@ -154,7 +164,7 @@ public class FileUploadDownloadClient implements Closeable {
   public static URI getOldUploadSegmentHttpsURI(String host, int port) throws URISyntaxException {
     return getURI(HTTPS, host, port, OLD_SEGMENT_PATH);
   }
-  
+
   public static URI getUploadSegmentHttpURI(String host, int port) throws URISyntaxException {
     return getURI(HTTP, host, port, SEGMENT_PATH);
   }
@@ -222,6 +232,29 @@ public class FileUploadDownloadClient implements Closeable {
         parameters, socketTimeoutMs);
   }
 
+  private static HttpUriRequest getUpdateSegmentMergeLineageRequest(URI uri, List<String> childrenGroupIds,
+      List<String> segments, int socketTimeoutMs) {
+    JSONObject jsonObject = new JSONObject();
+    JSONArray childrenGroupIdsJsonArray = new JSONArray();
+//    for (String groupId : childrenGroupIds) {
+//      childrenGroupIdsJsonArray.put(groupId);
+//    }
+    jsonObject.put("childrenGroupIds", childrenGroupIds);
+
+//    JSONArray segmentsJsonArray = new JSONArray();
+//    for (String segment : segments) {
+//      segmentsJsonArray.put(segment);
+//    }
+    jsonObject.put("segments", segments);
+
+    RequestBuilder requestBuilder = RequestBuilder.post(uri)
+        .setVersion(HttpVersion.HTTP_1_1)
+        .setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+        .setEntity(new StringEntity(jsonObject.toString(), ContentType.APPLICATION_JSON));
+    setTimeout(requestBuilder, socketTimeoutMs);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getSendSegmentUriRequest(URI uri, String downloadUri, @Nullable List<Header> headers,
       @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
     RequestBuilder requestBuilder = RequestBuilder.post(uri)
@@ -398,6 +431,11 @@ public class FileUploadDownloadClient implements Closeable {
     return sendRequest(getUploadSegmentMetadataRequest(uri, segmentName, segmentMetadataFile, headers, parameters, socketTimeoutMs));
   }
 
  /**
   * Posts a segment merge lineage update (children group ids and segments) as a JSON body to the
   * given controller URI (see {@code getUpdateSegmentMergeLineageHttpURI}).
   *
   * @param uri lineage update endpoint
   * @param childrenGroupIds children group ids for the new group
   * @param segments segments belonging to the new group
   * @param socketTimeoutMs socket timeout in milliseconds
   * @return the HTTP response
   * @throws HttpErrorStatusException on a non-success status code
   */
  public SimpleHttpResponse updateSegmentMergeLineage(URI uri, List<String> childrenGroupIds, List<String> segments, int socketTimeoutMs)
      throws IOException, HttpErrorStatusException {
    return sendRequest(getUpdateSegmentMergeLineageRequest(uri, childrenGroupIds, segments, socketTimeoutMs));
  }
+
 
   /**
    * Upload segment with segment file.
@@ -595,4 +633,15 @@ public class FileUploadDownloadClient implements Closeable {
   public void close() throws IOException {
     _httpClient.close();
   }
+
+  public static void main(String[] args) {
+    List<String> list = new ArrayList<>();
+    list.add("aa");
+    list.add("bb");
+
+    JSONObject object = new JSONObject();
+    object.put("a", list);
+
+    System.out.println(object.toString());
+  }
 }
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java
new file mode 100644
index 0000000..d54a579
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.utils;
+
+import java.util.UUID;
+
+
/**
 * Utilities for extracting timestamps from time-based (version 1) UUIDs.
 */
public class UUIDUtils {
  // Mimics the implementation from Hector's TimeUUIDUtils class:
  // https://github.com/rantav/hector/blob/master/core/src/main/java/me/prettyprint/cassandra/utils/TimeUUIDUtils.java
  // Number of 100ns intervals between the UUID epoch (1582-10-15) and the Unix epoch (1970-01-01).
  private static final long NUM_100NS_INTERVALS_SINCE_UUID_EPOCH = 0x01b21dd213814000L;

  private UUIDUtils() {
    // Utility class; prevent instantiation.
  }

  /**
   * Extracts the Unix timestamp, in milliseconds, embedded in a time-based UUID.
   *
   * @param uuid a version 1 (time-based) UUID
   * @return milliseconds since the Unix epoch
   * @throws UnsupportedOperationException if the UUID is not version 1 (per {@link UUID#timestamp()})
   */
  public static long getUnixTimeFromUUID(UUID uuid) {
    // UUID timestamps count 100ns intervals since the UUID epoch; 10000 intervals = 1 ms
    return (uuid.timestamp() - NUM_100NS_INTERVALS_SINCE_UUID_EPOCH) / 10000;
  }
}
diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
new file mode 100644
index 0000000..923fbfc
--- /dev/null
+++ b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentMergeLineageTest {
+
+  @Test
+  public void testSegmentMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList(new String[]{"segment1", "segment2", "segment3"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    String groupId2 = "G2";
+    List<String> groupSegments2 = Arrays.asList(new String[]{"segment4", "segment5"});
+    segmentMergeLineage.addSegmentGroup(groupId2, groupSegments2, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId2), groupSegments2);
+
+    String groupId3 = "G3";
+    List<String> groupSegments3 = Arrays.asList(new String[]{"segment6"});
+    segmentMergeLineage.addSegmentGroup(groupId3, groupSegments3, Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId3), groupSegments3);
+
+    // Check available APIs
+    Assert.assertEquals(segmentMergeLineage.getTableName(), "test_OFFLINE");
+    Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId3),
+        Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getAllGroupLevels(), Arrays.asList(new Integer[]{0, 1}));
+    Assert.assertTrue(segmentMergeLineage.equals(SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord())));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(0),
+        Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(1),
+        Arrays.asList(new String[]{groupId3}));
+    validateSegmentGroup(segmentMergeLineage);
+
+    // Check ZNRecord conversion
+    Assert.assertEquals(segmentMergeLineage, SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord()));
+
+    // Test removing groups
+    segmentMergeLineage.removeSegmentGroup(groupId1);
+    Assert.assertNull(segmentMergeLineage.getChildrenForGroup(groupId1));
+    Assert.assertNull(segmentMergeLineage.getSegmentsForGroup(groupId1));
+    Assert.assertFalse(segmentMergeLineage.getGroupIdsForGroupLevel(0).contains(groupId1));
+  }
+
+  @Test(expectedExceptions = InvalidConfigException.class)
+  public void testUpdateWithDuplicateGroupId() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList(new String[]{"segment1", "segment2", "segment3"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    List<String> groupSegments2 = Arrays.asList(new String[]{"segment4", "segment5", "segment6"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments2, null);
+  }
+
+  private void validateSegmentGroup(SegmentMergeLineage segmentMergeLineage) {
+    SegmentGroup segmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+    for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+      validateSegmentGroupNode(child, segmentMergeLineage);
+    }
+  }
+
+  private void validateSegmentGroupNode(SegmentGroup segmentGroup, SegmentMergeLineage segmentMergeLineage) {
+    String groupId = segmentGroup.getGroupId();
+    Assert.assertEquals(segmentGroup.getSegments(), new HashSet<>(segmentMergeLineage.getSegmentsForGroup(groupId)));
+    Assert.assertTrue(segmentMergeLineage.getGroupIdsForGroupLevel(segmentGroup.getGroupLevel()).contains(groupId));
+
+    List<SegmentGroup> childrenGroups = segmentGroup.getChildrenGroups();
+    if (childrenGroups != null) {
+      List<String> childrenGroupIds = new ArrayList<>();
+      for (SegmentGroup child : childrenGroups) {
+        childrenGroupIds.add(child.getGroupId());
+      }
+      Assert.assertEquals(childrenGroupIds, segmentMergeLineage.getChildrenForGroup(groupId));
+
+      for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+        validateSegmentGroupNode(child, segmentMergeLineage);
+      }
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 418c2ef..af600e3 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -254,6 +254,9 @@ public class PinotSegmentUploadRestletResource {
     // Get URI of current segment location
     String currentSegmentLocationURI = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
 
+    String mergedSegmentPushStr = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.MERGED_SEGMENT_PUSH);
+    boolean mergedSegmentPush = Boolean.valueOf(mergedSegmentPushStr);
+
     File tempEncryptedFile = null;
     File tempDecryptedFile = null;
     File tempSegmentDir = null;
@@ -315,7 +318,7 @@ public class PinotSegmentUploadRestletResource {
 
       // Zk operations
       completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, segmentMetadata,
-          segmentName, zkDownloadUri, moveSegmentToFinalLocation);
+          segmentName, zkDownloadUri, moveSegmentToFinalLocation, mergedSegmentPush);
 
       return new SuccessResponse("Successfully uploaded segment: " + segmentMetadata.getName() + " of table: "
           + segmentMetadata.getTableName());
@@ -373,7 +376,7 @@ public class PinotSegmentUploadRestletResource {
 
   private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File tempDecryptedFile,
       FileUploadPathProvider provider, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation)
+      boolean moveSegmentToFinalLocation, boolean mergedSegmentPush)
       throws Exception {
     String finalSegmentPath =
         StringUtil.join("/", provider.getBaseDataDirURI().toString(), segmentMetadata.getTableName(),
@@ -381,7 +384,7 @@ public class PinotSegmentUploadRestletResource {
     URI finalSegmentLocationURI = new URI(finalSegmentPath);
     ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
     zkOperator.completeSegmentOperations(segmentMetadata, finalSegmentLocationURI, tempDecryptedFile,
-        enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation);
+        enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, mergedSegmentPush);
   }
 
   private void decryptFile(String crypterClassHeader, File tempEncryptedFile, File tempDecryptedFile) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
index ae98133..38aef03 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.controller.api.resources;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import com.linkedin.pinot.common.config.TableConfig;
@@ -516,4 +517,51 @@ public class PinotTableRestletResource {
     }
     return ResourceUtils.convertToJsonString(result);
   }
+
+  public static class LineageParameters {
+    @JsonProperty("segments")
+    List<String> segments;
+
+    @JsonProperty("childrenGroupIds")
+    List<String> childrenGroupIds;
+  }
+
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/segments/lineage")
+  @ApiOperation(value = "Updates segment merge lineage information", notes = "Updates segment merge lineage information")
+  public SuccessResponse updateSegmentMergeLineage(
+      @ApiParam(value = "Name of table to update segment merge lineage", required = true) @Nonnull @PathParam(value = "tableName") String tableName,
+      @ApiParam(value = "offline|realtime") @Nonnull @QueryParam("type") String tableType,
+      LineageParameters lineageParameters
+
+  ) {
+    if (!EnumUtils.isValidEnum(CommonConstants.Helix.TableType.class, tableType.toUpperCase())) {
+      throw new ControllerApplicationException(LOGGER, "Illegal table type " + tableType, Response.Status.BAD_REQUEST);
+    }
+
+    TableType type = TableType.valueOf(tableType.toUpperCase());
+    String tableNameWithType;
+    if (type == TableType.OFFLINE && _pinotHelixResourceManager.hasOfflineTable(tableName)) {
+      tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+    } else if (type == TableType.REALTIME && _pinotHelixResourceManager.hasRealtimeTable(tableName)) {
+      tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    } else {
+      throw new ControllerApplicationException(LOGGER, "Table " + tableName + ", type " + type + " does not exist",
+          Response.Status.NOT_FOUND);
+    }
+
+    boolean success =
+        _pinotHelixResourceManager.updateSegmentMergeLineage(tableNameWithType, lineageParameters.segments,
+            lineageParameters.childrenGroupIds);
+
+    if (success) {
+      return new SuccessResponse(
+          "Segment merge lineage has been updated successfully. (table: " + tableNameWithType + ", segments: "
+              + lineageParameters.segments + ", childrenGroups: " + lineageParameters.childrenGroupIds + ")");
+    } else {
+      throw new ControllerApplicationException(LOGGER, "Failed to update segment merge lineage for table: "
+          + tableNameWithType, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
index f1c6ad4..abd3698 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
@@ -15,12 +15,15 @@
  */
 package com.linkedin.pinot.controller.api.upload;
 
+import com.linkedin.pinot.common.config.SegmentMergeConfig;
+import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import com.linkedin.pinot.common.metrics.ControllerMeter;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
 import com.linkedin.pinot.common.segment.SegmentMetadata;
+import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.api.resources.ControllerApplicationException;
@@ -29,6 +32,7 @@ import com.linkedin.pinot.filesystem.PinotFS;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
 import java.io.File;
 import java.net.URI;
+import java.util.Collections;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.helix.ZNRecord;
@@ -55,8 +59,7 @@ public class ZKOperator {
 
   public void completeSegmentOperations(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation)
-      throws Exception {
+      boolean moveSegmentToFinalLocation, boolean mergedSegmentPush) throws Exception {
     String rawTableName = segmentMetadata.getTableName();
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
@@ -68,19 +71,21 @@ public class ZKOperator {
       LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
       String crypter = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
-          rawTableName, segmentName, moveSegmentToFinalLocation);
+          rawTableName, segmentName, moveSegmentToFinalLocation, mergedSegmentPush);
       return;
     }
 
     LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName);
 
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, offlineTableName, segmentName, znRecord, moveSegmentToFinalLocation);
+        enableParallelPushProtection, headers, zkDownloadURI, offlineTableName, segmentName, znRecord,
+        moveSegmentToFinalLocation);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      String offlineTableName, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation) throws Exception {
+      String offlineTableName, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation)
+      throws Exception {
 
     OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord);
     long existingCrc = existingSegmentZKMetadata.getCrc();
@@ -160,9 +165,11 @@ public class ZKOperator {
             newCrc, existingCrc, segmentName);
         if (moveSegmentToFinalLocation) {
           moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI);
-          LOGGER.info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
+          LOGGER.info("Moved segment {} from temp location {} to {}", segmentName,
+              currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
         } else {
-          LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, offlineTableName, zkDownloadURI);
+          LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, offlineTableName,
+              zkDownloadURI);
         }
 
         _pinotHelixResourceManager.refreshSegment(segmentMetadata, existingSegmentZKMetadata);
@@ -196,19 +203,40 @@ public class ZKOperator {
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, String crypter, String rawTableName, String segmentName,
-      boolean moveSegmentToFinalLocation) {
+      boolean moveSegmentToFinalLocation, boolean mergedSegmentPush) {
+
+    if (!mergedSegmentPush) {
+      // Fetch table config
+      TableConfig tableConfig =
+          _pinotHelixResourceManager.getTableConfig(rawTableName, CommonConstants.Helix.TableType.OFFLINE);
+      SegmentMergeConfig segmentMergeConfig = tableConfig.getSegmentMergeConfig();
+
+      // Update lineage information for base segment
+      if (segmentMergeConfig != null) {
+        try {
+          _pinotHelixResourceManager.updateSegmentMergeLineage(tableConfig.getTableName(),
+              Collections.singletonList(segmentName), null);
+        } catch (Exception e) {
+          LOGGER.error("Cannot update segment merge lineage information for table {}, segment {}", rawTableName,
+              segmentName, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
     // For v1 segment uploads, we will not move the segment
     if (moveSegmentToFinalLocation) {
       try {
         moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI);
-        LOGGER.info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
-            finalSegmentLocationURI.getPath());
+        LOGGER.info("Moved segment {} from temp location {} to {}", segmentName,
+            currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
       } catch (Exception e) {
         LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName, rawTableName, e);
         throw new RuntimeException(e);
       }
     } else {
-      LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName, zkDownloadURI);
+      LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName,
+          zkDownloadURI);
     }
     _pinotHelixResourceManager.addNewSegment(segmentMetadata, zkDownloadURI, crypter);
   }
@@ -218,7 +246,8 @@ public class ZKOperator {
     PinotFS pinotFS = PinotFSFactory.create(finalSegmentLocationURI.getScheme());
 
     // Overwrites current segment file
-    LOGGER.info("Copying segment from {} to {}", currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.toString());
+    LOGGER.info("Copying segment from {} to {}", currentSegmentLocation.getAbsolutePath(),
+        finalSegmentLocationURI.toString());
     pinotFS.copyFromLocalFile(currentSegmentLocation, finalSegmentLocationURI);
   }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
index 9a79cc3..f45f533 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -239,7 +239,12 @@ public class ControllerRequestURLBuilder {
   public String forSegmentListAPIWithTableType(String tableName, String tableType) {
     return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName + "?type=" + tableType);
   }
+
   public String forSegmentListAPI(String tableName) {
     return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName);
   }
+
+  public String forSegmentMergeLineagePostAPI(String tableName, String tableType) {
+    return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments", "lineage?type=" + tableType);
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index c12f70e..aa08350 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.controller.helix.core;
 
+import com.fasterxml.uuid.Generators;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -36,6 +37,8 @@ import com.linkedin.pinot.common.config.TenantConfig;
 import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.exception.InvalidConfigException;
 import com.linkedin.pinot.common.exception.TableNotFoundException;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
 import com.linkedin.pinot.common.messages.SegmentRefreshMessage;
 import com.linkedin.pinot.common.messages.SegmentReloadMessage;
 import com.linkedin.pinot.common.messages.TimeboundaryRefreshMessage;
@@ -82,6 +85,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
@@ -145,8 +149,8 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
-    this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS,
-        false, false);
+    this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
+        false);
   }
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
@@ -1084,6 +1088,10 @@ public class PinotHelixResourceManager {
         _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
             new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
 
+        // Add segment merge lineage znode if required
+        _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType),
+            new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
+
         // Update replica group partition assignment to the property store if applicable
         updateReplicaGroupPartitionAssignment(tableConfig);
         break;
@@ -1524,7 +1532,7 @@ public class PinotHelixResourceManager {
 
     ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
     LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
-            rawTableName, refreshMessage, recipientCriteria);
+        rawTableName, refreshMessage, recipientCriteria);
     // Helix sets the timeoutMs argument specified in 'send' call as the processing timeout of the message.
     int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, null, timeoutMs);
     if (nMsgsSent > 0) {
@@ -1534,7 +1542,7 @@ public class PinotHelixResourceManager {
       // May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
       // the latest time boundary info.
       LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
-              offlineTableName, nMsgsSent);
+          offlineTableName, nMsgsSent);
     }
   }
 
@@ -2192,6 +2200,58 @@ public class PinotHelixResourceManager {
     return endpointToInstance;
   }
 
+  public SegmentMergeLineage getSegmentMergeLineage(String tableNameWithType) {
+    ZNRecord segmentMergeLineageZNRecord =
+        SegmentMergeLineageAccessHelper.getSegmentMergeLineageZNRecord(getPropertyStore(), tableNameWithType);
+    return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+  }
+
+  public boolean updateSegmentMergeLineage(String tableNameWithType, List<String> segments,
+      List<String> childrenGroupIds) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment merge lineage from the property store
+        ZNRecord segmentMergeLineageZNRecord =
+            SegmentMergeLineageAccessHelper.getSegmentMergeLineageZNRecord(getPropertyStore(), tableNameWithType);
+        SegmentMergeLineage segmentMergeLineage = SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+        int expectedVersion = segmentMergeLineageZNRecord == null ? -1 : segmentMergeLineageZNRecord.getVersion();
+
+        if (segmentMergeLineage == null) {
+          segmentMergeLineage = new SegmentMergeLineage(tableNameWithType);
+        }
+
+        // Add a new segment group
+        String groupId = Generators.timeBasedGenerator().generate().toString();
+        segmentMergeLineage.addSegmentGroup(groupId, segments, childrenGroupIds);
+
+        try {
+          if (SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(getPropertyStore(), segmentMergeLineage,
+              expectedVersion)) {
+            return true;
+          } else {
+            LOGGER.warn("Failed to update segment merge lineage for table: {}", tableNameWithType);
+            return false;
+          }
+        } catch (ZkBadVersionException e) {
+          LOGGER.warn("Version changed while updating segment merge lineage for table: {}", tableNameWithType, e);
+          return false;
+        } catch (Exception e) {
+          LOGGER.warn("Caught exception while updating segment merge lineage for table: {}", tableNameWithType,
+              e);
+          return false;
+        }
+      });
+
+      LOGGER.info("Segment merge lineage has been updated successfully. (tableNameWithType: {}, segments: {}, "
+          + "childrenGroupIds: {})", tableNameWithType, segments, childrenGroupIds);
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Caught Exception while updating segment merge lineage for table: {}", tableNameWithType, e);
+      return false;
+    }
+  }
+
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
index 750d8a2..765b4e3 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
@@ -20,6 +20,7 @@ import com.linkedin.pinot.common.config.PinotTaskConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
 import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -128,4 +129,9 @@ public class ClusterInfoProvider {
   public String getVipUrl() {
     return _controllerConf.generateVipUrl();
   }
+
+
+  public SegmentMergeLineage getSegmentMergeLineage(String tableNameWithType) {
+    return _pinotHelixResourceManager.getSegmentMergeLineage(tableNameWithType);
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java
new file mode 100644
index 0000000..028a53f
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java
@@ -0,0 +1,313 @@
+package com.linkedin.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.PinotTaskConfig;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.config.TableTaskConfig;
+import com.linkedin.pinot.common.data.Segment;
+import com.linkedin.pinot.common.lineage.SegmentGroup;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import com.linkedin.pinot.common.segment.SegmentMetadata;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import com.linkedin.pinot.controller.helix.core.util.ZKMetadataUtils;
+import com.linkedin.pinot.core.common.MinionConstants;
+import com.linkedin.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformer;
+import com.linkedin.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory;
+import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentMergeRollupTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeRollupTaskGenerator.class);
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public SegmentMergeRollupTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Nonnull
+  @Override
+  public String getTaskType() {
+    return MinionConstants.SegmentMergeRollupTask.TASK_TYPE;
+  }
+
+  @Nonnull
+  @Override
+  public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs) {
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    // Get the segments that are being converted so that we don't submit them again
+    Set<Segment> runningSegments =
+        TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, _clusterInfoProvider);
+
+    for (TableConfig tableConfig : tableConfigs) {
+      // Only generate tasks for OFFLINE tables
+      String offlineTableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() != CommonConstants.Helix.TableType.OFFLINE) {
+        LOGGER.warn("Skip generating SegmentMergeRollup for non-OFFLINE table: {}", offlineTableName);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkNotNull(tableTaskConfig);
+      Map<String, String> taskConfigs =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentMergeRollupTask.TASK_TYPE);
+      Preconditions.checkNotNull(taskConfigs);
+
+      // Get max number of tasks for this table
+      int tableMaxNumTasks;
+      String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+      if (tableMaxNumTasksConfig != null) {
+        try {
+          tableMaxNumTasks = Integer.valueOf(tableMaxNumTasksConfig);
+        } catch (Exception e) {
+          tableMaxNumTasks = Integer.MAX_VALUE;
+        }
+      } else {
+        tableMaxNumTasks = Integer.MAX_VALUE;
+      }
+
+      // Generate tasks
+      int tableNumTasks = 0;
+
+      List<OfflineSegmentZKMetadata> metadataList = _clusterInfoProvider.getOfflineSegmentsMetadata(offlineTableName);
+
+      SegmentMergeLineage segmentMergeLineage = _clusterInfoProvider.getSegmentMergeLineage(offlineTableName);
+
+      // Get a list of original segments covered by merged segments
+
+      List<Integer> groupLevelList = segmentMergeLineage.getAllGroupLevels();
+      Set<String> uncoveredGroups = new HashSet<>();
+      Set<String> coveredGroups = new HashSet<>();
+      for (Integer groupLevel : groupLevelList) {
+        List<String> groupIdsForGroupLevel;
+        groupIdsForGroupLevel = segmentMergeLineage.getGroupIdsForGroupLevel(groupLevel);
+        uncoveredGroups.addAll(groupIdsForGroupLevel);
+
+        for (String groupId : groupIdsForGroupLevel) {
+          List<String> coveredChildrenForGroupId = segmentMergeLineage.getChildrenForGroup(groupId);
+          if (coveredChildrenForGroupId != null && !coveredChildrenForGroupId.isEmpty()) {
+            coveredGroups.addAll(coveredChildrenForGroupId);
+          }
+        }
+      }
+
+
+      uncoveredGroups.removeAll(coveredGroups);
+
+      Map<String, OfflineSegmentZKMetadata> segmentNameToZkMetadataMap = new HashMap<>();
+      for (OfflineSegmentZKMetadata segmentZKMetadata : metadataList) {
+        segmentNameToZkMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata);
+      }
+
+      LOGGER.info("uncovered groups: {}", uncoveredGroups);
+
+      List<OfflineSegmentZKMetadata> segmentsToMerge = new ArrayList<>();
+      for (String groupId : uncoveredGroups) {
+        for (String segmentName : segmentMergeLineage.getSegmentsForGroup(groupId)) {
+          segmentsToMerge.add(segmentNameToZkMetadataMap.get(segmentName));
+        }
+      }
+
+//      List<OfflineSegmentZKMetadata> segmentsToMerge = new ArrayList<>();
+//      for (OfflineSegmentZKMetadata segmentZKMetadata: metadataList) {
+//        String segmentName = segmentZKMetadata.getSegmentName();
+//        if (!coveredSegments.contains(segmentName)) {
+//          segmentsToMerge.add(segmentZKMetadata);
+//        }
+//      }
+//      if (!segmentsToRemove.isEmpty()) {
+//        LOGGER.info("Removing Segments since they are covered by merged segments: " + String.join(",", segmentsToRemove));
+//        _clusterInfoProvider.removeSegments(offlineTableName, segmentsToRemove);
+//      }
+
+      if (segmentsToMerge.size() > 1) {
+        String downloadUrls = segmentsToMerge.stream()
+            .map(OfflineSegmentZKMetadata::getDownloadUrl)
+            .collect(Collectors.joining(MinionConstants.URL_SEPARATOR));
+
+        String segmentNames = segmentsToMerge.stream()
+            .map(OfflineSegmentZKMetadata::getSegmentName)
+            .collect(Collectors.joining(","));
+
+
+        String crcList = segmentsToMerge.stream()
+            .map(OfflineSegmentZKMetadata::getCrc)
+            .map((crc) -> Long.toString(crc))
+            .collect(Collectors.joining(","));
+
+        String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
+        String mergedSegmentName = computeMergedSegmentName(rawTableName, segmentsToMerge);
+        String groupsToCover = String.join(",", uncoveredGroups);
+
+
+        Map<String, String> config = new HashMap<>();
+        config.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
+        config.put(MinionConstants.SEGMENT_NAME_KEY, segmentNames);
+        config.put(MinionConstants.DOWNLOAD_URL_KEY, downloadUrls);
+        config.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoProvider.getVipUrl() + "/segments");
+        config.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, crcList);
+
+        config.put(MinionConstants.SegmentMergeRollupTask.MERGE_TYPE, "CONCATENATE");
+
+
+        config.put(MinionConstants.SegmentMergeRollupTask.MERGED_SEGEMNT_NAME_KEY, mergedSegmentName);
+        config.put(MinionConstants.SegmentMergeRollupTask.GROUPS_TO_COVER_KEY, groupsToCover);
+        config.put(MinionConstants.CONTROLLER_API_URL, _clusterInfoProvider.getVipUrl());
+        pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentMergeRollupTask.TASK_TYPE, config));
+
+      }
+//      for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : _clusterInfoProvider.getOfflineSegmentsMetadata(
+//          offlineTableName)) {
+//        // Generate up to tableMaxNumTasks tasks each time for each table
+//        if (tableNumTasks == tableMaxNumTasks) {
+//          break;
+//        }
+//
+//        // Skip segments that are already submitted
+//        String segmentName = offlineSegmentZKMetadata.getSegmentName();
+//        if (runningSegments.contains(new Segment(offlineTableName, segmentName))) {
+//          continue;
+//        }
+//
+//        // Only submit segments that have not been converted
+//        Map<String, String> customMap = offlineSegmentZKMetadata.getCustomMap();
+//        if (customMap == null || !customMap.containsKey(
+//            MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY + MinionConstants.TASK_TIME_SUFFIX)) {
+//          Map<String, String> configs = new HashMap<>();
+//          configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
+//          configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
+//          configs.put(MinionConstants.DOWNLOAD_URL_KEY, offlineSegmentZKMetadata.getDownloadUrl());
+//          configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoProvider.getVipUrl() + "/segments");
+//          configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(offlineSegmentZKMetadata.getCrc()));
+//          if (columnsToConvertConfig != null) {
+//            configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, columnsToConvertConfig);
+//          }
+//
+//          pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentMergeRollupTask.TASK_TYPE, configs));
+//          tableNumTasks++;
+//        }
+//      }
+    }
+
+    return pinotTaskConfigs;
+  }
+
+  private String computeMergedSegmentName(String tableName, List<OfflineSegmentZKMetadata> metadataList) {
+    long minStartTime = Long.MAX_VALUE;
+    long maxEndTime = Long.MIN_VALUE;
+
+    for (OfflineSegmentZKMetadata metadata : metadataList) {
+      long currentSegmentStartTime = metadata.getStartTime();
+      long currentSegmentEndTime = metadata.getEndTime();
+
+      if (currentSegmentStartTime < minStartTime) {
+        minStartTime = currentSegmentStartTime;
+      }
+
+      if (currentSegmentEndTime > maxEndTime) {
+        maxEndTime = currentSegmentEndTime;
+      }
+    }
+    return "merged_" + tableName + "_" + minStartTime + "_" + maxEndTime + "_" + System.currentTimeMillis();
+  }
+
+  @Override
+  public int getNumConcurrentTasksPerInstance() {
+    return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  }
+
+  @Override
+  public void nonLeaderCleanUp() {
+
+  }
+
+  private void addCoveredSegments(SegmentGroup node, Set<String> coveredSegments) {
+    Set<String> currentSegments = node.getSegments();
+    if (currentSegments != null && !currentSegments.isEmpty()) {
+      coveredSegments.addAll(node.getSegments());
+    }
+    if (node.getChildrenGroups() != null) {
+      for (SegmentGroup children : node.getChildrenGroups()) {
+        addCoveredSegments(children, coveredSegments);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    File inputDir = new File("/Users/snlee/data/careers_comms_processing_hourly_additive/extracted");
+    try {
+      int segmentNum = 0;
+      List<OfflineSegmentZKMetadata> zkMetadataList = new ArrayList<>();
+      for (File index : inputDir.listFiles()) {
+        SegmentMetadata segmentMetadata = new SegmentMetadataImpl(index);
+        OfflineSegmentZKMetadata zkMetadata = new OfflineSegmentZKMetadata();
+        ZKMetadataUtils.updateSegmentMetadata(zkMetadata, segmentMetadata);
+        zkMetadataList.add(zkMetadata);
+      }
+
+
+      Map<Long, List<String>> grouping = getGrouping(zkMetadataList);
+
+      produceResultStat(grouping);
+
+
+    } finally {
+//      FileUtils.deleteQuietly(tmpDir);
+    }
+
+  }
+
+  public static Map<Long, List<String>> getGrouping(List<OfflineSegmentZKMetadata> zkMetadataList) {
+    Map<Long, List<String>> result = new TreeMap<>();
+
+    for (OfflineSegmentZKMetadata zkMetadata : zkMetadataList) {
+      long endTime = zkMetadata.getEndTime();
+      TimeUnit timeUnit = zkMetadata.getTimeUnit();
+      TimeUnitTransformer timeUnitTransformer =
+          TimeUnitTransformerFactory.getTimeUnitTransformer(timeUnit, "months");
+      long[] input = new long[1];
+      input[0] = endTime;
+      long[] output = new long[1];
+      timeUnitTransformer.transform(input, output, 1);
+
+      List<String> group = result.computeIfAbsent(output[0], k -> new ArrayList<>());
+      group.add(zkMetadata.getSegmentName());
+    }
+
+    return result;
+  }
+
+
+  public static void produceResultStat(Map<Long, List<String>> result) {
+    System.out.println("size breakdown: ");
+    for (Map.Entry<Long, List<String>> entry : result.entrySet()) {
+      System.out.println(entry.getKey() + ": " + entry.getValue().size());
+    }
+
+    System.out.println("segments: ");
+    for (Map.Entry<Long, List<String>> entry : result.entrySet()) {
+      List<String> segmentList = entry.getValue();
+      Collections.sort(segmentList);
+      System.out.println(entry.getKey() + ": " + segmentList.get(0) + ", " + segmentList.get(segmentList.size()-1));
+    }
+
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index 49bc3d4..4098607 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -32,6 +32,7 @@ public class TaskGeneratorRegistry {
 
   public TaskGeneratorRegistry(@Nonnull ClusterInfoProvider clusterInfoProvider) {
     registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoProvider));
+    registerTaskGenerator(new SegmentMergeRollupTaskGenerator(clusterInfoProvider));
   }
 
   /**
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
index d8a0b9f..640b946 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
@@ -25,6 +25,7 @@ public class MinionConstants {
   public static final String SEGMENT_NAME_KEY = "segmentName";
   public static final String DOWNLOAD_URL_KEY = "downloadURL";
   public static final String UPLOAD_URL_KEY = "uploadURL";
+  public static final String CONTROLLER_API_URL = "controllerApiUrl";
   public static final String URL_SEPARATOR = ",";
 
   /**
@@ -48,6 +49,13 @@ public class MinionConstants {
     public static final String COLUMNS_TO_CONVERT_KEY = "columnsToConvert";
   }
 
+  public static class SegmentMergeRollupTask {
+    public static final String TASK_TYPE = "SegmentMergeRollupTask";
+    public static final String MERGE_TYPE = "mergeType";
+    public static final String MERGED_SEGEMNT_NAME_KEY = "mergedSegmentNameKey";
+    public static final String GROUPS_TO_COVER_KEY = "groupsToCoverKey";
+  }
+
   // Purges rows inside segment that match chosen criteria
   public static class PurgeTask {
     public static final String TASK_TYPE = "PurgeTask";
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
index 4712f36..f80850e 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.core.data.readers;
 
 import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.data.DimensionFieldSpec;
 import com.linkedin.pinot.common.data.FieldSpec;
 import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.segment.ReadMode;
@@ -26,6 +27,8 @@ import com.linkedin.pinot.core.data.readers.sort.SegmentSorter;
 import com.linkedin.pinot.core.indexsegment.immutable.ImmutableSegment;
 import com.linkedin.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index a2cdb1a..dc4eb63 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -94,6 +94,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     Map<String, MinionEventObserverFactory> eventObserverFactoryRegistry =
         Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new TestEventObserverFactory());
     startMinions(NUM_MINIONS, taskExecutorFactoryRegistry, eventObserverFactoryRegistry);
+
+    // Thread.sleep(1000000000); // NOTE(review): debug-only pause — must not be committed; it would hang the test for days
   }
 
   @Test
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index 06cd811..dc40bd0 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
+import org.apache.http.Header;
 import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicNameValuePair;
 import org.slf4j.Logger;
@@ -59,6 +60,12 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
   protected abstract List<SegmentConversionResult> convert(@Nonnull PinotTaskConfig pinotTaskConfig,
       @Nonnull List<File> originalIndexDir, @Nonnull File workingDir) throws Exception;
 
+  protected abstract void preprocessBeforeUpload(PinotTaskConfig pinotTaskConfig, List<SegmentConversionResult> conversionResults) throws Exception;
+
+  protected List<Header> getHttpHeaders() {
+    return null;
+  }
+
   @Override
   public List<SegmentConversionResult> executeTask(@Nonnull PinotTaskConfig pinotTaskConfig) throws Exception {
     String taskType = pinotTaskConfig.getTaskType();
@@ -122,6 +129,10 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      preprocessBeforeUpload(pinotTaskConfig, segmentConversionResults);
+
+      List<Header> headers = getHttpHeaders();
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
@@ -131,7 +142,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         List<NameValuePair> parameters = Collections.singletonList(
             new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true"));
 
-        SegmentConversionUtils.uploadSegment(configs, null, parameters, tableNameWithType, resultSegmentName, uploadURL,
+        SegmentConversionUtils.uploadSegment(configs, headers, parameters, tableNameWithType, resultSegmentName, uploadURL,
             convertedTarredSegmentFile);
       }
 
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
index d47fd0e..8cdb499 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
@@ -17,18 +17,25 @@ package com.linkedin.pinot.minion.executor;
 import com.linkedin.pinot.common.exception.HttpErrorStatusException;
 import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
 import com.linkedin.pinot.common.utils.SimpleHttpResponse;
+import com.linkedin.pinot.common.utils.retry.AttemptsExceededException;
+import com.linkedin.pinot.common.utils.retry.RetriableOperationException;
 import com.linkedin.pinot.common.utils.retry.RetryPolicies;
 import com.linkedin.pinot.common.utils.retry.RetryPolicy;
 import com.linkedin.pinot.core.common.MinionConstants;
 import com.linkedin.pinot.minion.MinionContext;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
+import javax.validation.constraints.Min;
+import javax.ws.rs.HEAD;
 import org.apache.http.Header;
 import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
 import org.apache.http.NameValuePair;
+import org.apache.http.client.methods.RequestBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,4 +98,47 @@ public class SegmentConversionUtils {
       });
     }
   }
+
+  public static void uploadSegmentMergeLineage(Map<String, String> configs, String updateLineageUrl,
+      List<String> childrenSegmentGroups, List<String> segments) throws Exception {
+    String maxNumAttemptsConfig = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
+    int maxNumAttempts =
+        maxNumAttemptsConfig != null ? Integer.parseInt(maxNumAttemptsConfig) : DEFAULT_MAX_NUM_ATTEMPTS;
+    String initialRetryDelayMsConfig = configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY);
+    long initialRetryDelayMs =
+        initialRetryDelayMsConfig != null ? Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS;
+    String retryScaleFactorConfig = configs.get(MinionConstants.RETRY_SCALE_FACTOR_KEY);
+    double retryScaleFactor =
+        retryScaleFactorConfig != null ? Double.parseDouble(retryScaleFactorConfig) : DEFAULT_RETRY_SCALE_FACTOR;
+    RetryPolicy retryPolicy =
+        RetryPolicies.exponentialBackoffRetryPolicy(maxNumAttempts, initialRetryDelayMs, retryScaleFactor);
+
+    SSLContext sslContext = MinionContext.getInstance().getSSLContext();
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
+      retryPolicy.attempt(() -> {
+        try {
+          SimpleHttpResponse response =
+              fileUploadDownloadClient.updateSegmentMergeLineage(new URI(updateLineageUrl), childrenSegmentGroups,
+                  segments, FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+          return true;
+        } catch (HttpErrorStatusException e) {
+          int statusCode = e.getStatusCode();
+          if (statusCode == HttpStatus.SC_CONFLICT || statusCode >= 500) {
+            // Temporary exception
+            return false;
+          } else {
+            // Permanent exception
+            throw e;
+          }
+        } catch (Exception e) {
+          return false;
+        }
+      });
+    }
+  }
+
+  public static void main(String[] args) {
+//    RequestBuilder requestBuilder = RequestBuilder.post("localhost:")
+    RequestBuilder requestBuilder = RequestBuilder.post("localhost:8998/table/");
+  }
 }
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java
new file mode 100644
index 0000000..a4ecd81
--- /dev/null
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.minion.executor;
+
+import com.linkedin.pinot.common.config.PinotTaskConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
+import com.linkedin.pinot.core.common.MinionConstants;
+import com.linkedin.pinot.core.minion.rollup.MergeRollupSegmentConverter;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentMergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeRollupTaskExecutor.class);
+
+  @Override
+  protected List<SegmentConversionResult> convert(@Nonnull PinotTaskConfig pinotTaskConfig,
+      @Nonnull List<File> originalIndexDirs, @Nonnull File workingDir) throws Exception {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String mergeType = configs.get(MinionConstants.SegmentMergeRollupTask.MERGE_TYPE);
+    String mergedSegmentName = configs.get(MinionConstants.SegmentMergeRollupTask.MERGED_SEGEMNT_NAME_KEY);
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+    MergeRollupSegmentConverter rollupSegmentConverter =
+        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType)
+            .setTableName(tableNameWithType)
+            .setSegmentName(mergedSegmentName)
+            .setInputIndexDirs(originalIndexDirs)
+            .setWorkingDir(workingDir)
+            .build();
+    List<File> resultFiles = rollupSegmentConverter.convert();
+    List<SegmentConversionResult> results = new ArrayList<>();
+    for (File file : resultFiles) {
+      String outputSegmentName = file.getName();
+      results.add(new SegmentConversionResult.Builder().setFile(file)
+          .setSegmentName(outputSegmentName)
+          .setTableNameWithType(tableNameWithType)
+          .build());
+    }
+    return results;
+  }
+
+  @Override
+  protected void preprocessBeforeUpload(@Nonnull PinotTaskConfig pinotTaskConfig,
+      List<SegmentConversionResult> conversionResults) throws Exception {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String groupsToCoverStr = configs.get(MinionConstants.SegmentMergeRollupTask.GROUPS_TO_COVER_KEY);
+
+    String vipUrl = configs.get(MinionConstants.CONTROLLER_API_URL);
+    List<String> childrenGroupIds = new ArrayList<>();
+    for (String groupId : groupsToCoverStr.split(",")) {
+      childrenGroupIds.add(groupId.trim());
+    }
+    //    SegmentConversionUtils.uploadSegmentMergeLineage();
+
+    List<String> segments = new ArrayList<>();
+
+    for (SegmentConversionResult conversionResult : conversionResults) {
+      segments.add(conversionResult.getSegmentName());
+    }
+
+    LOGGER.info("Updating segment merge lineage metadata. segments: {}, childrenGroupIds: {}", segments,
+        childrenGroupIds);
+    // list of segments, list of children group ids
+    // Need to update merged segment
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    String tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString();
+
+    String requestUrl = vipUrl + "/tables" + "/" + rawTableName + "/segments/lineage?type=" + tableType;
+    SegmentConversionUtils.uploadSegmentMergeLineage(configs, requestUrl, childrenGroupIds, segments);
+
+  }
+
+  @Override
+  protected List<Header> getHttpHeaders() {
+    // Since we update segment merge lineage in "pre-process step", we need to indicate this segment upload is merged segment.
+    Header mergedSegmentPushHeader =
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.MERGED_SEGMENT_PUSH, "true");
+    return Collections.singletonList(mergedSegmentPushHeader);
+  }
+}
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java
new file mode 100644
index 0000000..dd97948
--- /dev/null
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.minion.executor;
+
+public class SegmentMergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
+  @Override
+  public PinotTaskExecutor create() {
+    return new SegmentMergeRollupTaskExecutor();
+  }
+}
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 91e1ea6..d316c6e 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -32,6 +32,8 @@ public class TaskExecutorFactoryRegistry {
     registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
         new ConvertToRawIndexTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory());
+    registerTaskExecutorFactory(MinionConstants.SegmentMergeRollupTask.TASK_TYPE,
+        new SegmentMergeRollupTaskExecutorFactory());
   }
 
   /**
diff --git a/pom.xml b/pom.xml
index 81a6dbb..e020a5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -470,6 +470,11 @@
         <version>${jackson.version}</version>
       </dependency>
       <dependency>
+        <groupId>com.fasterxml.uuid</groupId>
+        <artifactId>java-uuid-generator</artifactId>
+        <version>3.1.5</version>
+      </dependency>
+      <dependency>
         <groupId>org.yaml</groupId>
         <artifactId>snakeyaml</artifactId>
         <version>1.16</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 02/02: Improve the logging for segment merge command 1. add min start / max end time for input segments and merged segment 2. added the total number of documents for input and merged segment

Posted by sn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch improve-merge-command
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5a8ef509f0c4f2a14d533ced712fd94d7088669e
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Fri Nov 30 16:27:21 2018 -0800

    Improve the logging for segment merge command
    1. add min start / max end time for input segments and merged segment
    2. added the total number of documents for input and merged segment
---
 .../segment/converter/SegmentMergeCommand.java     | 47 +++++++++++++---------
 1 file changed, 27 insertions(+), 20 deletions(-)

diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
index 1c0da29..8cddcad 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -151,9 +151,27 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
       LOGGER.info("Table config: {}", tableConfig);
       LOGGER.info("Schema : {}", schema);
 
+      // Compute min/max time from segment metadata
+      long minStartTime = Long.MAX_VALUE;
+      long maxEndTime = Long.MIN_VALUE;
+      long totalNumDocsBeforeMerge = 0;
+      for (File indexDir : inputIndexDirs) {
+        SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
+        long currentStartTime = segmentMetadata.getStartTime();
+        if (currentStartTime < minStartTime) {
+          minStartTime = currentStartTime;
+        }
+
+        long currentEndTime = segmentMetadata.getEndTime();
+        if (currentEndTime > maxEndTime) {
+          maxEndTime = currentEndTime;
+        }
+        totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
+      }
+
       // Compute segment name if it is not specified
       if (_outputSegmentName == null) {
-        _outputSegmentName = getDefaultSegmentName(tableConfig, schema, inputIndexDirs);
+        _outputSegmentName = getDefaultSegmentName(tableConfig, schema, inputIndexDirs, minStartTime, maxEndTime);
       }
       LOGGER.info("Output segment name: {}", _outputSegmentName);
 
@@ -195,6 +213,11 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
         FileUtils.moveDirectory(outputSegment, finalOutputPath);
       }
       LOGGER.info("Segment has been merged correctly. Output file is located at {}", finalOutputPath);
+      LOGGER.info("Min start time / max end time for input segments : " + minStartTime + " / " + maxEndTime);
+      LOGGER.info("Min start time / max end time for merged segment: " + outputSegmentMetadata.getStartTime() + " / "
+          + outputSegmentMetadata.getEndTime());
+      LOGGER.info("Total number of documents for input segments: " + totalNumDocsBeforeMerge);
+      LOGGER.info("Total number of documents for merged segment: " + outputSegmentMetadata.getTotalDocs());
     } finally {
       // Clean up working directory
       FileUtils.deleteQuietly(workingDir);
@@ -207,25 +230,9 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
     return "Create the merged segment using concatenation";
   }
 
-  private String getDefaultSegmentName(TableConfig tableConfig, Schema schema, List<File> inputIndexDirs)
-      throws Exception {
-    String tableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName());
-
-    // Compute mix/max time from segment metadata
-    long minStartTime = Long.MAX_VALUE;
-    long maxEndTime = Long.MIN_VALUE;
-    for (File indexDir : inputIndexDirs) {
-      SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
-      long currentStartTime = segmentMetadata.getStartTime();
-      if (currentStartTime < minStartTime) {
-        minStartTime = currentStartTime;
-      }
-
-      long currentEndTime = segmentMetadata.getEndTime();
-      if (currentEndTime > maxEndTime) {
-        maxEndTime = currentEndTime;
-      }
-    }
+  private String getDefaultSegmentName(TableConfig tableConfig, Schema schema, List<File> inputIndexDirs,
+      long minStartTime, long maxEndTime) throws Exception {
+    String tableName = tableConfig.getTableName();
 
     // Fetch time related configurations from schema and table config.
     String pushFrequency = tableConfig.getValidationConfig().getSegmentPushFrequency();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org