You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2018/11/26 19:39:47 UTC

[incubator-pinot] 01/01: Segment merge lineage data structure 1. Added segment merge lineage that is a wrapper class of ZNRecord 2. Added segment merge group that will be used by broker during segment selection process 3. Added a unit test

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch segment-merge-lineage
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 156cd4c326230a4497a12ba7d086bc6f2f402f1c
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Sun Nov 25 22:10:45 2018 -0800

    Segment merge lineage data structure
    1. Added segment merge lineage that is a wrapper class of ZNRecord
    2. Added segment merge group that will be used by broker during segment
       selection process
    3. Added a unit test
---
 .../pinot/common/lineage/SegmentGroup.java         |  72 +++++
 .../pinot/common/lineage/SegmentMergeLineage.java  | 313 +++++++++++++++++++++
 .../lineage/SegmentMergeLineageAccessHelper.java   |  82 ++++++
 .../pinot/common/metadata/ZKMetadataProvider.java  |   7 +-
 .../common/lineage/SegmentMergeLineageTest.java    | 106 +++++++
 5 files changed, 579 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
new file mode 100644
index 0000000..fb28ac1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Class to represent segment group
+ */
+public class SegmentGroup {
+
+  private String _groupId;
+  private int _groupLevel;
+  private SegmentGroup _parentGroup;
+  private List<SegmentGroup> _childrenGroups;
+  private Set<String> _segments;
+
+  public String getGroupId() {
+    return _groupId;
+  }
+
+  public void setGroupId(String groupId) {
+    _groupId = groupId;
+  }
+
+  public SegmentGroup getParentGroup() {
+    return _parentGroup;
+  }
+
+  public void setParentGroup(SegmentGroup parentGroup) {
+    _parentGroup = parentGroup;
+  }
+
+  public List<SegmentGroup> getChildrenGroups() {
+    return _childrenGroups;
+  }
+
+  public void setChildrenGroups(List<SegmentGroup> childrenGroups) {
+    _childrenGroups = childrenGroups;
+  }
+
+  public Set<String> getSegments() {
+    return _segments;
+  }
+
+  public void setSegments(Set<String> segments) {
+    _segments = segments;
+  }
+
+  public int getGroupLevel() {
+    return _groupLevel;
+  }
+
+  public void setGroupLevel(int groupLevel) {
+    _groupLevel = groupLevel;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
new file mode 100644
index 0000000..0ec0be1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
@@ -0,0 +1,313 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import com.linkedin.pinot.common.utils.EqualityUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to represent segment merge lineage information
+ */
+public class SegmentMergeLineage {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeLineage.class);
+
+  private static final String LEVEL_KEY_PREFIX = "level_";
+  private static final String ROOT_NODE_GROUP_ID = "root";
+  private static final String SEGMENT_DELIMITER = ",";
+  private static final int DEFAULT_GROUP_LEVEL = 0;
+
+  private String _tableNameWithType;
+  private Map<String, List<String>> _parentGroupToChildrenGroupsMap;
+  private Map<Integer, Map<String, List<String>>> _levelToGroupToSegmentsMap;
+
+  public SegmentMergeLineage(String tableNameWithType) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = new HashMap<>();
+    _levelToGroupToSegmentsMap = new HashMap<>();
+  }
+
+  public SegmentMergeLineage(String tableNameWithType, Map<String, List<String>> segmentGroupLineageMap,
+      Map<Integer, Map<String, List<String>>> levelToGroupToSegmentMap, int version) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = segmentGroupLineageMap;
+    _levelToGroupToSegmentsMap = levelToGroupToSegmentMap;
+  }
+
+  public String getTableName() {
+    return _tableNameWithType;
+  }
+
+  public static SegmentMergeLineage fromZNRecord(ZNRecord record) {
+    String tableNameWithType = record.getId();
+    int version = record.getVersion();
+    Map<String, List<String>> segmentGroupLineageMap = record.getListFields();
+
+    Map<Integer, Map<String, List<String>>> groupToSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : record.getMapFields().entrySet()) {
+      String levelKey = entry.getKey();
+      Integer level = Integer.parseInt(levelKey.substring(LEVEL_KEY_PREFIX.length()));
+      Map<String, List<String>> groupToSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, String> groupEntry : entry.getValue().entrySet()) {
+        String groupId = groupEntry.getKey();
+        String segmentsString = groupEntry.getValue();
+        List<String> segments = Arrays.asList(segmentsString.split(SEGMENT_DELIMITER));
+        groupToSegmentsForLevel.put(groupId, new ArrayList<>(segments));
+      }
+      groupToSegmentsMap.put(level, groupToSegmentsForLevel);
+    }
+    return new SegmentMergeLineage(tableNameWithType, segmentGroupLineageMap, groupToSegmentsMap, version);
+  }
+
+  public ZNRecord toZNRecord() {
+    ZNRecord record = new ZNRecord(_tableNameWithType);
+    record.setListFields(_parentGroupToChildrenGroupsMap);
+    Map<String, Map<String, String>> groupToSegmentsMap = new HashMap<>();
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      String key = LEVEL_KEY_PREFIX + entry.getKey();
+      Map<String, String> groupSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, List<String>> groupEntry : entry.getValue().entrySet()) {
+        String groupId = groupEntry.getKey();
+        String segments = String.join(SEGMENT_DELIMITER, groupEntry.getValue());
+        groupSegmentsForLevel.put(groupId, segments);
+      }
+      groupToSegmentsMap.put(key, groupSegmentsForLevel);
+    }
+    record.setMapFields(groupToSegmentsMap);
+
+    return record;
+  }
+
+  /**
+   * Add segment merge lineage information
+   *
+   * @param groupId a group id
+   * @param currentGroupSegments a list of segments that belongs to the group
+   * @param childrenGroups a list of children groups that the current group covers
+   */
+  public void addSegmentGroup(String groupId, List<String> currentGroupSegments, List<String> childrenGroups)
+      throws InvalidConfigException {
+    // Get group level
+    Integer groupLevel = getGroupLevel(childrenGroups);
+
+    // Update group to segments map
+    Map<String, List<String>> groupToSegmentMap =
+        _levelToGroupToSegmentsMap.computeIfAbsent(groupLevel, k -> new HashMap<>());
+    if (groupToSegmentMap.containsKey(groupId)) {
+      throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+    }
+    groupToSegmentMap.put(groupId, new ArrayList<>(currentGroupSegments));
+    _levelToGroupToSegmentsMap.put(groupLevel, groupToSegmentMap);
+
+    // Update segment group lineage map
+    if (groupLevel > DEFAULT_GROUP_LEVEL) {
+      if (_parentGroupToChildrenGroupsMap.containsKey(groupId)) {
+        throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+      } else {
+        _parentGroupToChildrenGroupsMap.put(groupId, new ArrayList<>(childrenGroups));
+      }
+    }
+
+    LOGGER.info("New group has been added successfully to the segment lineage. (groupId: {}, currentGroupSegments: {}, "
+        + "childrenGroups: {}", groupId, currentGroupSegments, childrenGroups);
+  }
+
+  /**
+   * Remove segment merge information given a group id
+   *
+   * @param groupId a group id
+   */
+  public void removeSegmentGroup(String groupId) {
+    // Clean up the group id from parent to children group mapping
+    _parentGroupToChildrenGroupsMap.remove(groupId);
+    for (List<String> childrenGroups : _parentGroupToChildrenGroupsMap.values()) {
+      childrenGroups.remove(groupId);
+    }
+
+    // Clean up the group id from group to segments mapping
+    for (Map<String, List<String>> groupToSegments : _levelToGroupToSegmentsMap.values()) {
+      groupToSegments.remove(groupId);
+    }
+
+    LOGGER.info("Group {} has been successfully removed.", groupId);
+  }
+
+  /**
+   * Construct a lineage tree and returns the root node
+   *
+   * @return a root node for lineage tree
+   */
+  public SegmentGroup getMergeLineageRootSegmentGroup() {
+    // Create group nodes
+    Map<String, SegmentGroup> groupNodes = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, List<String>>> groupEntryForLevel : _levelToGroupToSegmentsMap.entrySet()) {
+      Integer level = groupEntryForLevel.getKey();
+      Map<String, List<String>> groupToSegmentsForLevel = groupEntryForLevel.getValue();
+      for (Map.Entry<String, List<String>> entry : groupToSegmentsForLevel.entrySet()) {
+        String groupId = entry.getKey();
+        List<String> segments = entry.getValue();
+        SegmentGroup groupNode = new SegmentGroup();
+        groupNode.setGroupId(groupId);
+        groupNode.setSegments(new HashSet<>(segments));
+        groupNode.setGroupLevel(level);
+        groupNodes.put(groupId, groupNode);
+      }
+    }
+
+    // Add edges by updating children & parent group information
+    for (Map.Entry<String, List<String>> lineageEntry : _parentGroupToChildrenGroupsMap.entrySet()) {
+      String parentGroupId = lineageEntry.getKey();
+      List<String> childrenGroupIds = lineageEntry.getValue();
+      List<SegmentGroup> childrenGroups = new ArrayList<>();
+      SegmentGroup parentNode = groupNodes.get(parentGroupId);
+      for (String groupId : childrenGroupIds) {
+        SegmentGroup childNode = groupNodes.get(groupId);
+        if (childNode != null) {
+          childrenGroups.add(childNode);
+          childNode.setParentGroup(parentNode);
+        }
+      }
+      parentNode.setChildrenGroups(childrenGroups);
+    }
+
+    // Create a root node
+    SegmentGroup root = new SegmentGroup();
+    root.setGroupId(ROOT_NODE_GROUP_ID);
+    List<SegmentGroup> childrenForRoot = new ArrayList<>();
+    for (SegmentGroup group : groupNodes.values()) {
+      if (group.getParentGroup() == null) {
+        group.setParentGroup(root);
+        childrenForRoot.add(group);
+      }
+    }
+    root.setChildrenGroups(childrenForRoot);
+
+    return root;
+  }
+
+  /**
+   * Get a list of segments for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of segments that belongs to the given group id, null if the group does not exist
+   */
+  public List<String> getSegmentsForGroup(String groupId) {
+    for (Map<String, List<String>> groupToSegmentMap : _levelToGroupToSegmentsMap.values()) {
+      List<String> segments = groupToSegmentMap.get(groupId);
+      if (segments != null) {
+        return segments;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a list of children group ids for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of children groups that are covered by the given group id, null if the group does not exist
+   */
+  public List<String> getChildrenForGroup(String groupId) {
+    return _parentGroupToChildrenGroupsMap.get(groupId);
+  }
+
+  /**
+   * Get a list of all group levels
+   *
+   * @return a list of all group levels
+   */
+  public List<Integer> getAllGroupLevels() {
+    List<Integer> groupLevels = new ArrayList<>(_levelToGroupToSegmentsMap.keySet());
+    Collections.sort(groupLevels);
+    return groupLevels;
+  }
+
+  /**
+   * Get a list of group ids for a given group level
+   *
+   * @param groupLevel a group level
+   * @return a list of group ids that belongs to the given group level, null if the group level does not exist
+   */
+  public List<String> getGroupIdsForGroupLevel(int groupLevel) {
+    Map<String, List<String>> groupToSegmentsMap = _levelToGroupToSegmentsMap.get(groupLevel);
+    if (groupToSegmentsMap != null) {
+      return new ArrayList<>(groupToSegmentsMap.keySet());
+    }
+    return null;
+  }
+
+  /**
+   * Helper function to compute group level given children groups
+   *
+   * @param childrenGroups a list of children group ids
+   * @return group level
+   */
+  private Integer getGroupLevel(List<String> childrenGroups) throws InvalidConfigException {
+    // If no children exists, the group belongs to the base level.
+    if (childrenGroups == null || childrenGroups.isEmpty()) {
+      return DEFAULT_GROUP_LEVEL;
+    }
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      Integer currentLevel = entry.getKey();
+      Map<String, List<String>> currentLevelGroupToSegmentsMap = entry.getValue();
+      if (currentLevelGroupToSegmentsMap.keySet().containsAll(childrenGroups)) {
+        return currentLevel + 1;
+      }
+    }
+
+    // At this point, not all children groups are covered, cannot add group
+    throw new InvalidConfigException("Cannot compute group level because not all children groups exist "
+        + "in the segment merge lineage, children groups: " + childrenGroups);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    SegmentMergeLineage that = (SegmentMergeLineage) o;
+
+    return EqualityUtils.isEqual(_tableNameWithType, that._tableNameWithType) && EqualityUtils.isEqual(
+        _parentGroupToChildrenGroupsMap, that._parentGroupToChildrenGroupsMap) && EqualityUtils.isEqual(
+        _levelToGroupToSegmentsMap, that._levelToGroupToSegmentsMap);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_tableNameWithType);
+    result = EqualityUtils.hashCodeOf(result, _parentGroupToChildrenGroupsMap);
+    result = EqualityUtils.hashCodeOf(result, _levelToGroupToSegmentsMap);
+    return result;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
new file mode 100644
index 0000000..3f76c7c
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.common.utils.retry.RetryPolicy;
+import java.util.List;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Class to help to read, write segment merge lineage
+ */
+public class SegmentMergeLineageAccessHelper {
+
+  /**
+   * Read the segment merge lineage from the property store. Whenever we need the version
+   *
+   * @param propertyStore a property store
+   * @param tableNameWithType a table name with type
+   * @return a ZNRecord of segment merge lineage
+   */
+  public static ZNRecord getSegmentMergeLineageZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (segmentMergeLineageZNRecord != null) {
+      segmentMergeLineageZNRecord.setVersion(stat.getVersion());
+    }
+    return segmentMergeLineageZNRecord;
+  }
+
+  /**
+   * Read the segment merge lineage from the property store
+   *
+   * @param propertyStore  a property store
+   * @param tableNameWithType a table name with type
+   * @return a segment merge lineage
+   */
+  public static SegmentMergeLineage getSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+    return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+  }
+
+  /**
+   * Write the segment merge lineage to the property store
+   *
+   * @param propertyStore a property store
+   * @param segmentMergeLineage a segment merge lineage
+   * @return true if update is successful. false otherwise.
+   */
+  public static boolean writeSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      SegmentMergeLineage segmentMergeLineage, int expectedVersion) {
+    String tableNameWithType = segmentMergeLineage.getTableName();
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    return propertyStore.set(path, segmentMergeLineage.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
index fe55a0c..3bc2d56 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
@@ -53,6 +53,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
   private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE";
   private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
+  private static final String PROPERTYSTORE_SEGMENT_MERGE_LINEAGE = "/SEGMENT_MERGE_LINEAGE";
 
   public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
       ZNRecord znRecord) {
@@ -106,6 +107,10 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX, controllerConfigKey);
   }
 
+  public static String constructPropertyStorePathForSegmentMergeLineage(String tableNameWithType) {
+    return StringUtil.join("/", PROPERTYSTORE_SEGMENT_MERGE_LINEAGE, tableNameWithType);
+  }
+
   public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
       String segmentName) {
     return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
@@ -251,7 +256,7 @@ public class ZKMetadataProvider {
    * Get the schema associated with the given table name.
    *
    * @param propertyStore Helix property store
-   * @param tableName Table name with or without type suffix.
+   * @param tableName Table name with or without type suffix.FtaskTypeConfigsMap
    * @return Schema associated with the given table name.
    */
   @Nullable
diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
new file mode 100644
index 0000000..923fbfc
--- /dev/null
+++ b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentMergeLineageTest {
+
+  @Test
+  public void testSegmentMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList(new String[]{"segment1", "segment2", "segment3"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    String groupId2 = "G2";
+    List<String> groupSegments2 = Arrays.asList(new String[]{"segment4", "segment5"});
+    segmentMergeLineage.addSegmentGroup(groupId2, groupSegments2, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId2), groupSegments2);
+
+    String groupId3 = "G3";
+    List<String> groupSegments3 = Arrays.asList(new String[]{"segment6"});
+    segmentMergeLineage.addSegmentGroup(groupId3, groupSegments3, Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId3), groupSegments3);
+
+    // Check available APIs
+    Assert.assertEquals(segmentMergeLineage.getTableName(), "test_OFFLINE");
+    Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId3),
+        Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getAllGroupLevels(), Arrays.asList(new Integer[]{0, 1}));
+    Assert.assertTrue(segmentMergeLineage.equals(SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord())));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(0),
+        Arrays.asList(new String[]{groupId1, groupId2}));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(1),
+        Arrays.asList(new String[]{groupId3}));
+    validateSegmentGroup(segmentMergeLineage);
+
+    // Check ZNRecord conversion
+    Assert.assertEquals(segmentMergeLineage, SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord()));
+
+    // Test removing groups
+    segmentMergeLineage.removeSegmentGroup(groupId1);
+    Assert.assertNull(segmentMergeLineage.getChildrenForGroup(groupId1));
+    Assert.assertNull(segmentMergeLineage.getSegmentsForGroup(groupId1));
+    Assert.assertFalse(segmentMergeLineage.getGroupIdsForGroupLevel(0).contains(groupId1));
+  }
+
+  @Test(expectedExceptions = InvalidConfigException.class)
+  public void testUpdateWithDuplicateGroupId() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList(new String[]{"segment1, segment2, segment3"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    List<String> groupSegments2 = Arrays.asList(new String[]{"segment4, segment5, segment6"});
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments2, null);
+  }
+
+  private void validateSegmentGroup(SegmentMergeLineage segmentMergeLineage) {
+    SegmentGroup segmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+    for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+      validateSegmentGroupNode(child, segmentMergeLineage);
+    }
+  }
+
+  private void validateSegmentGroupNode(SegmentGroup segmentGroup, SegmentMergeLineage segmentMergeLineage) {
+    String groupId = segmentGroup.getGroupId();
+    Assert.assertEquals(segmentGroup.getSegments(), new HashSet<>(segmentMergeLineage.getSegmentsForGroup(groupId)));
+    Assert.assertTrue(segmentMergeLineage.getGroupIdsForGroupLevel(segmentGroup.getGroupLevel()).contains(groupId));
+
+    List<SegmentGroup> childrenGroups = segmentGroup.getChildrenGroups();
+    if (childrenGroups != null) {
+      List<String> childrenGroupIds = new ArrayList<>();
+      for (SegmentGroup child : childrenGroups) {
+        childrenGroupIds.add(child.getGroupId());
+      }
+      Assert.assertEquals(childrenGroupIds, segmentMergeLineage.getChildrenForGroup(groupId));
+
+      for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+        validateSegmentGroupNode(child, segmentMergeLineage);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org