You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2018/11/26 19:39:46 UTC

[incubator-pinot] branch segment-merge-lineage created (now 156cd4c)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a change to branch segment-merge-lineage
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 156cd4c  Segment merge lineage data structure 1. Added segment merge lineage that is a wrapper class of ZNRecord 2. Added segment merge group that will be used by broker during segment    selection process 3. Added a unit test

This branch includes the following new commits:

     new 156cd4c  Segment merge lineage data structure 1. Added segment merge lineage that is a wrapper class of ZNRecord 2. Added segment merge group that will be used by broker during segment    selection process 3. Added a unit test

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Segment merge lineage data structure 1. Added segment merge lineage that is a wrapper class of ZNRecord 2. Added segment merge group that will be used by broker during segment selection process 3. Added a unit test

Posted by sn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch segment-merge-lineage
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 156cd4c326230a4497a12ba7d086bc6f2f402f1c
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Sun Nov 25 22:10:45 2018 -0800

    Segment merge lineage data structure
    1. Added segment merge lineage that is a wrapper class of ZNRecord
    2. Added segment merge group that will be used by broker during segment
       selection process
    3. Added a unit test
---
 .../pinot/common/lineage/SegmentGroup.java         |  72 +++++
 .../pinot/common/lineage/SegmentMergeLineage.java  | 313 +++++++++++++++++++++
 .../lineage/SegmentMergeLineageAccessHelper.java   |  82 ++++++
 .../pinot/common/metadata/ZKMetadataProvider.java  |   7 +-
 .../common/lineage/SegmentMergeLineageTest.java    | 106 +++++++
 5 files changed, 579 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
new file mode 100644
index 0000000..fb28ac1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * A node of the segment group tree: carries a group id, a group level, the segments of the group and links to the
+ * parent and children nodes.
+ *
+ * <p>This is a plain mutable holder. Every property keeps its Java default ({@code null}, or {@code 0} for the level)
+ * until the matching setter is called, and the setters store the given references without copying them.
+ */
+public class SegmentGroup {
+
+  private String _groupId;
+  private int _groupLevel;
+  private SegmentGroup _parentGroup;
+  private List<SegmentGroup> _childrenGroups;
+  private Set<String> _segments;
+
+  /**
+   * @return the id of this group, null if it has not been set
+   */
+  public String getGroupId() {
+    return _groupId;
+  }
+
+  /**
+   * @param groupId the id of this group
+   */
+  public void setGroupId(String groupId) {
+    _groupId = groupId;
+  }
+
+  /**
+   * @return the level of this group, 0 if it has not been set
+   */
+  public int getGroupLevel() {
+    return _groupLevel;
+  }
+
+  /**
+   * @param groupLevel the level of this group
+   */
+  public void setGroupLevel(int groupLevel) {
+    _groupLevel = groupLevel;
+  }
+
+  /**
+   * @return the parent node, null if it has not been set
+   */
+  public SegmentGroup getParentGroup() {
+    return _parentGroup;
+  }
+
+  /**
+   * @param parentGroup the parent node
+   */
+  public void setParentGroup(SegmentGroup parentGroup) {
+    _parentGroup = parentGroup;
+  }
+
+  /**
+   * @return the children nodes, null if they have not been set
+   */
+  public List<SegmentGroup> getChildrenGroups() {
+    return _childrenGroups;
+  }
+
+  /**
+   * @param childrenGroups the children nodes
+   */
+  public void setChildrenGroups(List<SegmentGroup> childrenGroups) {
+    _childrenGroups = childrenGroups;
+  }
+
+  /**
+   * @return the segments of this group, null if they have not been set
+   */
+  public Set<String> getSegments() {
+    return _segments;
+  }
+
+  /**
+   * @param segments the segments of this group
+   */
+  public void setSegments(Set<String> segments) {
+    _segments = segments;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
new file mode 100644
index 0000000..0ec0be1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
@@ -0,0 +1,313 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import com.linkedin.pinot.common.utils.EqualityUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to represent segment merge lineage information.
+ *
+ * <p>Two mappings are maintained:
+ * <ul>
+ *   <li>parent group id to the list of its children group ids (the list fields of the ZNRecord)</li>
+ *   <li>group level to (group id to list of segments); in the ZNRecord this is stored in the map fields under the key
+ *   "level_" followed by the level, with the segments of each group joined by ","</li>
+ * </ul>
+ *
+ * <p>A group that is added without children is placed at level 0. A group whose children are all found at the same
+ * level is placed one level above them.
+ */
+public class SegmentMergeLineage {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeLineage.class);
+
+  private static final String LEVEL_KEY_PREFIX = "level_";
+  private static final String ROOT_NODE_GROUP_ID = "root";
+  private static final String SEGMENT_DELIMITER = ",";
+  private static final int DEFAULT_GROUP_LEVEL = 0;
+
+  private final String _tableNameWithType;
+  private final Map<String, List<String>> _parentGroupToChildrenGroupsMap;
+  private final Map<Integer, Map<String, List<String>>> _levelToGroupToSegmentsMap;
+
+  /**
+   * Create an empty segment merge lineage
+   *
+   * @param tableNameWithType a table name with type
+   */
+  public SegmentMergeLineage(String tableNameWithType) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = new HashMap<>();
+    _levelToGroupToSegmentsMap = new HashMap<>();
+  }
+
+  /**
+   * Create a segment merge lineage on top of the given maps. The maps are stored as they are (no copy is made), so
+   * they have to be mutable and must not be modified by the caller afterwards.
+   *
+   * @param tableNameWithType a table name with type
+   * @param segmentGroupLineageMap a map from parent group id to children group ids
+   * @param levelToGroupToSegmentMap a map from group level to (group id to segments)
+   * @param version currently ignored; the parameter is kept so that existing callers keep compiling
+   */
+  public SegmentMergeLineage(String tableNameWithType, Map<String, List<String>> segmentGroupLineageMap,
+      Map<Integer, Map<String, List<String>>> levelToGroupToSegmentMap, int version) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = segmentGroupLineageMap;
+    _levelToGroupToSegmentsMap = levelToGroupToSegmentMap;
+  }
+
+  public String getTableName() {
+    return _tableNameWithType;
+  }
+
+  /**
+   * Build a segment merge lineage from a ZNRecord written by {@link #toZNRecord()}
+   *
+   * @param record a ZNRecord whose id is the table name with type
+   * @return a segment merge lineage that does not share any mutable state with the record
+   * @throws NumberFormatException if the suffix of a map field key after "level_" is not an integer
+   */
+  public static SegmentMergeLineage fromZNRecord(ZNRecord record) {
+    String tableNameWithType = record.getId();
+    int version = record.getVersion();
+
+    // Copy the list fields so that later updates on the lineage do not write through to the record
+    Map<String, List<String>> parentToChildrenGroupsMap = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : record.getListFields().entrySet()) {
+      List<String> childrenGroups = new ArrayList<>();
+      if (entry.getValue() != null) {
+        childrenGroups.addAll(entry.getValue());
+      }
+      parentToChildrenGroupsMap.put(entry.getKey(), childrenGroups);
+    }
+
+    Map<Integer, Map<String, List<String>>> levelToGroupToSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : record.getMapFields().entrySet()) {
+      String levelKey = entry.getKey();
+      Integer level = Integer.parseInt(levelKey.substring(LEVEL_KEY_PREFIX.length()));
+      Map<String, List<String>> groupToSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, String> groupEntry : entry.getValue().entrySet()) {
+        String segmentsString = groupEntry.getValue();
+        List<String> segments = new ArrayList<>();
+        // "".split(",") returns a single empty string, which would turn a group without segments into a group with
+        // one segment named "". Only split when there is something to split.
+        if (segmentsString != null && !segmentsString.isEmpty()) {
+          segments.addAll(Arrays.asList(segmentsString.split(SEGMENT_DELIMITER)));
+        }
+        groupToSegmentsForLevel.put(groupEntry.getKey(), segments);
+      }
+      levelToGroupToSegmentsMap.put(level, groupToSegmentsForLevel);
+    }
+    return new SegmentMergeLineage(tableNameWithType, parentToChildrenGroupsMap, levelToGroupToSegmentsMap, version);
+  }
+
+  /**
+   * Convert the segment merge lineage into a ZNRecord
+   *
+   * @return a new ZNRecord that does not share any mutable state with this lineage
+   */
+  public ZNRecord toZNRecord() {
+    ZNRecord record = new ZNRecord(_tableNameWithType);
+
+    // Copy the lineage map so that later updates on the lineage do not change the record that was handed out
+    Map<String, List<String>> listFields = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : _parentGroupToChildrenGroupsMap.entrySet()) {
+      listFields.put(entry.getKey(), new ArrayList<>(entry.getValue()));
+    }
+    record.setListFields(listFields);
+
+    Map<String, Map<String, String>> mapFields = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      String key = LEVEL_KEY_PREFIX + entry.getKey();
+      Map<String, String> groupToSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, List<String>> groupEntry : entry.getValue().entrySet()) {
+        groupToSegmentsForLevel.put(groupEntry.getKey(), String.join(SEGMENT_DELIMITER, groupEntry.getValue()));
+      }
+      mapFields.put(key, groupToSegmentsForLevel);
+    }
+    record.setMapFields(mapFields);
+
+    return record;
+  }
+
+  /**
+   * Add segment merge lineage information. If the request is rejected, the lineage is left unchanged.
+   *
+   * @param groupId a group id, has to be unique across all group levels
+   * @param currentGroupSegments a list of segments that belongs to the group
+   * @param childrenGroups a list of children groups that the current group covers, null or empty for a base level group
+   * @throws InvalidConfigException if the group id already exists or if the children groups are not all found at the
+   *         same group level
+   */
+  public void addSegmentGroup(String groupId, List<String> currentGroupSegments, List<String> childrenGroups)
+      throws InvalidConfigException {
+    // Get group level
+    int groupLevel = getGroupLevel(childrenGroups);
+
+    // Validate and copy the input before touching any state. The group id is looked up in every level because the
+    // same id must not be reused at a different level.
+    if (containsGroup(groupId)) {
+      throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+    }
+    List<String> segments = new ArrayList<>(currentGroupSegments);
+
+    // Update group to segments map
+    _levelToGroupToSegmentsMap.computeIfAbsent(groupLevel, k -> new HashMap<>()).put(groupId, segments);
+
+    // Update segment group lineage map, base level groups do not have any children
+    if (groupLevel > DEFAULT_GROUP_LEVEL) {
+      _parentGroupToChildrenGroupsMap.put(groupId, new ArrayList<>(childrenGroups));
+    }
+
+    LOGGER.info("New group has been added successfully to the segment lineage. (groupId: {}, currentGroupSegments: {}, "
+        + "childrenGroups: {})", groupId, currentGroupSegments, childrenGroups);
+  }
+
+  /**
+   * Remove segment merge information given a group id. The group is removed from the group to segments mapping of
+   * every level, from the lineage mapping, and from the children list of every other group.
+   *
+   * @param groupId a group id
+   */
+  public void removeSegmentGroup(String groupId) {
+    // Clean up the group id from parent to children group mapping
+    _parentGroupToChildrenGroupsMap.remove(groupId);
+    for (List<String> childrenGroups : _parentGroupToChildrenGroupsMap.values()) {
+      childrenGroups.remove(groupId);
+    }
+
+    // Clean up the group id from group to segments mapping
+    for (Map<String, List<String>> groupToSegments : _levelToGroupToSegmentsMap.values()) {
+      groupToSegments.remove(groupId);
+    }
+
+    LOGGER.info("Group {} has been successfully removed.", groupId);
+  }
+
+  /**
+   * Construct a lineage tree and returns the root node. The root node is a synthetic node with the group id "root"
+   * whose children are all the groups that are not a child of any other group.
+   *
+   * @return a root node for lineage tree
+   */
+  public SegmentGroup getMergeLineageRootSegmentGroup() {
+    // Create group nodes
+    Map<String, SegmentGroup> groupNodes = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, List<String>>> groupEntryForLevel : _levelToGroupToSegmentsMap.entrySet()) {
+      Integer level = groupEntryForLevel.getKey();
+      for (Map.Entry<String, List<String>> entry : groupEntryForLevel.getValue().entrySet()) {
+        String groupId = entry.getKey();
+        SegmentGroup groupNode = new SegmentGroup();
+        groupNode.setGroupId(groupId);
+        groupNode.setSegments(new HashSet<>(entry.getValue()));
+        groupNode.setGroupLevel(level);
+        groupNodes.put(groupId, groupNode);
+      }
+    }
+
+    // Add edges by updating children & parent group information
+    for (Map.Entry<String, List<String>> lineageEntry : _parentGroupToChildrenGroupsMap.entrySet()) {
+      String parentGroupId = lineageEntry.getKey();
+      SegmentGroup parentNode = groupNodes.get(parentGroupId);
+      if (parentNode == null) {
+        // The lineage map refers to a group without segment information (e.g. an inconsistent record was loaded).
+        // Skip the entry instead of failing with a NullPointerException; its children end up under the root node.
+        LOGGER.warn("Group {} has children groups but no segment information, skipping it.", parentGroupId);
+        continue;
+      }
+      List<SegmentGroup> childrenGroups = new ArrayList<>();
+      for (String childGroupId : lineageEntry.getValue()) {
+        SegmentGroup childNode = groupNodes.get(childGroupId);
+        if (childNode != null) {
+          childrenGroups.add(childNode);
+          childNode.setParentGroup(parentNode);
+        }
+      }
+      parentNode.setChildrenGroups(childrenGroups);
+    }
+
+    // Create a root node
+    SegmentGroup root = new SegmentGroup();
+    root.setGroupId(ROOT_NODE_GROUP_ID);
+    List<SegmentGroup> childrenForRoot = new ArrayList<>();
+    for (SegmentGroup group : groupNodes.values()) {
+      if (group.getParentGroup() == null) {
+        group.setParentGroup(root);
+        childrenForRoot.add(group);
+      }
+    }
+    root.setChildrenGroups(childrenForRoot);
+
+    return root;
+  }
+
+  /**
+   * Get a list of segments for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of segments that belongs to the given group id, null if the group does not exist. The returned list
+   *         is the one kept by the lineage, not a copy.
+   */
+  public List<String> getSegmentsForGroup(String groupId) {
+    for (Map<String, List<String>> groupToSegmentMap : _levelToGroupToSegmentsMap.values()) {
+      List<String> segments = groupToSegmentMap.get(groupId);
+      if (segments != null) {
+        return segments;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a list of children group ids for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of children groups that are covered by the given group id, null if the group does not exist or
+   *         has been added without children. The returned list is the one kept by the lineage, not a copy.
+   */
+  public List<String> getChildrenForGroup(String groupId) {
+    return _parentGroupToChildrenGroupsMap.get(groupId);
+  }
+
+  /**
+   * Get a list of all group levels
+   *
+   * @return a list of all group levels in ascending order
+   */
+  public List<Integer> getAllGroupLevels() {
+    List<Integer> groupLevels = new ArrayList<>(_levelToGroupToSegmentsMap.keySet());
+    Collections.sort(groupLevels);
+    return groupLevels;
+  }
+
+  /**
+   * Get a list of group ids for a given group level
+   *
+   * @param groupLevel a group level
+   * @return a list of group ids (in no particular order) that belongs to the given group level, null if the group
+   *         level does not exist
+   */
+  public List<String> getGroupIdsForGroupLevel(int groupLevel) {
+    Map<String, List<String>> groupToSegmentsMap = _levelToGroupToSegmentsMap.get(groupLevel);
+    if (groupToSegmentsMap != null) {
+      return new ArrayList<>(groupToSegmentsMap.keySet());
+    }
+    return null;
+  }
+
+  /**
+   * Helper function to check if a group id is already used at any group level or in the lineage map
+   *
+   * @param groupId a group id
+   * @return true if the group id is already known to the lineage
+   */
+  private boolean containsGroup(String groupId) {
+    if (_parentGroupToChildrenGroupsMap.containsKey(groupId)) {
+      return true;
+    }
+    for (Map<String, List<String>> groupToSegmentsMap : _levelToGroupToSegmentsMap.values()) {
+      if (groupToSegmentsMap.containsKey(groupId)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Helper function to compute group level given children groups
+   *
+   * @param childrenGroups a list of children group ids
+   * @return group level: the base level when there are no children, otherwise one above the level that contains all
+   *         the children
+   * @throws InvalidConfigException if no single level contains all the children groups
+   */
+  private int getGroupLevel(List<String> childrenGroups) throws InvalidConfigException {
+    // If no children exists, the group belongs to the base level.
+    if (childrenGroups == null || childrenGroups.isEmpty()) {
+      return DEFAULT_GROUP_LEVEL;
+    }
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      if (entry.getValue().keySet().containsAll(childrenGroups)) {
+        return entry.getKey() + 1;
+      }
+    }
+
+    // At this point, not all children groups are covered, cannot add group
+    throw new InvalidConfigException("Cannot compute group level because not all children groups exist "
+        + "in the segment merge lineage, children groups: " + childrenGroups);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    SegmentMergeLineage that = (SegmentMergeLineage) o;
+
+    return EqualityUtils.isEqual(_tableNameWithType, that._tableNameWithType) && EqualityUtils.isEqual(
+        _parentGroupToChildrenGroupsMap, that._parentGroupToChildrenGroupsMap) && EqualityUtils.isEqual(
+        _levelToGroupToSegmentsMap, that._levelToGroupToSegmentsMap);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_tableNameWithType);
+    result = EqualityUtils.hashCodeOf(result, _parentGroupToChildrenGroupsMap);
+    result = EqualityUtils.hashCodeOf(result, _levelToGroupToSegmentsMap);
+    return result;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
new file mode 100644
index 0000000..3f76c7c
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.common.utils.retry.RetryPolicy;
+import java.util.List;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Class to help to read, write segment merge lineage
+ */
+public class SegmentMergeLineageAccessHelper {
+
+  /**
+   * Read the segment merge lineage ZNRecord from the property store. The version reported by the property store for
+   * the path is set on the returned record, so use this method whenever the version is needed (e.g. as the expected
+   * version of a following write).
+   *
+   * @param propertyStore a property store
+   * @param tableNameWithType a table name with type
+   * @return a ZNRecord of segment merge lineage, null if the property store does not return a record for the table
+   */
+  public static ZNRecord getSegmentMergeLineageZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (segmentMergeLineageZNRecord != null) {
+      segmentMergeLineageZNRecord.setVersion(stat.getVersion());
+    }
+    return segmentMergeLineageZNRecord;
+  }
+
+  /**
+   * Read the segment merge lineage from the property store
+   *
+   * @param propertyStore  a property store
+   * @param tableNameWithType a table name with type
+   * @return a segment merge lineage, null if the property store does not return a record for the table
+   */
+  public static SegmentMergeLineage getSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+    // A table without any lineage information has no record; do not hand null to fromZNRecord, which dereferences it
+    if (segmentMergeLineageZNRecord == null) {
+      return null;
+    }
+    return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+  }
+
+  /**
+   * Write the segment merge lineage to the property store
+   *
+   * @param propertyStore a property store
+   * @param segmentMergeLineage a segment merge lineage
+   * @param expectedVersion the version that is passed to the property store along with the record
+   * @return true if update is successful. false otherwise.
+   */
+  public static boolean writeSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      SegmentMergeLineage segmentMergeLineage, int expectedVersion) {
+    String tableNameWithType = segmentMergeLineage.getTableName();
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    return propertyStore.set(path, segmentMergeLineage.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
index fe55a0c..3bc2d56 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
@@ -53,6 +53,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
   private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE";
   private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
+  private static final String PROPERTYSTORE_SEGMENT_MERGE_LINEAGE = "/SEGMENT_MERGE_LINEAGE";
 
   public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
       ZNRecord znRecord) {
@@ -106,6 +107,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX, controllerConfigKey);
   }
 
+  public static String constructPropertyStorePathForSegmentMergeLineage(String tableNameWithType) {
+    return StringUtil.join("/", PROPERTYSTORE_SEGMENT_MERGE_LINEAGE, tableNameWithType);
+  }
+
   public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
       String segmentName) {
     return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
@@ -251,7 +256,7 @@ public class ZKMetadataProvider {
    * Get the schema associated with the given table name.
    *
    * @param propertyStore Helix property store
-   * @param tableName Table name with or without type suffix.
+   * @param tableName Table name with or without type suffix.
    * @return Schema associated with the given table name.
    */
   @Nullable
diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
new file mode 100644
index 0000000..923fbfc
--- /dev/null
+++ b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentMergeLineageTest {
+
+  @Test
+  public void testSegmentMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList(new String[]{"segment1", "segment2", "segment3"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    String groupId2 = "G2";
+    List<String> groupSegments2 = Arrays.asList(new String[]{"segment4", "segment5"});
+    segmentMergeLineage.addSegmentGroup(groupId2, groupSegments2, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId2), groupSegments2);
+
+    String groupId3 = "G3";
+    List<String> groupSegments3 = Arrays.asList(new String[]{"segment6"});
+    segmentMergeLineage.addSegmentGroup(groupId3, groupSegments3, Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId3), groupSegments3);
+
+    // Check available APIs
+    Assert.assertEquals(segmentMergeLineage.getTableName(), "test_OFFLINE");
+    Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId3),
+        Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getAllGroupLevels(), Arrays.asList(new Integer[]{0, 1}));
+    Assert.assertTrue(segmentMergeLineage.equals(SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord())));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(0),
+        Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(1),
+        Arrays.asList(new String[]{groupId3}));
+    validateSegmentGroup(segmentMergeLineage);
+
+    // Check ZNRecord conversion
+    Assert.assertEquals(segmentMergeLineage, SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord()));
+
+    // Test removing groups
+    segmentMergeLineage.removeSegmentGroup(groupId1);
+    Assert.assertNull(segmentMergeLineage.getChildrenForGroup(groupId1));
+    Assert.assertNull(segmentMergeLineage.getSegmentsForGroup(groupId1));
+    Assert.assertFalse(segmentMergeLineage.getGroupIdsForGroupLevel(0).contains(groupId1));
+  }
+
+  @Test(expectedExceptions = InvalidConfigException.class)
+  public void testUpdateWithDuplicateGroupId() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList(new String[]{"segment1, segment2, segment3"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    List<String> groupSegments2 = Arrays.asList(new String[]{"segment4, segment5, segment6"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments2, null);
+  }
+
+  private void validateSegmentGroup(SegmentMergeLineage segmentMergeLineage) {
+    SegmentGroup segmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+    for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+      validateSegmentGroupNode(child, segmentMergeLineage);
+    }
+  }
+
+  private void validateSegmentGroupNode(SegmentGroup segmentGroup, SegmentMergeLineage segmentMergeLineage) {
+    String groupId = segmentGroup.getGroupId();
+    Assert.assertEquals(segmentGroup.getSegments(), new HashSet<>(segmentMergeLineage.getSegmentsForGroup(groupId)));
+    Assert.assertTrue(segmentMergeLineage.getGroupIdsForGroupLevel(segmentGroup.getGroupLevel()).contains(groupId));
+
+    List<SegmentGroup> childrenGroups = segmentGroup.getChildrenGroups();
+    if (childrenGroups != null) {
+      List<String> childrenGroupIds = new ArrayList<>();
+      for (SegmentGroup child : childrenGroups) {
+        childrenGroupIds.add(child.getGroupId());
+      }
+      Assert.assertEquals(childrenGroupIds, segmentMergeLineage.getChildrenForGroup(groupId));
+
+      for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+        validateSegmentGroupNode(child, segmentMergeLineage);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org