You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2018/12/01 00:28:21 UTC

[incubator-pinot] 01/02: initial commit

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch improve-merge-command
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit cc3b86a7e97e0face9fe828fff3e049c4cc43a8e
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Mon Nov 26 16:24:48 2018 -0800

    initial commit
---
 .../routing/HelixExternalViewBasedRouting.java     |  22 +-
 .../BasePartitionAwareRoutingTableBuilder.java     |   6 +-
 .../routing/builder/BaseRoutingTableBuilder.java   |  11 +-
 .../builder/DefaultOfflineRoutingTableBuilder.java |   5 +-
 .../DefaultRealtimeRoutingTableBuilder.java        |  11 +-
 .../routing/builder/RoutingTableBuilder.java       |   3 +-
 .../routing/selector/DefaultSegmentSelector.java   |  42 +++
 .../routing/selector/MergedSegmentSelector.java    |  92 ++++++
 .../broker/routing/selector/SegmentSelector.java   |  46 +++
 .../routing/selector/SegmentSelectorFactory.java   |  40 +++
 .../broker/routing/RoutingTableBenchmark.java      | 122 ++++++++
 .../pinot/broker/routing/RoutingTableTest.java     |   1 +
 .../BalancedRandomRoutingTableBuilderTest.java     |   2 +-
 .../HighLevelConsumerRoutingTableBuilderTest.java  |   2 +-
 ...rtitionAwareOfflineRoutingTableBuilderTest.java |   9 +-
 ...titionAwareRealtimeRoutingTableBuilderTest.java |  11 +-
 .../broker/selector/MergeSegmentSelectorTest.java  |  77 +++++
 .../{routing => util}/FakePropertyStore.java       |  13 +-
 pinot-common/pom.xml                               |   4 +
 .../common/config/MultiLevelRollupSetting.java     |  77 +++++
 .../linkedin/pinot/common/config/RollupConfig.java |  59 ++++
 .../pinot/common/config/SegmentMergeConfig.java    | 137 +++++++++
 .../linkedin/pinot/common/config/TableConfig.java  |  84 ++++--
 .../pinot/common/lineage/SegmentGroup.java         |  72 +++++
 .../pinot/common/lineage/SegmentMergeLineage.java  | 333 +++++++++++++++++++++
 .../lineage/SegmentMergeLineageAccessHelper.java   |  82 +++++
 .../pinot/common/metadata/ZKMetadataProvider.java  |   5 +
 .../common/utils/FileUploadDownloadClient.java     |  51 +++-
 .../com/linkedin/pinot/common/utils/UUIDUtils.java |  29 ++
 .../common/lineage/SegmentMergeLineageTest.java    | 106 +++++++
 .../PinotSegmentUploadRestletResource.java         |   9 +-
 .../api/resources/PinotTableRestletResource.java   |  48 +++
 .../pinot/controller/api/upload/ZKOperator.java    |  53 +++-
 .../helix/ControllerRequestURLBuilder.java         |   5 +
 .../helix/core/PinotHelixResourceManager.java      |  68 ++++-
 .../helix/core/minion/ClusterInfoProvider.java     |   6 +
 .../generator/SegmentMergeRollupTaskGenerator.java | 313 +++++++++++++++++++
 .../minion/generator/TaskGeneratorRegistry.java    |   1 +
 .../pinot/core/common/MinionConstants.java         |   8 +
 .../data/readers/PinotSegmentRecordReader.java     |   3 +
 .../tests/SimpleMinionClusterIntegrationTest.java  |   2 +
 .../BaseMultipleSegmentsConversionExecutor.java    |  13 +-
 .../minion/executor/SegmentConversionUtils.java    |  50 ++++
 .../executor/SegmentMergeRollupTaskExecutor.java   | 105 +++++++
 .../SegmentMergeRollupTaskExecutorFactory.java     |  23 ++
 .../executor/TaskExecutorFactoryRegistry.java      |   2 +
 pom.xml                                            |   5 +
 47 files changed, 2189 insertions(+), 79 deletions(-)

diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
index c3db57d..287eeea 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
@@ -17,6 +17,8 @@ package com.linkedin.pinot.broker.routing;
 
 import com.google.common.collect.Sets;
 import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelectorFactory;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -67,6 +69,7 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
   private final Map<String, Map<String, InstanceConfig>> _lastKnownInstanceConfigsForTable = new ConcurrentHashMap<>();
   private final Map<String, InstanceConfig> _lastKnownInstanceConfigs = new ConcurrentHashMap<>();
   private final Map<String, Set<String>> _tablesForInstance = new ConcurrentHashMap<>();
+  private final Map<String, SegmentSelector> _segmentSelectorMap = new ConcurrentHashMap<>();
 
   private final HelixExternalViewBasedTimeBoundaryService _timeBoundaryService;
   private final HelixManager _helixManager;
@@ -76,25 +79,25 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
 
   private Configuration _configuration;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-
   private RoutingTableBuilderFactory _routingTableBuilderFactory;
+  private SegmentSelectorFactory _segmentSelectorFactory;
+
 
   public HelixExternalViewBasedRouting(ZkHelixPropertyStore<ZNRecord> propertyStore, HelixManager helixManager,
       Configuration configuration) {
-    _propertyStore = propertyStore;
     _configuration = configuration;
     _timeBoundaryService = new HelixExternalViewBasedTimeBoundaryService(propertyStore);
     _routingTableBuilderMap = new HashMap<>();
     _helixManager = helixManager;
     _routingTableBuilderFactory = new RoutingTableBuilderFactory(_configuration, propertyStore);
+    _segmentSelectorFactory = new SegmentSelectorFactory(propertyStore);
   }
 
   @Override
   public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
     String tableName = request.getTableName();
     RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableName);
-    return routingTableBuilder.getRoutingTable(request);
+    return routingTableBuilder.getRoutingTable(request, _segmentSelectorMap.get(tableName));
   }
 
   @Override
@@ -116,6 +119,11 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
         tableName);
     _routingTableBuilderMap.put(tableName, routingTableBuilder);
 
+    // Initialize segment selector
+    SegmentSelector segmentSelector = _segmentSelectorFactory.createSegmentSelector(tableConfig);
+    LOGGER.info("Initialized segmentSelector: {} for table {}", segmentSelector.getClass().getName(), tableName);
+    _segmentSelectorMap.put(tableName, segmentSelector);
+
     // Build the routing table
     if (externalView == null) {
       // It is possible for us to get a request to serve a table for which there is no external view. In this case, just
@@ -223,8 +231,11 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
     int externalViewRecordVersion = externalView.getRecord().getVersion();
     _lastKnownExternalViewVersionMap.put(tableNameWithType, externalViewRecordVersion);
 
+
+
     RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableNameWithType);
-    if (routingTableBuilder == null) {
+    SegmentSelector segmentSelector = _segmentSelectorMap.get(tableNameWithType);
+    if (routingTableBuilder == null || segmentSelector == null) {
       //TODO: warn
       return;
     }
@@ -237,6 +248,7 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
       Map<String, InstanceConfig> relevantInstanceConfigs = new HashMap<>();
 
       routingTableBuilder.computeOnExternalViewChange(tableNameWithType, externalView, instanceConfigs);
+      segmentSelector.computeOnExternalViewChange();
 
       // Keep track of the instance configs that are used in that routing table
       updateInstanceConfigsMapFromExternalView(relevantInstanceConfigs, instanceConfigs, externalView);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
index 8d1f568..732c636 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.broker.routing.builder;
 import com.linkedin.pinot.broker.pruner.SegmentPrunerContext;
 import com.linkedin.pinot.broker.pruner.SegmentZKMetadataPrunerService;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -82,7 +83,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
   }
 
   @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
     // Copy the reference for the current segment to replica to server mapping for snapshot
     Map<String, Map<Integer, String>> segmentToReplicaToServerMap = _segmentToReplicaToServerMap;
 
@@ -90,6 +91,9 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
     Set<String> segmentsToQuery = segmentToReplicaToServerMap.keySet();
 
     // TODO: add the selection logic here
+    if (segmentSelector != null) {
+      segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery);
+    }
 
     Map<String, List<String>> routingTable = new HashMap<>();
     SegmentPrunerContext prunerContext = new SegmentPrunerContext(request.getBrokerRequest());
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
index c00d30d..1d59e89 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.RoutingConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -123,16 +124,16 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
     }
   }
 
-  @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
     if (_enableDynamicComputing) {
       // Copy the pointer for snapshot since the pointer for segment to servers map can change at anytime
       Map<String, List<String>> segmentToServersMap = _segmentToServersMap;
 
-      // Get all existing segments
-      Set<String> segmentsToQuery = segmentToServersMap.keySet();
-
       // TODO: add the selection logic here
+      Set<String> segmentsToQuery = segmentToServersMap.keySet();
+      if (segmentSelector != null) {
+        segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery);
+      }
 
       // Compute the final routing table
       return computeDynamicRoutingTable(segmentToServersMap, segmentsToQuery);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
index 07cea6b..64ba82f 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import java.util.HashSet;
@@ -121,8 +122,8 @@ public class DefaultOfflineRoutingTableBuilder implements RoutingTableBuilder {
   }
 
   @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
-    return _routingTableBuilder.getRoutingTable(request);
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
+    return _routingTableBuilder.getRoutingTable(request, segmentSelector);
   }
 
   @Override
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
index d78de04..72a9a79 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import com.linkedin.pinot.common.utils.SegmentName;
@@ -68,7 +69,7 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
   }
 
   @Override
-  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+  public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
     boolean forceLLC = false;
     boolean forceHLC = false;
     for (String routingOption : request.getRoutingOptions()) {
@@ -85,14 +86,14 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
     }
 
     if (forceLLC) {
-      return _realtimeLLCRoutingTableBuilder.getRoutingTable(request);
+      return _realtimeLLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
     } else if (forceHLC) {
-      return _realtimeHLCRoutingTableBuilder.getRoutingTable(request);
+      return _realtimeHLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
     } else {
       if (_hasLLC) {
-        return _realtimeLLCRoutingTableBuilder.getRoutingTable(request);
+        return _realtimeLLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
       } else if (_hasHLC) {
-        return _realtimeHLCRoutingTableBuilder.getRoutingTable(request);
+        return _realtimeHLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
       } else {
         return Collections.emptyMap();
       }
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
index 5a9c320..3fa2bee 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.broker.routing.builder;
 
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.metrics.BrokerMetrics;
 import java.util.List;
@@ -47,7 +48,7 @@ public interface RoutingTableBuilder {
   /**
    * Get the routing table based on the given lookup request.
    */
-  Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request);
+  Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector);
 
   /**
    * Get all pre-computed routing tables.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java
new file mode 100644
index 0000000..b8105f6
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Default Segment Selector
+ */
+public class DefaultSegmentSelector implements SegmentSelector {
+
+  @Override
+  public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+  }
+
+  @Override
+  public void computeOnExternalViewChange() {
+  }
+
+  @Override
+  public Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segments) {
+    return segments;
+  }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java
new file mode 100644
index 0000000..7d74146
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.lineage.SegmentGroup;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Segment selector for merged segments
+ */
+public class MergedSegmentSelector implements SegmentSelector {
+  private String _tableNameWithType;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private volatile SegmentGroup _rootSegmentGroup;
+
+  @Override
+  public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _tableNameWithType = tableConfig.getTableName();
+    _propertyStore = propertyStore;
+  }
+
+  @Override
+  public void computeOnExternalViewChange() {
+    SegmentMergeLineage segmentMergeLineage = SegmentMergeLineageAccessHelper.getSegmentMergeLineage(_propertyStore,
+        _tableNameWithType);
+    _rootSegmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+  }
+
+  @Override
+  public Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segmentsToQuery) {
+    Set<String> selectedSegments = new HashSet<>(segmentsToQuery);
+    for (SegmentGroup segmentGroup : _rootSegmentGroup.getChildrenGroups()) {
+      computeSelectionProcessForSegmentGroup(segmentGroup, selectedSegments, segmentsToQuery);
+    }
+    return selectedSegments;
+  }
+
+  private void computeSelectionProcessForSegmentGroup(SegmentGroup segmentGroup, Set<String> selectedSegments,
+      Set<String> availableSegments) {
+    Set<String> segmentsForGroup = segmentGroup.getSegments();
+
+    if (availableSegments.containsAll(segmentsForGroup)) {
+      // If we pick the current group node, we delete all segments covered by children groups
+      if (segmentGroup.getChildrenGroups() == null || segmentGroup.getChildrenGroups().isEmpty()) {
+        return;
+      }
+      for (SegmentGroup child: segmentGroup.getChildrenGroups()) {
+        removeSegmentsForSegmentGroup(child, selectedSegments);
+      }
+    } else {
+      // If the current group is not picked, we compute the selection recursively for children nodes
+      selectedSegments.removeAll(segmentsForGroup);
+      for (SegmentGroup child: segmentGroup.getChildrenGroups()) {
+        computeSelectionProcessForSegmentGroup(child, selectedSegments, availableSegments);
+      }
+    }
+  }
+
+  private void removeSegmentsForSegmentGroup(SegmentGroup segmentGroup, Set<String> selectedSegments) {
+    Set<String> segmentsForGroup = segmentGroup.getSegments();
+    selectedSegments.removeAll(segmentsForGroup);
+
+    if (segmentGroup.getChildrenGroups() == null || segmentGroup.getChildrenGroups().isEmpty()) {
+      return;
+    }
+
+    for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+      removeSegmentsForSegmentGroup(child, selectedSegments);
+    }
+  }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java
new file mode 100644
index 0000000..f7bb19d
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Interface for segment selector
+ */
+public interface SegmentSelector {
+
+  /**
+   * Initiate the segment selector
+   */
+  void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore);
+
+  /**
+   * Compute and update the information required for a selector
+   * <p>Should be called whenever there is an external view change
+   */
+  void computeOnExternalViewChange();
+
+  /**
+   * Compute selection algorithm.
+   * <p>May end up with modifying the input set of segments.
+   */
+  Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segmentsToQuery);
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java
new file mode 100644
index 0000000..1033bc9
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Factory for segment selector
+ */
+public class SegmentSelectorFactory {
+
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  public SegmentSelectorFactory(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _propertyStore = propertyStore;
+  }
+
+  public SegmentSelector createSegmentSelector(TableConfig tableConfig) {
+    // TODO: add the support for merged segment selector once merge config is updated.
+    SegmentSelector segmentSelector = new MergedSegmentSelector();
+    segmentSelector.init(tableConfig, _propertyStore);
+    return segmentSelector;
+  }
+}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java
new file mode 100644
index 0000000..046cb69
--- /dev/null
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing;
+
+
+import com.linkedin.pinot.common.config.RoutingConfig;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.io.IOUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+
+
+public class RoutingTableBenchmark {
+  public static final int NUM_ROUNDS = 10000;
+
+//  public static void main(String[] args) throws Exception {
+////    URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("SampleExternalView.json");
+////    URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("SampleRealtimeExternalView.json");
+//    URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("AdStatistics.json");
+//
+//    Assert.assertNotNull(resourceUrl);
+//    String fileName = resourceUrl.getFile();
+//
+//    byte[] externalViewBytes = IOUtils.toByteArray(new FileInputStream(fileName));
+//    ExternalView externalView = new ExternalView((ZNRecord) new ZNRecordSerializer().deserialize(externalViewBytes));
+//    String tableName = externalView.getResourceName();
+//    List<InstanceConfig> instanceConfigs = getInstanceConfigs(externalView);
+//
+//    TableConfig preComputeTableConfig = generatePreComputeTableConfig(tableName);
+//    TableConfig dynamicTableConfig = generateDynamicTableConfig(tableName);
+//
+//    HelixExternalViewBasedRouting precomputeRouting = new HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+//    precomputeRouting.markDataResourceOnline(preComputeTableConfig, externalView, instanceConfigs);
+//
+//    HelixExternalViewBasedRouting dynamicRouting = new HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+//    dynamicRouting.markDataResourceOnline(dynamicTableConfig, externalView, instanceConfigs);
+//
+////    System.out.println("Pre-compute Routing table builder");
+////    for (int i = 0; i < NUM_ROUNDS; i++) {
+////      long start = System.nanoTime();
+////      Map<String, List<String>> routingTable = precomputeRouting.getRoutingTable(new RoutingTableLookupRequest(tableName));
+////      long end = System.nanoTime();
+////      System.out.println("round " + i + ": " + (end - start) / 1000000.0 + " ms");
+////    }
+////    System.out.println("");
+//
+//    System.out.println("Dynamic Routing table builder");
+//    for (int i = 0; i < NUM_ROUNDS; i++) {
+//      long start = System.nanoTime();
+//      Map<String, List<String>> routingTable = dynamicRouting.getRoutingTable(new RoutingTableLookupRequest(tableName));
+//      long end = System.nanoTime();
+//      System.out.println("round " + i + ": " + (end - start) / 1000000.0 + " ms");
+//    }
+//  }
+//
+//  private static List<InstanceConfig> getInstanceConfigs(ExternalView externalView) {
+//    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+//    Set<String> instances = new HashSet<>();
+//
+//    // Collect all unique instances
+//    for (String partitionName : externalView.getPartitionSet()) {
+//      for (String instance : externalView.getStateMap(partitionName).keySet()) {
+//        if (!instances.contains(instance)) {
+//          instanceConfigs.add(new InstanceConfig(instance));
+//          instances.add(instance);
+//        }
+//      }
+//    }
+//
+//    return instanceConfigs;
+//  }
+//
+//  private static TableConfig generatePreComputeTableConfig(String tableName) throws Exception {
+//    CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+//    TableConfig.Builder builder = new TableConfig.Builder(tableType);
+//    builder.setTableName(tableName);
+//    TableConfig tableConfig = builder.build();
+////    tableConfig.getRoutingConfig().setRoutingTableBuilderName("KafkaLowLevel");
+//    return tableConfig;
+//  }
+//
+//  private static TableConfig generateDynamicTableConfig(String tableName) throws Exception {
+//    CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+//    TableConfig.Builder builder = new TableConfig.Builder(tableType);
+//    builder.setTableName(tableName);
+//    TableConfig tableConfig = builder.build();
+//    Map<String, String> option = new HashMap<>();
+//    option.put(RoutingConfig.ENABLE_DYNAMIC_COMPUTING_KEY, "true");
+//    tableConfig.getRoutingConfig().setRoutingTableBuilderOptions(option);
+////    tableConfig.getRoutingConfig().setRoutingTableBuilderName("KafkaLowLevel");
+//    return tableConfig;
+//  }
+
+}
+
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
index 25600d3..3413711 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
@@ -17,6 +17,7 @@ package com.linkedin.pinot.broker.routing;
 
 import com.linkedin.pinot.broker.routing.builder.HighLevelConsumerBasedRoutingTableBuilder;
 import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableConfig.Builder;
 import com.linkedin.pinot.common.config.TableNameBuilder;
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
index 7bae71d..46ca40c 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
@@ -79,7 +79,7 @@ public class BalancedRandomRoutingTableBuilderTest {
     // Build routing table
     routingTableBuilder.computeOnExternalViewChange("dummy", externalView, instanceConfigList);
     RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
-    Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request);
+    Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
 
     Set<String> segmentsInRoutingTable = new HashSet<>();
     for (List<String> segments : routingTable.values()) {
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
index 74f53c3..c2ef4e4 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
@@ -95,7 +95,7 @@ public class HighLevelConsumerRoutingTableBuilderTest {
       // Check if the routing table result is correct
       for (int run = 0; run < MAX_NUM_GROUPS * 10; run++) {
         RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
-        Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request);
+        Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
         Set<String> coveredSegments = new HashSet<>();
         for (List<String> segmentsForServer : routingTable.values()) {
           coveredSegments.addAll(segmentsForServer);
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
index c110b6e..5a44a28 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
@@ -15,7 +15,7 @@
  */
 package com.linkedin.pinot.broker.routing.builder;
 
-import com.linkedin.pinot.broker.routing.FakePropertyStore;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.common.config.ReplicaGroupStrategyConfig;
 import com.linkedin.pinot.common.config.RoutingConfig;
@@ -50,7 +50,6 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
   private static final String PARTITION_COLUMN = "memberId";
 
   private static final Pql2Compiler COMPILER = new Pql2Compiler();
-
   private static final Random RANDOM = new Random();
 
   private int NUM_REPLICA;
@@ -114,7 +113,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
       // Check the query that requires to scan all segment.
       String countStarQuery = "select count(*) from myTable";
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
       // Check that the number of servers picked are always equal or less than the number of servers
       // from a single replica group.
@@ -134,7 +133,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
       for (int queryPartition = 0; queryPartition < 100; queryPartition++) {
         String filterQuery = "select count(*) from myTable where " + PARTITION_COLUMN + " = " + queryPartition;
         routingTable =
-            routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery));
+            routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery), null);
 
         // Check that the number of servers picked are always equal or less than the number of servers
         // in a single replica group.
@@ -218,7 +217,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
     for (int i = 0; i < 100; i++) {
       String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME;
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
       Assert.assertEquals(routingTable.keySet().size(), 1);
       servers.add(routingTable.keySet().iterator().next());
     }
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
index 7f349f8..93f2c9a 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
@@ -15,7 +15,8 @@
  */
 package com.linkedin.pinot.broker.routing.builder;
 
-import com.linkedin.pinot.broker.routing.FakePropertyStore;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
 import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
 import com.linkedin.pinot.common.config.ColumnPartitionConfig;
 import com.linkedin.pinot.common.config.RoutingConfig;
@@ -102,7 +103,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
       // Check the query that requires to scan all segment.
       String countStarQuery = "select count(*) from myTable";
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
       // Check that all segments are covered exactly for once.
       Set<String> assignedSegments = new HashSet<>();
@@ -117,7 +118,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
       // Check the broker side server and segment pruning.
       for (int queryPartition = 0; queryPartition < 100; queryPartition++) {
         String filterQuery = "select count(*) from myTable where " + PARTITION_COLUMN + " = " + queryPartition;
-        routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery));
+        routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery), null);
 
         int partition = queryPartition % NUM_PARTITION;
         assignedSegments = new HashSet<>();
@@ -178,7 +179,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
     // Check the query that requires to scan all segment.
     String countStarQuery = "select count(*) from myTable";
     Map<String, List<String>> routingTable =
-        routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+        routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
 
     // Check that all segments are covered exactly for once.
     Set<String> assignedSegments = new HashSet<>();
@@ -253,7 +254,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
     for (int i = 0; i < 100; i++) {
       String countStarQuery = "select count(*) from " + REALTIME_TABLE_NAME;
       Map<String, List<String>> routingTable =
-          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+          routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
       Assert.assertEquals(routingTable.keySet().size(), 1);
       servers.addAll(routingTable.keySet());
     }
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java
new file mode 100644
index 0000000..738e943
--- /dev/null
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.selector;
+
+import com.fasterxml.uuid.Generators;
+import com.linkedin.pinot.broker.routing.selector.MergedSegmentSelector;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.testng.annotations.Test;
+
+
+public class MergeSegmentSelectorTest {
+  private static final String TEST_TABLE_NAME = "test_OFFLINE";
+
+  @Test
+  public void testMergeSegmentSelector() throws Exception {
+    MergedSegmentSelector segmentSelector = new MergedSegmentSelector();
+
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_NAME).build();
+    ZkHelixPropertyStore<ZNRecord> fakePropertyStore = new FakePropertyStore();
+    segmentSelector.init(tableConfig, fakePropertyStore);
+
+    SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(fakePropertyStore, generateFakeMergeLineage(), 0);
+    segmentSelector.computeOnExternalViewChange();
+
+    Set<String> segmentsToQuery = new HashSet<>(
+        Arrays.asList(new String[]{"segment0", "segment1", "segment2", "segment3", "merged_segment0", "merged_segment1"}));
+
+    System.out.println("segments to query: " + segmentsToQuery);
+    System.out.println("segments after filter: " + segmentSelector.selectSegments(null, segmentsToQuery));
+
+  }
+
+  private SegmentMergeLineage generateFakeMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage(TEST_TABLE_NAME);
+
+    List<String> childrenGroups = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      String groupId = Generators.timeBasedGenerator().generate().toString();
+      segmentMergeLineage.addSegmentGroup(groupId, Arrays.asList(new String[]{"segment" + i}), null);
+      childrenGroups.add(groupId);
+    }
+
+    String mergedGroupId = Generators.timeBasedGenerator().generate().toString();
+    segmentMergeLineage.addSegmentGroup(mergedGroupId,
+        Arrays.asList(new String[]{"merged_segment0", "merged_segment1"}), childrenGroups);
+
+    String unmergedSegmentGroupId = Generators.timeBasedGenerator().generate().toString();
+    segmentMergeLineage.addSegmentGroup(unmergedSegmentGroupId, Arrays.asList(new String[]{"segment3"}), null);
+
+    return segmentMergeLineage;
+  }
+}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
similarity index 88%
rename from pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java
rename to pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
index 6bb8322..e18c9e2 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.linkedin.pinot.broker.routing;
+package com.linkedin.pinot.broker.util;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -42,6 +42,17 @@ public class FakePropertyStore extends ZkHelixPropertyStore<ZNRecord> {
   }
 
   @Override
+  public boolean set(String path, ZNRecord stat, int expectedVersion, int options) {
+    try {
+      setContents(path, stat);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+
+  @Override
   public boolean set(String path, ZNRecord stat, int options) {
     try {
       setContents(path, stat);
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 95e4fc8..6f59e95 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -228,6 +228,10 @@
       <artifactId>jackson-annotations</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.uuid</groupId>
+      <artifactId>java-uuid-generator</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java
new file mode 100644
index 0000000..d9da264
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MultiLevelRollupSetting {
+  @ConfigKey("timeInputFormat")
+  public String _timeInputFormat;
+
+  @ConfigKey("timeOutputFormat")
+  public String _timeOutputFormat;
+
+  @ConfigKey("timeOutputGranularity")
+  public String _timeOutputGranularity;
+
+  @ConfigKey("minNumSegments")
+  public int _minNumSegments;
+
+  @ConfigKey("minNumTotalDocs")
+  public int _minNumTotalDocs;
+
+  public String getTimeInputFormat() {
+    return _timeInputFormat;
+  }
+
+  public void setTimeInputFormat(String timeInputFormat) {
+    _timeInputFormat = timeInputFormat;
+  }
+
+  public String getTimeOutputFormat() {
+    return _timeOutputFormat;
+  }
+
+  public void setTimeOutputFormat(String timeOutputFormat) {
+    _timeOutputFormat = timeOutputFormat;
+  }
+
+  public String getTimeOutputGranularity() {
+    return _timeOutputGranularity;
+  }
+
+  public void setTimeOutputGranularity(String timeOutputGranularity) {
+    _timeOutputGranularity = timeOutputGranularity;
+  }
+
+  public int getMinNumSegments() {
+    return _minNumSegments;
+  }
+
+  public void setMinNumSegments(int minNumSegments) {
+    _minNumSegments = minNumSegments;
+  }
+
+  public int getMinNumTotalDocs() {
+    return _minNumTotalDocs;
+  }
+
+  public void setMinNumTotalDocs(int minNumTotalDocs) {
+    _minNumTotalDocs = minNumTotalDocs;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java
new file mode 100644
index 0000000..c86099f
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import java.util.List;
+import java.util.Map;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RollupConfig {
+
+  @ConfigKey("rollupType")
+  String rollupType;
+
+  @ConfigKey("preAggregateType")
+  @UseChildKeyHandler(SimpleMapChildKeyHandler.class)
+  Map<String, String> _preAggregateType;
+
+  @ConfigKey("multiLevelRollupSettings")
+  List<MultiLevelRollupSetting> _multiLevelRollupSettings;
+
+  public String getRollupType() {
+    return rollupType;
+  }
+
+  public void setRollupType(String rollupType) {
+    this.rollupType = rollupType;
+  }
+
+  public Map<String, String> getPreAggregateType() {
+    return _preAggregateType;
+  }
+
+  public void setPreAggregateType(Map<String, String> preAggregateType) {
+    _preAggregateType = preAggregateType;
+  }
+
+  public List<MultiLevelRollupSetting> getMultiLevelRollupSettings() {
+    return _multiLevelRollupSettings;
+  }
+
+  public void setMultiLevelRollupSettings(List<MultiLevelRollupSetting> multiLevelRollupSettings) {
+    _multiLevelRollupSettings = multiLevelRollupSettings;
+  }
+
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java
new file mode 100644
index 0000000..d96952f
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2014-2016 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import javax.annotation.Nonnull;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SegmentMergeConfig {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RollupConfig.class);
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private static final String MERGE_TYPE = "mergeType";
+  private static final String MERGE_STRATEGY = "mergeStrategy";
+  private static final String MIN_NUM_SEGMENTS = "minNumSegments";
+  private static final String MIN_NUM_TOTAL_DOCS = "minNumTotalDocs";
+  private static final String ROLLUP_CONFIG = "rollupConfig";
+  private static final String OLD_SEGMENT_CLEANUP = "oldSegmentCleanup";
+
+  @ConfigKey(value = "mergeType")
+  private String _mergeType;
+
+  @ConfigKey(value = "mergeStrategy")
+  private String _mergeStrategy;
+
+  @ConfigKey(value = "rollupConfig")
+  private RollupConfig _rollupConfig;
+
+  @ConfigKey(value = "minNumSegments")
+  private int _minNumSegments;
+
+  @ConfigKey(value = "minNumTotalDocs")
+  private long _minNumTotalDocs;
+
+  @ConfigKey(value = "oldSegmentCleanup")
+  private boolean _oldSegmentCleanup;
+
+  public String getMergeType() {
+    return _mergeType;
+  }
+
+  public void setMergeType(String mergeType) {
+    _mergeType = mergeType;
+  }
+
+  public String getMergeStrategy() {
+    return _mergeStrategy;
+  }
+
+  public void setMergeStrategy(String mergeStrategy) {
+    _mergeStrategy = mergeStrategy;
+  }
+
+  public int getMinNumSegments() {
+    return _minNumSegments;
+  }
+
+  public void setMinNumSegments(int minNumSegments) {
+    _minNumSegments = minNumSegments;
+  }
+
+  public long getMinNumTotalDocs() {
+    return _minNumTotalDocs;
+  }
+
+  public void setMinNumTotalDocs(long minNumTotalDocs) {
+    _minNumTotalDocs = minNumTotalDocs;
+  }
+
+  public boolean getOldSegmentCleanup() {
+    return _oldSegmentCleanup;
+  }
+
+  public void setOldSegmentCleanup(boolean oldSegmentCleanup) {
+    _oldSegmentCleanup = oldSegmentCleanup;
+  }
+
+  public RollupConfig getRollupConfig() {
+    return _rollupConfig;
+  }
+
+  public void setRollupConfig(RollupConfig rollupConfig) {
+    _rollupConfig = rollupConfig;
+  }
+
+  @Nonnull
+  public JSONObject toJSON() {
+    JSONObject mergeRollupConfigJsonObject = new JSONObject();
+    try {
+      mergeRollupConfigJsonObject.put(MERGE_TYPE, _mergeType);
+      mergeRollupConfigJsonObject.put(MERGE_STRATEGY, _mergeStrategy);
+      mergeRollupConfigJsonObject.put(ROLLUP_CONFIG, _rollupConfig);
+      mergeRollupConfigJsonObject.put(MIN_NUM_SEGMENTS, _minNumSegments);
+      mergeRollupConfigJsonObject.put(MIN_NUM_TOTAL_DOCS, _minNumTotalDocs);
+      mergeRollupConfigJsonObject.put(OLD_SEGMENT_CLEANUP, _oldSegmentCleanup);
+    } catch (Exception e) {
+      LOGGER.error("Failed to convert rollup config to json", e);
+    }
+
+    return mergeRollupConfigJsonObject;
+  }
+
+  public String toJSONString() {
+    try {
+      return toJSON().toString(2);
+    } catch (Exception e) {
+      return e.toString();
+    }
+  }
+
+  public static void main(String[] args) {
+    SegmentMergeConfig config = new SegmentMergeConfig();
+    config.setMergeType("CONCATENATE");
+    System.out.println(config.toJSONString());
+    System.out.println(config.toJSON());
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
index 960c7f0..b4fa08a 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
@@ -47,6 +47,7 @@ public class TableConfig {
   private static final String QUOTA_CONFIG_KEY = "quota";
   private static final String TASK_CONFIG_KEY = "task";
   private static final String ROUTING_CONFIG_KEY = "routing";
+  private static final String SEGMENT_MERGE_CONFIG_KEY = "segmentMergeConfig";
 
   @ConfigKey("name")
   @ConfigDoc(value = "The name for the table.", mandatory = true, exampleValue = "myTable")
@@ -78,6 +79,9 @@ public class TableConfig {
   @NestedConfig
   private RoutingConfig _routingConfig;
 
+  @NestedConfig
+  private SegmentMergeConfig _segmentMergeConfig;
+
   public TableConfig() {
     // TODO: currently these 2 fields are annotated as non-null. Revisit to see whether that's necessary
     _tenantConfig = new TenantConfig();
@@ -87,7 +91,8 @@ public class TableConfig {
   private TableConfig(@Nonnull String tableName, @Nonnull TableType tableType,
       @Nonnull SegmentsValidationAndRetentionConfig validationConfig, @Nonnull TenantConfig tenantConfig,
       @Nonnull IndexingConfig indexingConfig, @Nonnull TableCustomConfig customConfig,
-      @Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig) {
+      @Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig,
+      @Nullable SegmentMergeConfig segmentMergeConfig) {
     _tableName = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
     _tableType = tableType;
     _validationConfig = validationConfig;
@@ -97,19 +102,18 @@ public class TableConfig {
     _quotaConfig = quotaConfig;
     _taskConfig = taskConfig;
     _routingConfig = routingConfig;
+    _segmentMergeConfig = segmentMergeConfig;
   }
 
   // For backward compatible
   @Deprecated
   @Nonnull
-  public static TableConfig init(@Nonnull String jsonConfigString)
-      throws IOException, JSONException {
+  public static TableConfig init(@Nonnull String jsonConfigString) throws IOException, JSONException {
     return fromJSONConfig(new JSONObject(jsonConfigString));
   }
 
   @Nonnull
-  public static TableConfig fromJSONConfig(@Nonnull JSONObject jsonConfig)
-      throws IOException, JSONException {
+  public static TableConfig fromJSONConfig(@Nonnull JSONObject jsonConfig) throws IOException, JSONException {
     TableType tableType = TableType.valueOf(jsonConfig.getString(TABLE_TYPE_KEY).toUpperCase());
     String tableName = TableNameBuilder.forType(tableType).tableNameWithType(jsonConfig.getString(TABLE_NAME_KEY));
     SegmentsValidationAndRetentionConfig validationConfig =
@@ -136,13 +140,17 @@ public class TableConfig {
           OBJECT_MAPPER.readValue(jsonConfig.getJSONObject(ROUTING_CONFIG_KEY).toString(), RoutingConfig.class);
     }
 
+    SegmentMergeConfig segmentMergeConfig = null;
+    if (jsonConfig.has(SEGMENT_MERGE_CONFIG_KEY)) {
+      segmentMergeConfig = OBJECT_MAPPER.readValue(jsonConfig.getJSONObject(SEGMENT_MERGE_CONFIG_KEY).toString(), SegmentMergeConfig.class);
+    }
+
     return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
-        quotaConfig, taskConfig, routingConfig);
+        quotaConfig, taskConfig, routingConfig, segmentMergeConfig);
   }
 
   @Nonnull
-  public static JSONObject toJSONConfig(@Nonnull TableConfig tableConfig)
-      throws IOException, JSONException {
+  public static JSONObject toJSONConfig(@Nonnull TableConfig tableConfig) throws IOException, JSONException {
     JSONObject jsonConfig = new JSONObject();
     jsonConfig.put(TABLE_NAME_KEY, tableConfig._tableName);
     jsonConfig.put(TABLE_TYPE_KEY, tableConfig._tableType.toString());
@@ -160,12 +168,14 @@ public class TableConfig {
     if (tableConfig._routingConfig != null) {
       jsonConfig.put(ROUTING_CONFIG_KEY, new JSONObject(OBJECT_MAPPER.writeValueAsString(tableConfig._routingConfig)));
     }
+    if (tableConfig._segmentMergeConfig != null) {
+      jsonConfig.put(SEGMENT_MERGE_CONFIG_KEY, new JSONObject(OBJECT_MAPPER.writeValueAsString(tableConfig._segmentMergeConfig)));
+    }
     return jsonConfig;
   }
 
   @Nonnull
-  public static TableConfig fromZnRecord(@Nonnull ZNRecord znRecord)
-      throws IOException, JSONException {
+  public static TableConfig fromZnRecord(@Nonnull ZNRecord znRecord) throws IOException, JSONException {
     Map<String, String> simpleFields = znRecord.getSimpleFields();
     TableType tableType = TableType.valueOf(simpleFields.get(TABLE_TYPE_KEY).toUpperCase());
     String tableName = TableNameBuilder.forType(tableType).tableNameWithType(simpleFields.get(TABLE_NAME_KEY));
@@ -194,8 +204,14 @@ public class TableConfig {
       routingConfig = OBJECT_MAPPER.readValue(routingConfigString, RoutingConfig.class);
     }
 
+    String segmentMergeConfigString = simpleFields.get(SEGMENT_MERGE_CONFIG_KEY);
+    SegmentMergeConfig segmentMergeConfig = null;
+    if (segmentMergeConfigString != null) {
+      segmentMergeConfig = OBJECT_MAPPER.readValue(segmentMergeConfigString, SegmentMergeConfig.class);
+    }
+
     return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
-        quotaConfig, taskConfig, routingConfig);
+        quotaConfig, taskConfig, routingConfig, segmentMergeConfig);
   }
 
   @Nonnull
@@ -218,6 +234,9 @@ public class TableConfig {
       if (tableConfig._routingConfig != null) {
         simpleFields.put(ROUTING_CONFIG_KEY, OBJECT_MAPPER.writeValueAsString(tableConfig._routingConfig));
       }
+      if (tableConfig._segmentMergeConfig != null) {
+        simpleFields.put(SEGMENT_MERGE_CONFIG_KEY, OBJECT_MAPPER.writeValueAsString(tableConfig._segmentMergeConfig));
+      }
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -305,9 +324,17 @@ public class TableConfig {
     _routingConfig = routingConfig;
   }
 
+  @Nullable
+  public SegmentMergeConfig getSegmentMergeConfig() {
+    return _segmentMergeConfig;
+  }
+
+  public void setSegmentMergeConfig(SegmentMergeConfig segmentMergeConfig) {
+    _segmentMergeConfig = segmentMergeConfig;
+  }
+
   @Nonnull
-  public String toJSONConfigString()
-      throws IOException, JSONException {
+  public String toJSONConfigString() throws IOException, JSONException {
     return toJSONConfig(this).toString();
   }
 
@@ -332,18 +359,14 @@ public class TableConfig {
 
     TableConfig that = (TableConfig) o;
 
-    return EqualityUtils.isEqual(_tableName, that._tableName) &&
-        EqualityUtils.isEqual(_tableType, that._tableType) &&
-        EqualityUtils.isEqual(_validationConfig, that._validationConfig) &&
-        EqualityUtils.isEqual(_tenantConfig, that._tenantConfig) &&
-        EqualityUtils.isEqual(_indexingConfig, that._indexingConfig) &&
-        EqualityUtils.isEqual(_customConfig, that._customConfig) &&
-        EqualityUtils.isEqual(_quotaConfig, that._quotaConfig) &&
-        EqualityUtils.isEqual(_taskConfig, that._taskConfig) &&
-        EqualityUtils.isEqual(_routingConfig, that._routingConfig);
+    return EqualityUtils.isEqual(_tableName, that._tableName) && EqualityUtils.isEqual(_tableType, that._tableType)
+        && EqualityUtils.isEqual(_validationConfig, that._validationConfig) && EqualityUtils.isEqual(_tenantConfig,
+        that._tenantConfig) && EqualityUtils.isEqual(_indexingConfig, that._indexingConfig) && EqualityUtils.isEqual(
+        _customConfig, that._customConfig) && EqualityUtils.isEqual(_quotaConfig, that._quotaConfig)
+        && EqualityUtils.isEqual(_taskConfig, that._taskConfig) && EqualityUtils.isEqual(_routingConfig,
+        that._routingConfig);
   }
 
-
   @Override
   public int hashCode() {
     int result = EqualityUtils.hashCodeOf(_tableName);
@@ -401,9 +424,11 @@ public class TableConfig {
     private QuotaConfig _quotaConfig;
     private TableTaskConfig _taskConfig;
     private RoutingConfig _routingConfig;
+    private SegmentMergeConfig _segmentMergeConfig;
     private HllConfig _hllConfig;
     private StarTreeIndexSpec _starTreeIndexSpec;
 
+
     public Builder(TableType tableType) {
       _tableType = tableType;
     }
@@ -549,8 +574,15 @@ public class TableConfig {
       return this;
     }
 
-    public TableConfig build()
-        throws IOException, JSONException {
+    public SegmentMergeConfig getSegmentMergeConfig() {
+      return _segmentMergeConfig;
+    }
+
+    public void setSegmentMergeConfig(SegmentMergeConfig segmentMergeConfig) {
+      _segmentMergeConfig = segmentMergeConfig;
+    }
+
+    public TableConfig build() throws IOException, JSONException {
       // Validation config
       SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
       validationConfig.setTimeColumnName(_timeColumnName);
@@ -598,7 +630,7 @@ public class TableConfig {
       }
 
       return new TableConfig(_tableName, _tableType, validationConfig, tenantConfig, indexingConfig, _customConfig,
-          _quotaConfig, _taskConfig, _routingConfig);
+          _quotaConfig, _taskConfig, _routingConfig, _segmentMergeConfig);
     }
   }
 }
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
new file mode 100644
index 0000000..fb28ac1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Class to represent segment group
+ */
+public class SegmentGroup {
+
+  private String _groupId;
+  private int _groupLevel;
+  private SegmentGroup _parentGroup;
+  private List<SegmentGroup> _childrenGroups;
+  private Set<String> _segments;
+
+  public String getGroupId() {
+    return _groupId;
+  }
+
+  public void setGroupId(String groupId) {
+    _groupId = groupId;
+  }
+
+  public SegmentGroup getParentGroup() {
+    return _parentGroup;
+  }
+
+  public void setParentGroup(SegmentGroup parentGroup) {
+    _parentGroup = parentGroup;
+  }
+
+  public List<SegmentGroup> getChildrenGroups() {
+    return _childrenGroups;
+  }
+
+  public void setChildrenGroups(List<SegmentGroup> childrenGroups) {
+    _childrenGroups = childrenGroups;
+  }
+
+  public Set<String> getSegments() {
+    return _segments;
+  }
+
+  public void setSegments(Set<String> segments) {
+    _segments = segments;
+  }
+
+  public int getGroupLevel() {
+    return _groupLevel;
+  }
+
+  public void setGroupLevel(int groupLevel) {
+    _groupLevel = groupLevel;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
new file mode 100644
index 0000000..f4a5bdf
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
@@ -0,0 +1,333 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import com.linkedin.pinot.common.utils.EqualityUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to represent segment merge lineage information.
+ *
+ * Segment merge lineage information is serialized into a znode and stored in a helix property store (zookeeper). This
+ * information will be used by the broker, segment merge task generator, and retention manager.
+ *
+ * For each segment group, we are storing the following information:
+ * 1. group id
+ *    - group identifier (will be stored in time based uuid format)
+ * 2. group level
+ *    - segment level allows us to have a hierarchical representation of the segment lineage. When we assign the merge
+ *      task, we will only merge/roll-up segments with the same level.
+ *      (e.g. If hourly segment groups are in level 0, daily segment groups will belong to level 1)
+ * 3. segments
+ *    - segments that belong to a particular segment group
+ * 4. lineage information
+ *    - If a segment group is created by merging multiple children segment groups, we write the lineage information
+ *      (e.g. segment group C is merged from segment group A, B)
+ */
+public class SegmentMergeLineage {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeLineage.class);
+
+  private static final String LEVEL_KEY_PREFIX = "level_";
+  private static final String ROOT_NODE_GROUP_ID = "root";
+  private static final String SEGMENT_DELIMITER = ",";
+  private static final int DEFAULT_GROUP_LEVEL = 0;
+
+  private String _tableNameWithType;
+
+  // Mapping of group id to children group ids
+  private Map<String, List<String>> _parentGroupToChildrenGroupsMap;
+
+  // Mapping of group level to group id to segments that belong to a group
+  // Segment level represents
+  private Map<Integer, Map<String, List<String>>> _levelToGroupToSegmentsMap;
+
+  public SegmentMergeLineage(String tableNameWithType) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = new HashMap<>();
+    _levelToGroupToSegmentsMap = new HashMap<>();
+  }
+
+  public SegmentMergeLineage(String tableNameWithType, Map<String, List<String>> segmentGroupLineageMap,
+      Map<Integer, Map<String, List<String>>> levelToGroupToSegmentMap) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = segmentGroupLineageMap;
+    _levelToGroupToSegmentsMap = levelToGroupToSegmentMap;
+  }
+
+  public String getTableName() {
+    return _tableNameWithType;
+  }
+
+  public static SegmentMergeLineage fromZNRecord(ZNRecord record) {
+    String tableNameWithType = record.getId();
+    Map<String, List<String>> segmentGroupLineageMap = record.getListFields();
+
+    Map<Integer, Map<String, List<String>>> groupToSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : record.getMapFields().entrySet()) {
+      String levelKey = entry.getKey();
+      Integer level = Integer.parseInt(levelKey.substring(LEVEL_KEY_PREFIX.length()));
+      Map<String, List<String>> groupToSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, String> groupEntry : entry.getValue().entrySet()) {
+        String groupId = groupEntry.getKey();
+        String segmentsString = groupEntry.getValue();
+        List<String> segments = Arrays.asList(segmentsString.split(SEGMENT_DELIMITER));
+        groupToSegmentsForLevel.put(groupId, new ArrayList<>(segments));
+      }
+      groupToSegmentsMap.put(level, groupToSegmentsForLevel);
+    }
+    return new SegmentMergeLineage(tableNameWithType, segmentGroupLineageMap, groupToSegmentsMap);
+  }
+
+  public ZNRecord toZNRecord() {
+    ZNRecord record = new ZNRecord(_tableNameWithType);
+    record.setListFields(_parentGroupToChildrenGroupsMap);
+    Map<String, Map<String, String>> groupToSegmentsMap = new HashMap<>();
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      String key = LEVEL_KEY_PREFIX + entry.getKey();
+      Map<String, String> groupSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, List<String>> groupEntry : entry.getValue().entrySet()) {
+        String groupId = groupEntry.getKey();
+        String segments = String.join(SEGMENT_DELIMITER, groupEntry.getValue());
+        groupSegmentsForLevel.put(groupId, segments);
+      }
+      groupToSegmentsMap.put(key, groupSegmentsForLevel);
+    }
+    record.setMapFields(groupToSegmentsMap);
+
+    return record;
+  }
+
+  /**
+   * Add segment merge lineage information
+   *
+   * @param groupId a group id
+   * @param currentGroupSegments a list of segments that belongs to the group
+   * @param childrenGroups a list of children groups that the current group covers
+   */
+  public void addSegmentGroup(String groupId, List<String> currentGroupSegments, List<String> childrenGroups)
+      throws InvalidConfigException {
+    // Get group level
+    Integer groupLevel = getGroupLevel(childrenGroups);
+
+    // Update group to segments map
+    Map<String, List<String>> groupToSegmentMap =
+        _levelToGroupToSegmentsMap.computeIfAbsent(groupLevel, k -> new HashMap<>());
+    if (groupToSegmentMap.containsKey(groupId)) {
+      throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+    }
+    groupToSegmentMap.put(groupId, new ArrayList<>(currentGroupSegments));
+    _levelToGroupToSegmentsMap.put(groupLevel, groupToSegmentMap);
+
+    // Update segment group lineage map
+    if (groupLevel > DEFAULT_GROUP_LEVEL) {
+      if (_parentGroupToChildrenGroupsMap.containsKey(groupId)) {
+        throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+      } else {
+        _parentGroupToChildrenGroupsMap.put(groupId, new ArrayList<>(childrenGroups));
+      }
+    }
+
+    LOGGER.info("New group has been added successfully to the segment lineage. (groupId: {}, currentGroupSegments: {}, "
+        + "childrenGroups: {}", groupId, currentGroupSegments, childrenGroups);
+  }
+
+  /**
+   * Remove segment merge information given a group id
+   *
+   * @param groupId a group id
+   */
+  public void removeSegmentGroup(String groupId) {
+    // Clean up the group id from parent to children group mapping
+    _parentGroupToChildrenGroupsMap.remove(groupId);
+    for (List<String> childrenGroups : _parentGroupToChildrenGroupsMap.values()) {
+      childrenGroups.remove(groupId);
+    }
+
+    // Clean up the group id from group to segments mapping
+    for (Map<String, List<String>> groupToSegments : _levelToGroupToSegmentsMap.values()) {
+      groupToSegments.remove(groupId);
+    }
+
+    LOGGER.info("Group {} has been successfully removed.", groupId);
+  }
+
+  /**
+   * Construct a lineage tree and returns the root node
+   *
+   * @return a root node for lineage tree
+   */
+  public SegmentGroup getMergeLineageRootSegmentGroup() {
+    // Create group nodes
+    Map<String, SegmentGroup> groupNodes = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, List<String>>> groupEntryForLevel : _levelToGroupToSegmentsMap.entrySet()) {
+      Integer level = groupEntryForLevel.getKey();
+      Map<String, List<String>> groupToSegmentsForLevel = groupEntryForLevel.getValue();
+      for (Map.Entry<String, List<String>> entry : groupToSegmentsForLevel.entrySet()) {
+        String groupId = entry.getKey();
+        List<String> segments = entry.getValue();
+        SegmentGroup groupNode = new SegmentGroup();
+        groupNode.setGroupId(groupId);
+        groupNode.setSegments(new HashSet<>(segments));
+        groupNode.setGroupLevel(level);
+        groupNodes.put(groupId, groupNode);
+      }
+    }
+
+    // Add edges by updating children & parent group information
+    for (Map.Entry<String, List<String>> lineageEntry : _parentGroupToChildrenGroupsMap.entrySet()) {
+      String parentGroupId = lineageEntry.getKey();
+      List<String> childrenGroupIds = lineageEntry.getValue();
+      List<SegmentGroup> childrenGroups = new ArrayList<>();
+      SegmentGroup parentNode = groupNodes.get(parentGroupId);
+      for (String groupId : childrenGroupIds) {
+        SegmentGroup childNode = groupNodes.get(groupId);
+        if (childNode != null) {
+          childrenGroups.add(childNode);
+          childNode.setParentGroup(parentNode);
+        }
+      }
+      parentNode.setChildrenGroups(childrenGroups);
+    }
+
+    // Create a root node
+    SegmentGroup root = new SegmentGroup();
+    root.setGroupId(ROOT_NODE_GROUP_ID);
+    List<SegmentGroup> childrenForRoot = new ArrayList<>();
+    for (SegmentGroup group : groupNodes.values()) {
+      if (group.getParentGroup() == null) {
+        group.setParentGroup(root);
+        childrenForRoot.add(group);
+      }
+    }
+    root.setChildrenGroups(childrenForRoot);
+
+    return root;
+  }
+
+  /**
+   * Get a list of segments for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of segments that belongs to the given group id, null if the group does not exist
+   */
+  public List<String> getSegmentsForGroup(String groupId) {
+    for (Map<String, List<String>> groupToSegmentMap : _levelToGroupToSegmentsMap.values()) {
+      List<String> segments = groupToSegmentMap.get(groupId);
+      if (segments != null) {
+        return segments;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a list of children group ids for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of children groups that are covered by the given group id, null if the group does not exist
+   */
+  public List<String> getChildrenForGroup(String groupId) {
+    return _parentGroupToChildrenGroupsMap.get(groupId);
+  }
+
+  /**
+   * Get a list of all group levels
+   *
+   * @return a list of all group levels
+   */
+  public List<Integer> getAllGroupLevels() {
+    List<Integer> groupLevels = new ArrayList<>(_levelToGroupToSegmentsMap.keySet());
+    Collections.sort(groupLevels);
+    return groupLevels;
+  }
+
+  /**
+   * Get a list of group ids for a given group level
+   *
+   * @param groupLevel a group level
+   * @return a list of group ids that belongs to the given group level, null if the group level does not exist
+   */
+  public List<String> getGroupIdsForGroupLevel(int groupLevel) {
+    Map<String, List<String>> groupToSegmentsMap = _levelToGroupToSegmentsMap.get(groupLevel);
+    if (groupToSegmentsMap != null) {
+      return new ArrayList<>(groupToSegmentsMap.keySet());
+    }
+    return null;
+  }
+
+  /**
+   * Helper function to compute group level given children groups
+   *
+   * @param childrenGroups a list of children group ids
+   * @return group level
+   */
+  private Integer getGroupLevel(List<String> childrenGroups) throws InvalidConfigException {
+    // If no children exists, the group belongs to the base level.
+    if (childrenGroups == null || childrenGroups.isEmpty()) {
+      return DEFAULT_GROUP_LEVEL;
+    }
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      Integer currentLevel = entry.getKey();
+      Map<String, List<String>> currentLevelGroupToSegmentsMap = entry.getValue();
+      if (currentLevelGroupToSegmentsMap.keySet().containsAll(childrenGroups)) {
+        return currentLevel + 1;
+      }
+    }
+
+    // At this point, not all children groups are covered, cannot add group
+    throw new InvalidConfigException("Cannot compute group level because not all children groups exist "
+        + "in the segment merge lineage, children groups: " + childrenGroups);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    SegmentMergeLineage that = (SegmentMergeLineage) o;
+
+    return EqualityUtils.isEqual(_tableNameWithType, that._tableNameWithType) && EqualityUtils.isEqual(
+        _parentGroupToChildrenGroupsMap, that._parentGroupToChildrenGroupsMap) && EqualityUtils.isEqual(
+        _levelToGroupToSegmentsMap, that._levelToGroupToSegmentsMap);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_tableNameWithType);
+    result = EqualityUtils.hashCodeOf(result, _parentGroupToChildrenGroupsMap);
+    result = EqualityUtils.hashCodeOf(result, _levelToGroupToSegmentsMap);
+    return result;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
new file mode 100644
index 0000000..00eb7c3
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.common.utils.retry.RetryPolicy;
+import java.util.List;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Class to help to read, write segment merge lineage
+ */
+public class SegmentMergeLineageAccessHelper {
+
+  /**
+   * Read the segment merge lineage ZNRecord from the property store
+   *
+   * @param propertyStore a property store
+   * @param tableNameWithType a table name with type
+   * @return a ZNRecord of segment merge lineage
+   */
+  public static ZNRecord getSegmentMergeLineageZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (segmentMergeLineageZNRecord != null) {
+      segmentMergeLineageZNRecord.setVersion(stat.getVersion());
+    }
+    return segmentMergeLineageZNRecord;
+  }
+
+  /**
+   * Read the segment merge lineage from the property store
+   *
+   * @param propertyStore  a property store
+   * @param tableNameWithType a table name with type
+   * @return a segment merge lineage
+   */
+  public static SegmentMergeLineage getSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+    return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+  }
+
+  /**
+   * Write the segment merge lineage to the property store
+   *
+   * @param propertyStore a property store
+   * @param segmentMergeLineage a segment merge lineage
+   * @return true if update is successful. false otherwise.
+   */
+  public static boolean writeSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      SegmentMergeLineage segmentMergeLineage, int expectedVersion) {
+    String tableNameWithType = segmentMergeLineage.getTableName();
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    return propertyStore.set(path, segmentMergeLineage.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
index fe55a0c..e9d6b63 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
@@ -53,6 +53,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
   private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE";
   private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
+  private static final String PROPERTYSTORE_SEGMENT_MERGE_LINEAGE = "/SEGMENT_MERGE_LINEAGE";
 
   public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
       ZNRecord znRecord) {
@@ -106,6 +107,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX, controllerConfigKey);
   }
 
+  public static String constructPropertyStorePathForSegmentMergeLineage(String tableNameWithType) {
+    return StringUtil.join("/", PROPERTYSTORE_SEGMENT_MERGE_LINEAGE, tableNameWithType);
+  }
+
   public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
       String segmentName) {
     return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
index ca4b423..c8b15f2 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
@@ -52,6 +53,7 @@ import org.apache.http.entity.mime.content.InputStreamBody;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.json.JSONArray;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +72,7 @@ public class FileUploadDownloadClient implements Closeable {
     public static final String DOWNLOAD_URI = "DOWNLOAD_URI";
     public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
     public static final String CRYPTER = "CRYPTER";
+    public static final String MERGED_SEGMENT_PUSH = "MERGED_SEGMENT_PUSH";
   }
 
   public static class QueryParameters {
@@ -94,6 +97,7 @@ public class FileUploadDownloadClient implements Closeable {
   private static final String SEGMENT_PATH = "/v2/segments";
   private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
   private static final String TABLES_PATH = "/tables";
+  private static final String LINEAGE_PATH = "/lineage";
   private static final String TYPE_DELIMITER = "?type=";
   private static final String SLASH = "/";
 
@@ -137,6 +141,12 @@ public class FileUploadDownloadClient implements Closeable {
     return getURI(HTTPS, host, port, SCHEMA_PATH);
   }
 
+  public static URI getUpdateSegmentMergeLineageHttpURI(String host, int port, String tableName, String type)
+      throws URISyntaxException {
+    String path = TABLES_PATH + SLASH + tableName + LINEAGE_PATH + TYPE_DELIMITER + type;
+    return getURI(HTTP, host, port, path);
+  }
+
   /**
    * This method calls the old segment upload endpoint. We will deprecate this behavior soon. Please call
    * getUploadSegmentHttpURI to construct your request.
@@ -154,7 +164,7 @@ public class FileUploadDownloadClient implements Closeable {
   public static URI getOldUploadSegmentHttpsURI(String host, int port) throws URISyntaxException {
     return getURI(HTTPS, host, port, OLD_SEGMENT_PATH);
   }
-  
+
   public static URI getUploadSegmentHttpURI(String host, int port) throws URISyntaxException {
     return getURI(HTTP, host, port, SEGMENT_PATH);
   }
@@ -222,6 +232,29 @@ public class FileUploadDownloadClient implements Closeable {
         parameters, socketTimeoutMs);
   }
 
+  private static HttpUriRequest getUpdateSegmentMergeLineageRequest(URI uri, List<String> childrenGroupIds,
+      List<String> segments, int socketTimeoutMs) {
+    JSONObject jsonObject = new JSONObject();
+    JSONArray childrenGroupIdsJsonArray = new JSONArray();
+//    for (String groupId : childrenGroupIds) {
+//      childrenGroupIdsJsonArray.put(groupId);
+//    }
+    jsonObject.put("childrenGroupIds", childrenGroupIds);
+
+//    JSONArray segmentsJsonArray = new JSONArray();
+//    for (String segment : segments) {
+//      segmentsJsonArray.put(segment);
+//    }
+    jsonObject.put("segments", segments);
+
+    RequestBuilder requestBuilder = RequestBuilder.post(uri)
+        .setVersion(HttpVersion.HTTP_1_1)
+        .setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+        .setEntity(new StringEntity(jsonObject.toString(), ContentType.APPLICATION_JSON));
+    setTimeout(requestBuilder, socketTimeoutMs);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getSendSegmentUriRequest(URI uri, String downloadUri, @Nullable List<Header> headers,
       @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
     RequestBuilder requestBuilder = RequestBuilder.post(uri)
@@ -398,6 +431,11 @@ public class FileUploadDownloadClient implements Closeable {
     return sendRequest(getUploadSegmentMetadataRequest(uri, segmentName, segmentMetadataFile, headers, parameters, socketTimeoutMs));
   }
 
+  public SimpleHttpResponse updateSegmentMergeLineage(URI uri, List<String> childrenGroupIds, List<String> segments, int socketTimeoutMs)
+      throws IOException, HttpErrorStatusException {
+    return sendRequest(getUpdateSegmentMergeLineageRequest(uri, childrenGroupIds, segments, socketTimeoutMs));
+  }
+
 
   /**
    * Upload segment with segment file.
@@ -595,4 +633,15 @@ public class FileUploadDownloadClient implements Closeable {
   public void close() throws IOException {
     _httpClient.close();
   }
+
+  public static void main(String[] args) {
+    List<String> list = new ArrayList<>();
+    list.add("aa");
+    list.add("bb");
+
+    JSONObject object = new JSONObject();
+    object.put("a", list);
+
+    System.out.println(object.toString());
+  }
 }
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java
new file mode 100644
index 0000000..d54a579
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.utils;
+
+import java.util.UUID;
+
+
+public class UUIDUtils {
+  // Mimics the implementation from Hector's TimeUUIDUtils class:
+  // https://github.com/rantav/hector/blob/master/core/src/main/java/me/prettyprint/cassandra/utils/TimeUUIDUtils.java
+  private static final long NUM_100NS_INTERVALS_SINCE_UUID_EPOCH = 0x01b21dd213814000L;
+
+  public static long getUnixTimeFromUUID(UUID uuid) {
+    return (uuid.timestamp() - NUM_100NS_INTERVALS_SINCE_UUID_EPOCH) / 10000;
+  }
+}
diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
new file mode 100644
index 0000000..923fbfc
--- /dev/null
+++ b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentMergeLineageTest {
+
+  @Test
+  public void testSegmentMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList(new String[]{"segment1", "segment2", "segment3"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    String groupId2 = "G2";
+    List<String> groupSegments2 = Arrays.asList(new String[]{"segment4", "segment5"});
+    segmentMergeLineage.addSegmentGroup(groupId2, groupSegments2, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId2), groupSegments2);
+
+    String groupId3 = "G3";
+    List<String> groupSegments3 = Arrays.asList(new String[]{"segment6"});
+    segmentMergeLineage.addSegmentGroup(groupId3, groupSegments3, Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId3), groupSegments3);
+
+    // Check available APIs
+    Assert.assertEquals(segmentMergeLineage.getTableName(), "test_OFFLINE");
+    Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId3),
+        Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getAllGroupLevels(), Arrays.asList(new Integer[]{0, 1}));
+    Assert.assertTrue(segmentMergeLineage.equals(SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord())));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(0),
+        Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(1),
+        Arrays.asList(new String[]{groupId3}));
+    validateSegmentGroup(segmentMergeLineage);
+
+    // Check ZNRecord conversion
+    Assert.assertEquals(segmentMergeLineage, SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord()));
+
+    // Test removing groups
+    segmentMergeLineage.removeSegmentGroup(groupId1);
+    Assert.assertNull(segmentMergeLineage.getChildrenForGroup(groupId1));
+    Assert.assertNull(segmentMergeLineage.getSegmentsForGroup(groupId1));
+    Assert.assertFalse(segmentMergeLineage.getGroupIdsForGroupLevel(0).contains(groupId1));
+  }
+
+  @Test(expectedExceptions = InvalidConfigException.class)
+  public void testUpdateWithDuplicateGroupId() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList(new String[]{"segment1, segment2, segment3"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    List<String> groupSegments2 = Arrays.asList(new String[]{"segment4, segment5, segment6"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments2, null);
+  }
+
+  private void validateSegmentGroup(SegmentMergeLineage segmentMergeLineage) {
+    SegmentGroup segmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+    for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+      validateSegmentGroupNode(child, segmentMergeLineage);
+    }
+  }
+
+  private void validateSegmentGroupNode(SegmentGroup segmentGroup, SegmentMergeLineage segmentMergeLineage) {
+    String groupId = segmentGroup.getGroupId();
+    Assert.assertEquals(segmentGroup.getSegments(), new HashSet<>(segmentMergeLineage.getSegmentsForGroup(groupId)));
+    Assert.assertTrue(segmentMergeLineage.getGroupIdsForGroupLevel(segmentGroup.getGroupLevel()).contains(groupId));
+
+    List<SegmentGroup> childrenGroups = segmentGroup.getChildrenGroups();
+    if (childrenGroups != null) {
+      List<String> childrenGroupIds = new ArrayList<>();
+      for (SegmentGroup child : childrenGroups) {
+        childrenGroupIds.add(child.getGroupId());
+      }
+      Assert.assertEquals(childrenGroupIds, segmentMergeLineage.getChildrenForGroup(groupId));
+
+      for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+        validateSegmentGroupNode(child, segmentMergeLineage);
+      }
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 418c2ef..af600e3 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -254,6 +254,9 @@ public class PinotSegmentUploadRestletResource {
     // Get URI of current segment location
     String currentSegmentLocationURI = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
 
+    String mergedSegmentPushStr = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.MERGED_SEGMENT_PUSH);
+    boolean mergedSegmentPush = Boolean.valueOf(mergedSegmentPushStr);
+
     File tempEncryptedFile = null;
     File tempDecryptedFile = null;
     File tempSegmentDir = null;
@@ -315,7 +318,7 @@ public class PinotSegmentUploadRestletResource {
 
       // Zk operations
       completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, segmentMetadata,
-          segmentName, zkDownloadUri, moveSegmentToFinalLocation);
+          segmentName, zkDownloadUri, moveSegmentToFinalLocation, mergedSegmentPush);
 
       return new SuccessResponse("Successfully uploaded segment: " + segmentMetadata.getName() + " of table: "
           + segmentMetadata.getTableName());
@@ -373,7 +376,7 @@ public class PinotSegmentUploadRestletResource {
 
   private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File tempDecryptedFile,
       FileUploadPathProvider provider, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation)
+      boolean moveSegmentToFinalLocation, boolean mergedSegmentPush)
       throws Exception {
     String finalSegmentPath =
         StringUtil.join("/", provider.getBaseDataDirURI().toString(), segmentMetadata.getTableName(),
@@ -381,7 +384,7 @@ public class PinotSegmentUploadRestletResource {
     URI finalSegmentLocationURI = new URI(finalSegmentPath);
     ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
     zkOperator.completeSegmentOperations(segmentMetadata, finalSegmentLocationURI, tempDecryptedFile,
-        enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation);
+        enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, mergedSegmentPush);
   }
 
   private void decryptFile(String crypterClassHeader, File tempEncryptedFile, File tempDecryptedFile) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
index ae98133..38aef03 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.controller.api.resources;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import com.linkedin.pinot.common.config.TableConfig;
@@ -516,4 +517,51 @@ public class PinotTableRestletResource {
     }
     return ResourceUtils.convertToJsonString(result);
   }
+
+  public static class LineageParameters {
+    @JsonProperty("segments")
+    List<String> segments;
+
+    @JsonProperty("childrenGroupIds")
+    List<String> childrenGroupIds;
+  }
+
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/segments/lineage")
+  @ApiOperation(value = "Updates segment merge lineage information", notes = "Updates segment merge lineage information")
+  public SuccessResponse updateSegmentMergeLineage(
+      @ApiParam(value = "Name of table to update segment merge lineage", required = true) @Nonnull @PathParam(value = "tableName") String tableName,
+      @ApiParam(value = "offline|realtime") @Nonnull @QueryParam("type") String tableType,
+      LineageParameters lineageParameters
+
+  ) {
+    if (!EnumUtils.isValidEnum(CommonConstants.Helix.TableType.class, tableType.toUpperCase())) {
+      throw new ControllerApplicationException(LOGGER, "Illegal table type " + tableType, Response.Status.BAD_REQUEST);
+    }
+
+    TableType type = TableType.valueOf(tableType.toUpperCase());
+    String tableNameWithType;
+    if (type == TableType.OFFLINE && _pinotHelixResourceManager.hasOfflineTable(tableName)) {
+      tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+    } else if (type == TableType.REALTIME && _pinotHelixResourceManager.hasRealtimeTable(tableName)) {
+      tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    } else {
+      throw new ControllerApplicationException(LOGGER, "Table " + tableName + ", type " + type + " does not exist",
+          Response.Status.NOT_FOUND);
+    }
+
+    boolean success =
+        _pinotHelixResourceManager.updateSegmentMergeLineage(tableNameWithType, lineageParameters.segments,
+            lineageParameters.childrenGroupIds);
+
+    if (success) {
+      return new SuccessResponse(
+          "Segment merge lineage has been updated successfully. (table: " + tableNameWithType + ", segments: "
+              + lineageParameters.segments + ", childrenGroups: " + lineageParameters.childrenGroupIds + ")");
+    } else {
+      throw new ControllerApplicationException(LOGGER, "Failed to update segment merge lineage for table: "
+          + tableNameWithType, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
index f1c6ad4..abd3698 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
@@ -15,12 +15,15 @@
  */
 package com.linkedin.pinot.controller.api.upload;
 
+import com.linkedin.pinot.common.config.SegmentMergeConfig;
+import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import com.linkedin.pinot.common.metrics.ControllerMeter;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
 import com.linkedin.pinot.common.segment.SegmentMetadata;
+import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.api.resources.ControllerApplicationException;
@@ -29,6 +32,7 @@ import com.linkedin.pinot.filesystem.PinotFS;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
 import java.io.File;
 import java.net.URI;
+import java.util.Collections;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.helix.ZNRecord;
@@ -55,8 +59,7 @@ public class ZKOperator {
 
   public void completeSegmentOperations(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      boolean moveSegmentToFinalLocation)
-      throws Exception {
+      boolean moveSegmentToFinalLocation, boolean mergedSegmentPush) throws Exception {
     String rawTableName = segmentMetadata.getTableName();
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
@@ -68,19 +71,21 @@ public class ZKOperator {
       LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
       String crypter = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
-          rawTableName, segmentName, moveSegmentToFinalLocation);
+          rawTableName, segmentName, moveSegmentToFinalLocation, mergedSegmentPush);
       return;
     }
 
     LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName);
 
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, offlineTableName, segmentName, znRecord, moveSegmentToFinalLocation);
+        enableParallelPushProtection, headers, zkDownloadURI, offlineTableName, segmentName, znRecord,
+        moveSegmentToFinalLocation);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      String offlineTableName, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation) throws Exception {
+      String offlineTableName, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation)
+      throws Exception {
 
     OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord);
     long existingCrc = existingSegmentZKMetadata.getCrc();
@@ -160,9 +165,11 @@ public class ZKOperator {
             newCrc, existingCrc, segmentName);
         if (moveSegmentToFinalLocation) {
           moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI);
-          LOGGER.info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
+          LOGGER.info("Moved segment {} from temp location {} to {}", segmentName,
+              currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
         } else {
-          LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, offlineTableName, zkDownloadURI);
+          LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, offlineTableName,
+              zkDownloadURI);
         }
 
         _pinotHelixResourceManager.refreshSegment(segmentMetadata, existingSegmentZKMetadata);
@@ -196,19 +203,40 @@ public class ZKOperator {
 
   private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, String zkDownloadURI, String crypter, String rawTableName, String segmentName,
-      boolean moveSegmentToFinalLocation) {
+      boolean moveSegmentToFinalLocation, boolean mergedSegmentPush) {
+
+    if (!mergedSegmentPush) {
+      // Fetch table config
+      TableConfig tableConfig =
+          _pinotHelixResourceManager.getTableConfig(rawTableName, CommonConstants.Helix.TableType.OFFLINE);
+      SegmentMergeConfig segmentMergeConfig = tableConfig.getSegmentMergeConfig();
+
+      // Update lineage information for base segment
+      if (segmentMergeConfig != null) {
+        try {
+          _pinotHelixResourceManager.updateSegmentMergeLineage(tableConfig.getTableName(),
+              Collections.singletonList(segmentName), null);
+        } catch (Exception e) {
+          LOGGER.error("Cannot update segment merge lineage information for table {}, segment {}", rawTableName,
+              segmentName, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
     // For v1 segment uploads, we will not move the segment
     if (moveSegmentToFinalLocation) {
       try {
         moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI);
-        LOGGER.info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
-            finalSegmentLocationURI.getPath());
+        LOGGER.info("Moved segment {} from temp location {} to {}", segmentName,
+            currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
       } catch (Exception e) {
         LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName, rawTableName, e);
         throw new RuntimeException(e);
       }
     } else {
-      LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName, zkDownloadURI);
+      LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName,
+          zkDownloadURI);
     }
     _pinotHelixResourceManager.addNewSegment(segmentMetadata, zkDownloadURI, crypter);
   }
@@ -218,7 +246,8 @@ public class ZKOperator {
     PinotFS pinotFS = PinotFSFactory.create(finalSegmentLocationURI.getScheme());
 
     // Overwrites current segment file
-    LOGGER.info("Copying segment from {} to {}", currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.toString());
+    LOGGER.info("Copying segment from {} to {}", currentSegmentLocation.getAbsolutePath(),
+        finalSegmentLocationURI.toString());
     pinotFS.copyFromLocalFile(currentSegmentLocation, finalSegmentLocationURI);
   }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
index 9a79cc3..f45f533 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -239,7 +239,12 @@ public class ControllerRequestURLBuilder {
   public String forSegmentListAPIWithTableType(String tableName, String tableType) {
     return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName + "?type=" + tableType);
   }
+
   public String forSegmentListAPI(String tableName) {
     return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName);
   }
+
+  public String forSegmentMergeLineagePostAPI(String tableName, String tableType) {
+    return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "lineage" + "?type=", tableType);
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index c12f70e..aa08350 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.controller.helix.core;
 
+import com.fasterxml.uuid.Generators;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -36,6 +37,8 @@ import com.linkedin.pinot.common.config.TenantConfig;
 import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.exception.InvalidConfigException;
 import com.linkedin.pinot.common.exception.TableNotFoundException;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
 import com.linkedin.pinot.common.messages.SegmentRefreshMessage;
 import com.linkedin.pinot.common.messages.SegmentReloadMessage;
 import com.linkedin.pinot.common.messages.TimeboundaryRefreshMessage;
@@ -82,6 +85,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
@@ -145,8 +149,8 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
-    this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS,
-        false, false);
+    this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
+        false);
   }
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
@@ -1084,6 +1088,10 @@ public class PinotHelixResourceManager {
         _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
             new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
 
+        // Add segment merge lineage znode if required
+        _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType),
+            new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
+
         // Update replica group partition assignment to the property store if applicable
         updateReplicaGroupPartitionAssignment(tableConfig);
         break;
@@ -1524,7 +1532,7 @@ public class PinotHelixResourceManager {
 
     ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
     LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
-            rawTableName, refreshMessage, recipientCriteria);
+        rawTableName, refreshMessage, recipientCriteria);
     // Helix sets the timeoutMs argument specified in 'send' call as the processing timeout of the message.
     int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, null, timeoutMs);
     if (nMsgsSent > 0) {
@@ -1534,7 +1542,7 @@ public class PinotHelixResourceManager {
       // May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
       // the latest time boundary info.
       LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
-              offlineTableName, nMsgsSent);
+          offlineTableName, nMsgsSent);
     }
   }
 
@@ -2192,6 +2200,58 @@ public class PinotHelixResourceManager {
     return endpointToInstance;
   }
 
+  public SegmentMergeLineage getSegmentMergeLineage(String tableNameWithType) {
+    ZNRecord segmentMergeLineageZNRecord =
+        SegmentMergeLineageAccessHelper.getSegmentMergeLineageZNRecord(getPropertyStore(), tableNameWithType);
+    return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+  }
+
+  public boolean updateSegmentMergeLineage(String tableNameWithType, List<String> segments,
+      List<String> childrenGroupIds) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment merge lineage from the property store
+        ZNRecord segmentMergeLineageZNRecord =
+            SegmentMergeLineageAccessHelper.getSegmentMergeLineageZNRecord(getPropertyStore(), tableNameWithType);
+        SegmentMergeLineage segmentMergeLineage = SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+        int expectedVersion = segmentMergeLineageZNRecord.getVersion();
+
+        if (segmentMergeLineage == null) {
+          segmentMergeLineage = new SegmentMergeLineage(tableNameWithType);
+        }
+
+        // Add a new segment group
+        String groupId = Generators.timeBasedGenerator().generate().toString();
+        segmentMergeLineage.addSegmentGroup(groupId, segments, childrenGroupIds);
+
+        try {
+          if (SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(getPropertyStore(), segmentMergeLineage,
+              expectedVersion)) {
+            return true;
+          } else {
+            LOGGER.warn("Failed to update segment merge lineage for table: {}", tableNameWithType);
+            return false;
+          }
+        } catch (ZkBadVersionException e) {
+          LOGGER.warn("Version changed while updating segment merge lineage for table: {}", tableNameWithType, e);
+          return false;
+        } catch (Exception e) {
+          LOGGER.warn("Caught exeception file while updating segment merge lineage for table: {}", tableNameWithType,
+              e);
+          return false;
+        }
+      });
+
+      LOGGER.info("Segment merge lineage has been updated successfully. (tableNameWithType: {}, segments: {}, "
+          + "childrenGroupIds: {})", tableNameWithType, segments, childrenGroupIds);
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Caught Exception while updating segment merge lineage for table: {}", tableNameWithType, e);
+      return false;
+    }
+  }
+
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
index 750d8a2..765b4e3 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
@@ -20,6 +20,7 @@ import com.linkedin.pinot.common.config.PinotTaskConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
 import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -128,4 +129,9 @@ public class ClusterInfoProvider {
   public String getVipUrl() {
     return _controllerConf.generateVipUrl();
   }
+
+
+  public SegmentMergeLineage getSegmentMergeLineage(String tableNameWithType) {
+    return _pinotHelixResourceManager.getSegmentMergeLineage(tableNameWithType);
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java
new file mode 100644
index 0000000..028a53f
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java
@@ -0,0 +1,313 @@
+package com.linkedin.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.PinotTaskConfig;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.config.TableTaskConfig;
+import com.linkedin.pinot.common.data.Segment;
+import com.linkedin.pinot.common.lineage.SegmentGroup;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import com.linkedin.pinot.common.segment.SegmentMetadata;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import com.linkedin.pinot.controller.helix.core.util.ZKMetadataUtils;
+import com.linkedin.pinot.core.common.MinionConstants;
+import com.linkedin.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformer;
+import com.linkedin.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory;
+import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentMergeRollupTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeRollupTaskGenerator.class);
+
+  private final ClusterInfoProvider _clusterInfoProvider;
+
+  public SegmentMergeRollupTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+    _clusterInfoProvider = clusterInfoProvider;
+  }
+
+  @Nonnull
+  @Override
+  public String getTaskType() {
+    return MinionConstants.SegmentMergeRollupTask.TASK_TYPE;
+  }
+
+  @Nonnull
+  @Override
+  public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs) {
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    // Get the segments that are being converted so that we don't submit them again
+    Set<Segment> runningSegments =
+        TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, _clusterInfoProvider);
+
+    for (TableConfig tableConfig : tableConfigs) {
+      // Only generate tasks for OFFLINE tables
+      String offlineTableName = tableConfig.getTableName();
+      if (tableConfig.getTableType() != CommonConstants.Helix.TableType.OFFLINE) {
+        LOGGER.warn("Skip generating SegmentMergeRollup for non-OFFLINE table: {}", offlineTableName);
+        continue;
+      }
+
+      TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+      Preconditions.checkNotNull(tableTaskConfig);
+      Map<String, String> taskConfigs =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentMergeRollupTask.TASK_TYPE);
+      Preconditions.checkNotNull(tableConfigs);
+
+      // Get max number of tasks for this table
+      int tableMaxNumTasks;
+      String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+      if (tableMaxNumTasksConfig != null) {
+        try {
+          tableMaxNumTasks = Integer.valueOf(tableMaxNumTasksConfig);
+        } catch (Exception e) {
+          tableMaxNumTasks = Integer.MAX_VALUE;
+        }
+      } else {
+        tableMaxNumTasks = Integer.MAX_VALUE;
+      }
+
+      // Generate tasks
+      int tableNumTasks = 0;
+
+      List<OfflineSegmentZKMetadata> metadataList = _clusterInfoProvider.getOfflineSegmentsMetadata(offlineTableName);
+
+      SegmentMergeLineage segmentMergeLineage = _clusterInfoProvider.getSegmentMergeLineage(offlineTableName);
+
+      // Get a list of original segments covered by merged segments
+
+      List<Integer> groupLevelList = segmentMergeLineage.getAllGroupLevels();
+      Set<String> uncoveredGroups = new HashSet<>();
+      Set<String> coveredGroups = new HashSet<>();
+      for (Integer groupLevel : groupLevelList) {
+        List<String> groupIdsForGroupLevel;
+        groupIdsForGroupLevel = segmentMergeLineage.getGroupIdsForGroupLevel(groupLevel);
+        uncoveredGroups.addAll(groupIdsForGroupLevel);
+
+        for (String groupId : groupIdsForGroupLevel) {
+          List<String> coveredChildrenForGroupId = segmentMergeLineage.getChildrenForGroup(groupId);
+          if (coveredChildrenForGroupId != null && !coveredChildrenForGroupId.isEmpty()) {
+            coveredGroups.addAll(coveredChildrenForGroupId);
+          }
+        }
+      }
+
+
+      uncoveredGroups.removeAll(coveredGroups);
+
+      Map<String, OfflineSegmentZKMetadata> segmentNameToZkMetadataMap = new HashMap<>();
+      for (OfflineSegmentZKMetadata segmentZKMetadata : metadataList) {
+        segmentNameToZkMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata);
+      }
+
+      LOGGER.info("uncovered groups: ", uncoveredGroups);
+
+      List<OfflineSegmentZKMetadata> segmentsToMerge = new ArrayList<>();
+      for (String groupId : uncoveredGroups) {
+        for (String segmentName : segmentMergeLineage.getSegmentsForGroup(groupId)) {
+          segmentsToMerge.add(segmentNameToZkMetadataMap.get(segmentName));
+        }
+      }
+
+//      List<OfflineSegmentZKMetadata> segmentsToMerge = new ArrayList<>();
+//      for (OfflineSegmentZKMetadata segmentZKMetadata: metadataList) {
+//        String segmentName = segmentZKMetadata.getSegmentName();
+//        if (!coveredSegments.contains(segmentName)) {
+//          segmentsToMerge.add(segmentZKMetadata);
+//        }
+//      }
+//      if (!segmentsToRemove.isEmpty()) {
+//        LOGGER.info("Removing Segments since they are covered by merged segments: " + String.join(",", segmentsToRemove));
+//        _clusterInfoProvider.removeSegments(offlineTableName, segmentsToRemove);
+//      }
+
+      if (segmentsToMerge.size() > 1) {
+        String downloadUrls = segmentsToMerge.stream()
+            .map(OfflineSegmentZKMetadata::getDownloadUrl)
+            .collect(Collectors.joining(MinionConstants.URL_SEPARATOR));
+
+        String segmentNames = segmentsToMerge.stream()
+            .map(OfflineSegmentZKMetadata::getSegmentName)
+            .collect(Collectors.joining(","));
+
+
+        String crcList = segmentsToMerge.stream()
+            .map(OfflineSegmentZKMetadata::getCrc)
+            .map((crc) -> Long.toString(crc))
+            .collect(Collectors.joining(","));
+
+        String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
+        String mergedSegmentName = computeMergedSegmentName(rawTableName, segmentsToMerge);
+        String groupsToCover = String.join(",", uncoveredGroups);
+
+
+        Map<String, String> config = new HashMap<>();
+        config.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
+        config.put(MinionConstants.SEGMENT_NAME_KEY, segmentNames);
+        config.put(MinionConstants.DOWNLOAD_URL_KEY, downloadUrls);
+        config.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoProvider.getVipUrl() + "/segments");
+        config.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, crcList);
+
+        config.put(MinionConstants.SegmentMergeRollupTask.MERGE_TYPE, "CONCATENATE");
+
+
+        config.put(MinionConstants.SegmentMergeRollupTask.MERGED_SEGEMNT_NAME_KEY, mergedSegmentName);
+        config.put(MinionConstants.SegmentMergeRollupTask.GROUPS_TO_COVER_KEY, groupsToCover);
+        config.put(MinionConstants.CONTROLLER_API_URL, _clusterInfoProvider.getVipUrl());
+        pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentMergeRollupTask.TASK_TYPE, config));
+
+      }
+//      for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : _clusterInfoProvider.getOfflineSegmentsMetadata(
+//          offlineTableName)) {
+//        // Generate up to tableMaxNumTasks tasks each time for each table
+//        if (tableNumTasks == tableMaxNumTasks) {
+//          break;
+//        }
+//
+//        // Skip segments that are already submitted
+//        String segmentName = offlineSegmentZKMetadata.getSegmentName();
+//        if (runningSegments.contains(new Segment(offlineTableName, segmentName))) {
+//          continue;
+//        }
+//
+//        // Only submit segments that have not been converted
+//        Map<String, String> customMap = offlineSegmentZKMetadata.getCustomMap();
+//        if (customMap == null || !customMap.containsKey(
+//            MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY + MinionConstants.TASK_TIME_SUFFIX)) {
+//          Map<String, String> configs = new HashMap<>();
+//          configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
+//          configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
+//          configs.put(MinionConstants.DOWNLOAD_URL_KEY, offlineSegmentZKMetadata.getDownloadUrl());
+//          configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoProvider.getVipUrl() + "/segments");
+//          configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(offlineSegmentZKMetadata.getCrc()));
+//          if (columnsToConvertConfig != null) {
+//            configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, columnsToConvertConfig);
+//          }
+//
+//          pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentMergeRollupTask.TASK_TYPE, configs));
+//          tableNumTasks++;
+//        }
+//      }
+    }
+
+    return pinotTaskConfigs;
+  }
+
+  private String computeMergedSegmentName(String tableName, List<OfflineSegmentZKMetadata> metadataList) {
+    long minStartTime = Long.MAX_VALUE;
+    long maxEndTime = Long.MIN_VALUE;
+
+    for (OfflineSegmentZKMetadata metadata : metadataList) {
+      long currentSegmentStartTime = metadata.getStartTime();
+      long currentSegmentEndTime = metadata.getEndTime();
+
+      if (currentSegmentStartTime < minStartTime) {
+        minStartTime = currentSegmentStartTime;
+      }
+
+      if (currentSegmentEndTime > maxEndTime) {
+        maxEndTime = currentSegmentEndTime;
+      }
+    }
+    return "merged_" + tableName + "_" + minStartTime + "_" + maxEndTime + "_" + System.currentTimeMillis();
+  }
+
+  @Override
+  public int getNumConcurrentTasksPerInstance() {
+    return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+  }
+
+  @Override
+  public void nonLeaderCleanUp() {
+
+  }
+
+  private void addCoveredSegments(SegmentGroup node, Set<String> coveredSegments) {
+    Set<String> currentSegments = node.getSegments();
+    if (currentSegments != null && !currentSegments.isEmpty()) {
+      coveredSegments.addAll(node.getSegments());
+    }
+    if (node.getChildrenGroups() != null) {
+      for (SegmentGroup children : node.getChildrenGroups()) {
+        addCoveredSegments(children, coveredSegments);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    File inputDir = new File("/Users/snlee/data/careers_comms_processing_hourly_additive/extracted");
+    try {
+      int segmentNum = 0;
+      List<OfflineSegmentZKMetadata> zkMetadataList = new ArrayList<>();
+      for (File index : inputDir.listFiles()) {
+        SegmentMetadata segmentMetadata = new SegmentMetadataImpl(index);
+        OfflineSegmentZKMetadata zkMetadata = new OfflineSegmentZKMetadata();
+        ZKMetadataUtils.updateSegmentMetadata(zkMetadata, segmentMetadata);
+        zkMetadataList.add(zkMetadata);
+      }
+
+
+      Map<Long, List<String>> grouping = getGrouping(zkMetadataList);
+
+      produceResultStat(grouping);
+
+
+    } finally {
+//      FileUtils.deleteQuietly(tmpDir);
+    }
+
+  }
+
+  public static Map<Long, List<String>> getGrouping(List<OfflineSegmentZKMetadata> zkMetadataList) {
+    Map<Long, List<String>> result = new TreeMap<>();
+
+    for (OfflineSegmentZKMetadata zkMetadata : zkMetadataList) {
+      long endTime = zkMetadata.getEndTime();
+      TimeUnit timeUnit = zkMetadata.getTimeUnit();
+      TimeUnitTransformer timeUnitTransformer =
+          TimeUnitTransformerFactory.getTimeUnitTransformer(timeUnit, "months");
+      long[] input = new long[1];
+      input[0] = endTime;
+      long[] output = new long[1];
+      timeUnitTransformer.transform(input, output, 1);
+
+      List<String> group = result.computeIfAbsent(output[0], k -> new ArrayList<>());
+      group.add(zkMetadata.getSegmentName());
+    }
+
+    return result;
+  }
+
+
+  public static void produceResultStat(Map<Long, List<String>> result) {
+    System.out.println("size breakdown: ");
+    for (Map.Entry<Long, List<String>> entry : result.entrySet()) {
+      System.out.println(entry.getKey() + ": " + entry.getValue().size());
+    }
+
+    System.out.println("segments: ");
+    for (Map.Entry<Long, List<String>> entry : result.entrySet()) {
+      List<String> segmentList = entry.getValue();
+      Collections.sort(segmentList);
+      System.out.println(entry.getKey() + ": " + segmentList.get(0) + ", " + segmentList.get(segmentList.size()-1));
+    }
+
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index 49bc3d4..4098607 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -32,6 +32,7 @@ public class TaskGeneratorRegistry {
 
   public TaskGeneratorRegistry(@Nonnull ClusterInfoProvider clusterInfoProvider) {
     registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoProvider));
+    registerTaskGenerator(new SegmentMergeRollupTaskGenerator(clusterInfoProvider));
   }
 
   /**
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
index d8a0b9f..640b946 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
@@ -25,6 +25,7 @@ public class MinionConstants {
   public static final String SEGMENT_NAME_KEY = "segmentName";
   public static final String DOWNLOAD_URL_KEY = "downloadURL";
   public static final String UPLOAD_URL_KEY = "uploadURL";
+  public static final String CONTROLLER_API_URL = "controllerApiUrl";
   public static final String URL_SEPARATOR = ",";
 
   /**
@@ -48,6 +49,13 @@ public class MinionConstants {
     public static final String COLUMNS_TO_CONVERT_KEY = "columnsToConvert";
   }
 
+  public static class SegmentMergeRollupTask {
+    public static final String TASK_TYPE = "SegmentMergeRollupTask";
+    public static final String MERGE_TYPE = "mergeType";
+    public static final String MERGED_SEGEMNT_NAME_KEY = "mergedSegmentNameKey";
+    public static final String GROUPS_TO_COVER_KEY = "groupsToCoverKey";
+  }
+
   // Purges rows inside segment that match chosen criteria
   public static class PurgeTask {
     public static final String TASK_TYPE = "PurgeTask";
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
index 4712f36..f80850e 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -16,6 +16,7 @@
 package com.linkedin.pinot.core.data.readers;
 
 import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.data.DimensionFieldSpec;
 import com.linkedin.pinot.common.data.FieldSpec;
 import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.segment.ReadMode;
@@ -26,6 +27,8 @@ import com.linkedin.pinot.core.data.readers.sort.SegmentSorter;
 import com.linkedin.pinot.core.indexsegment.immutable.ImmutableSegment;
 import com.linkedin.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index a2cdb1a..dc4eb63 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -94,6 +94,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     Map<String, MinionEventObserverFactory> eventObserverFactoryRegistry =
         Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new TestEventObserverFactory());
     startMinions(NUM_MINIONS, taskExecutorFactoryRegistry, eventObserverFactoryRegistry);
+
+    Thread.sleep(1000000000);
   }
 
   @Test
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index 06cd811..dc40bd0 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
+import org.apache.http.Header;
 import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicNameValuePair;
 import org.slf4j.Logger;
@@ -59,6 +60,12 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
   protected abstract List<SegmentConversionResult> convert(@Nonnull PinotTaskConfig pinotTaskConfig,
       @Nonnull List<File> originalIndexDir, @Nonnull File workingDir) throws Exception;
 
+  protected abstract void preprocessBeforeUpload(PinotTaskConfig pinotTaskConfig, List<SegmentConversionResult> conversionResults) throws Exception;
+
+  protected List<Header> getHttpHeaders() {
+    return null;
+  }
+
   @Override
   public List<SegmentConversionResult> executeTask(@Nonnull PinotTaskConfig pinotTaskConfig) throws Exception {
     String taskType = pinotTaskConfig.getTaskType();
@@ -122,6 +129,10 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
       }
 
+      preprocessBeforeUpload(pinotTaskConfig, segmentConversionResults);
+
+      List<Header> headers = getHttpHeaders();
+
       // Upload the tarred segments
       for (int i = 0; i < numOutputSegments; i++) {
         File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
@@ -131,7 +142,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         List<NameValuePair> parameters = Collections.singletonList(
             new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true"));
 
-        SegmentConversionUtils.uploadSegment(configs, null, parameters, tableNameWithType, resultSegmentName, uploadURL,
+        SegmentConversionUtils.uploadSegment(configs, headers, parameters, tableNameWithType, resultSegmentName, uploadURL,
             convertedTarredSegmentFile);
       }
 
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
index d47fd0e..8cdb499 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
@@ -17,18 +17,25 @@ package com.linkedin.pinot.minion.executor;
 import com.linkedin.pinot.common.exception.HttpErrorStatusException;
 import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
 import com.linkedin.pinot.common.utils.SimpleHttpResponse;
+import com.linkedin.pinot.common.utils.retry.AttemptsExceededException;
+import com.linkedin.pinot.common.utils.retry.RetriableOperationException;
 import com.linkedin.pinot.common.utils.retry.RetryPolicies;
 import com.linkedin.pinot.common.utils.retry.RetryPolicy;
 import com.linkedin.pinot.core.common.MinionConstants;
 import com.linkedin.pinot.minion.MinionContext;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
+import javax.validation.constraints.Min;
+import javax.ws.rs.HEAD;
 import org.apache.http.Header;
 import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
 import org.apache.http.NameValuePair;
+import org.apache.http.client.methods.RequestBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,4 +98,47 @@ public class SegmentConversionUtils {
       });
     }
   }
+
+  public static void uploadSegmentMergeLineage(Map<String, String> configs, String updateLineageUrl,
+      List<String> childrenSegmentGroups, List<String> segments) throws Exception {
+    String maxNumAttemptsConfig = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
+    int maxNumAttempts =
+        maxNumAttemptsConfig != null ? Integer.parseInt(maxNumAttemptsConfig) : DEFAULT_MAX_NUM_ATTEMPTS;
+    String initialRetryDelayMsConfig = configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY);
+    long initialRetryDelayMs =
+        initialRetryDelayMsConfig != null ? Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS;
+    String retryScaleFactorConfig = configs.get(MinionConstants.RETRY_SCALE_FACTOR_KEY);
+    double retryScaleFactor =
+        retryScaleFactorConfig != null ? Double.parseDouble(retryScaleFactorConfig) : DEFAULT_RETRY_SCALE_FACTOR;
+    RetryPolicy retryPolicy =
+        RetryPolicies.exponentialBackoffRetryPolicy(maxNumAttempts, initialRetryDelayMs, retryScaleFactor);
+
+    SSLContext sslContext = MinionContext.getInstance().getSSLContext();
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
+      retryPolicy.attempt(() -> {
+        try {
+          SimpleHttpResponse response =
+              fileUploadDownloadClient.updateSegmentMergeLineage(new URI(updateLineageUrl), childrenSegmentGroups,
+                  segments, FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+          return true;
+        } catch (HttpErrorStatusException e) {
+          int statusCode = e.getStatusCode();
+          if (statusCode == HttpStatus.SC_CONFLICT || statusCode >= 500) {
+            // Temporary exception
+            return false;
+          } else {
+            // Permanent exception
+            throw e;
+          }
+        } catch (Exception e) {
+          return false;
+        }
+      });
+    }
+  }
+
+  public static void main(String[] args) {
+//    RequestBuilder requestBuilder = RequestBuilder.post("localhost:")
+    RequestBuilder requestBuilder = RequestBuilder.post("localhost:8998/table/");
+  }
 }
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java
new file mode 100644
index 0000000..a4ecd81
--- /dev/null
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.minion.executor;
+
+import com.linkedin.pinot.common.config.PinotTaskConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
+import com.linkedin.pinot.core.common.MinionConstants;
+import com.linkedin.pinot.core.minion.rollup.MergeRollupSegmentConverter;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentMergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeRollupTaskExecutor.class);
+
+  @Override
+  protected List<SegmentConversionResult> convert(@Nonnull PinotTaskConfig pinotTaskConfig,
+      @Nonnull List<File> originalIndexDirs, @Nonnull File workingDir) throws Exception {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String mergeType = configs.get(MinionConstants.SegmentMergeRollupTask.MERGE_TYPE);
+    String mergedSegmentName = configs.get(MinionConstants.SegmentMergeRollupTask.MERGED_SEGEMNT_NAME_KEY);
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+    MergeRollupSegmentConverter rollupSegmentConverter =
+        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType)
+            .setTableName(tableNameWithType)
+            .setSegmentName(mergedSegmentName)
+            .setInputIndexDirs(originalIndexDirs)
+            .setWorkingDir(workingDir)
+            .build();
+    List<File> resultFiles = rollupSegmentConverter.convert();
+    List<SegmentConversionResult> results = new ArrayList<>();
+    for (File file : resultFiles) {
+      String outputSegmentName = file.getName();
+      results.add(new SegmentConversionResult.Builder().setFile(file)
+          .setSegmentName(outputSegmentName)
+          .setTableNameWithType(tableNameWithType)
+          .build());
+    }
+    return results;
+  }
+
+  @Override
+  protected void preprocessBeforeUpload(@Nonnull PinotTaskConfig pinotTaskConfig,
+      List<SegmentConversionResult> conversionResults) throws Exception {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String groupsToCoverStr = configs.get(MinionConstants.SegmentMergeRollupTask.GROUPS_TO_COVER_KEY);
+
+    String vipUrl = configs.get(MinionConstants.CONTROLLER_API_URL);
+    List<String> childrenGroupIds = new ArrayList<>();
+    for (String groupId : groupsToCoverStr.split(",")) {
+      childrenGroupIds.add(groupId.trim());
+    }
+    //    SegmentConversionUtils.uploadSegmentMergeLineage();
+
+    List<String> segments = new ArrayList<>();
+
+    for (SegmentConversionResult conversionResult : conversionResults) {
+      segments.add(conversionResult.getSegmentName());
+    }
+
+    LOGGER.info("Updating segment merge lineage metadata. segments: {}, childrenGroupIds: {}", segments,
+        childrenGroupIds);
+    // list of segments, list of children group ids
+    // Need to update merged segment
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    String tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString();
+
+    String requestUrl = vipUrl + "/tables" + "/" + rawTableName + "/segments/lineage?type=" + tableType;
+    SegmentConversionUtils.uploadSegmentMergeLineage(configs, requestUrl, childrenGroupIds, segments);
+
+  }
+
+  @Override
+  protected List<Header> getHttpHeaders() {
+    // Since we update segment merge lineage in "pre-process step", we need to indicate this segment upload is merged segment.
+    Header mergedSegmentPushHeader =
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.MERGED_SEGMENT_PUSH, "true");
+    return Collections.singletonList(mergedSegmentPushHeader);
+  }
+}
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java
new file mode 100644
index 0000000..dd97948
--- /dev/null
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.minion.executor;
+
+public class SegmentMergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
+  @Override
+  public PinotTaskExecutor create() {
+    return new SegmentMergeRollupTaskExecutor();
+  }
+}
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 91e1ea6..d316c6e 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -32,6 +32,8 @@ public class TaskExecutorFactoryRegistry {
     registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
         new ConvertToRawIndexTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory());
+    registerTaskExecutorFactory(MinionConstants.SegmentMergeRollupTask.TASK_TYPE,
+        new SegmentMergeRollupTaskExecutorFactory());
   }
 
   /**
diff --git a/pom.xml b/pom.xml
index 81a6dbb..e020a5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -470,6 +470,11 @@
         <version>${jackson.version}</version>
       </dependency>
       <dependency>
+        <groupId>com.fasterxml.uuid</groupId>
+        <artifactId>java-uuid-generator</artifactId>
+        <version>3.1.5</version>
+      </dependency>
+      <dependency>
         <groupId>org.yaml</groupId>
         <artifactId>snakeyaml</artifactId>
         <version>1.16</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org