You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2018/11/26 19:39:47 UTC
[incubator-pinot] 01/01: Segment merge lineage data structure 1.
Added segment merge lineage that is a wrapper class of ZNRecord 2. Added
segment merge group that will be used by broker during segment selection
process 3. Added a unit test
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch segment-merge-lineage
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 156cd4c326230a4497a12ba7d086bc6f2f402f1c
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Sun Nov 25 22:10:45 2018 -0800
Segment merge lineage data structure
1. Added segment merge lineage that is a wrapper class of ZNRecord
2. Added segment merge group that will be used by broker during segment
selection process
3. Added a unit test
---
.../pinot/common/lineage/SegmentGroup.java | 72 +++++
.../pinot/common/lineage/SegmentMergeLineage.java | 313 +++++++++++++++++++++
.../lineage/SegmentMergeLineageAccessHelper.java | 82 ++++++
.../pinot/common/metadata/ZKMetadataProvider.java | 7 +-
.../common/lineage/SegmentMergeLineageTest.java | 106 +++++++
5 files changed, 579 insertions(+), 1 deletion(-)
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
new file mode 100644
index 0000000..fb28ac1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Class to represent segment group
+ */
+public class SegmentGroup {
+
+ private String _groupId;
+ private int _groupLevel;
+ private SegmentGroup _parentGroup;
+ private List<SegmentGroup> _childrenGroups;
+ private Set<String> _segments;
+
+ public String getGroupId() {
+ return _groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ _groupId = groupId;
+ }
+
+ public SegmentGroup getParentGroup() {
+ return _parentGroup;
+ }
+
+ public void setParentGroup(SegmentGroup parentGroup) {
+ _parentGroup = parentGroup;
+ }
+
+ public List<SegmentGroup> getChildrenGroups() {
+ return _childrenGroups;
+ }
+
+ public void setChildrenGroups(List<SegmentGroup> childrenGroups) {
+ _childrenGroups = childrenGroups;
+ }
+
+ public Set<String> getSegments() {
+ return _segments;
+ }
+
+ public void setSegments(Set<String> segments) {
+ _segments = segments;
+ }
+
+ public int getGroupLevel() {
+ return _groupLevel;
+ }
+
+ public void setGroupLevel(int groupLevel) {
+ _groupLevel = groupLevel;
+ }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
new file mode 100644
index 0000000..0ec0be1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
@@ -0,0 +1,313 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import com.linkedin.pinot.common.utils.EqualityUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to represent segment merge lineage information
+ */
+public class SegmentMergeLineage {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeLineage.class);
+
+ private static final String LEVEL_KEY_PREFIX = "level_";
+ private static final String ROOT_NODE_GROUP_ID = "root";
+ private static final String SEGMENT_DELIMITER = ",";
+ private static final int DEFAULT_GROUP_LEVEL = 0;
+
+ private String _tableNameWithType;
+ private Map<String, List<String>> _parentGroupToChildrenGroupsMap;
+ private Map<Integer, Map<String, List<String>>> _levelToGroupToSegmentsMap;
+
+ public SegmentMergeLineage(String tableNameWithType) {
+ _tableNameWithType = tableNameWithType;
+ _parentGroupToChildrenGroupsMap = new HashMap<>();
+ _levelToGroupToSegmentsMap = new HashMap<>();
+ }
+
+ public SegmentMergeLineage(String tableNameWithType, Map<String, List<String>> segmentGroupLineageMap,
+ Map<Integer, Map<String, List<String>>> levelToGroupToSegmentMap, int version) {
+ _tableNameWithType = tableNameWithType;
+ _parentGroupToChildrenGroupsMap = segmentGroupLineageMap;
+ _levelToGroupToSegmentsMap = levelToGroupToSegmentMap;
+ }
+
+ public String getTableName() {
+ return _tableNameWithType;
+ }
+
+ public static SegmentMergeLineage fromZNRecord(ZNRecord record) {
+ String tableNameWithType = record.getId();
+ int version = record.getVersion();
+ Map<String, List<String>> segmentGroupLineageMap = record.getListFields();
+
+ Map<Integer, Map<String, List<String>>> groupToSegmentsMap = new HashMap<>();
+ for (Map.Entry<String, Map<String, String>> entry : record.getMapFields().entrySet()) {
+ String levelKey = entry.getKey();
+ Integer level = Integer.parseInt(levelKey.substring(LEVEL_KEY_PREFIX.length()));
+ Map<String, List<String>> groupToSegmentsForLevel = new HashMap<>();
+ for (Map.Entry<String, String> groupEntry : entry.getValue().entrySet()) {
+ String groupId = groupEntry.getKey();
+ String segmentsString = groupEntry.getValue();
+ List<String> segments = Arrays.asList(segmentsString.split(SEGMENT_DELIMITER));
+ groupToSegmentsForLevel.put(groupId, new ArrayList<>(segments));
+ }
+ groupToSegmentsMap.put(level, groupToSegmentsForLevel);
+ }
+ return new SegmentMergeLineage(tableNameWithType, segmentGroupLineageMap, groupToSegmentsMap, version);
+ }
+
+ public ZNRecord toZNRecord() {
+ ZNRecord record = new ZNRecord(_tableNameWithType);
+ record.setListFields(_parentGroupToChildrenGroupsMap);
+ Map<String, Map<String, String>> groupToSegmentsMap = new HashMap<>();
+
+ for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+ String key = LEVEL_KEY_PREFIX + entry.getKey();
+ Map<String, String> groupSegmentsForLevel = new HashMap<>();
+ for (Map.Entry<String, List<String>> groupEntry : entry.getValue().entrySet()) {
+ String groupId = groupEntry.getKey();
+ String segments = String.join(SEGMENT_DELIMITER, groupEntry.getValue());
+ groupSegmentsForLevel.put(groupId, segments);
+ }
+ groupToSegmentsMap.put(key, groupSegmentsForLevel);
+ }
+ record.setMapFields(groupToSegmentsMap);
+
+ return record;
+ }
+
+ /**
+ * Add segment merge lineage information
+ *
+ * @param groupId a group id
+ * @param currentGroupSegments a list of segments that belongs to the group
+ * @param childrenGroups a list of children groups that the current group covers
+ */
+ public void addSegmentGroup(String groupId, List<String> currentGroupSegments, List<String> childrenGroups)
+ throws InvalidConfigException {
+ // Get group level
+ Integer groupLevel = getGroupLevel(childrenGroups);
+
+ // Update group to segments map
+ Map<String, List<String>> groupToSegmentMap =
+ _levelToGroupToSegmentsMap.computeIfAbsent(groupLevel, k -> new HashMap<>());
+ if (groupToSegmentMap.containsKey(groupId)) {
+ throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+ }
+ groupToSegmentMap.put(groupId, new ArrayList<>(currentGroupSegments));
+ _levelToGroupToSegmentsMap.put(groupLevel, groupToSegmentMap);
+
+ // Update segment group lineage map
+ if (groupLevel > DEFAULT_GROUP_LEVEL) {
+ if (_parentGroupToChildrenGroupsMap.containsKey(groupId)) {
+ throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+ } else {
+ _parentGroupToChildrenGroupsMap.put(groupId, new ArrayList<>(childrenGroups));
+ }
+ }
+
+ LOGGER.info("New group has been added successfully to the segment lineage. (groupId: {}, currentGroupSegments: {}, "
+ + "childrenGroups: {}", groupId, currentGroupSegments, childrenGroups);
+ }
+
+ /**
+ * Remove segment merge information given a group id
+ *
+ * @param groupId a group id
+ */
+ public void removeSegmentGroup(String groupId) {
+ // Clean up the group id from parent to children group mapping
+ _parentGroupToChildrenGroupsMap.remove(groupId);
+ for (List<String> childrenGroups : _parentGroupToChildrenGroupsMap.values()) {
+ childrenGroups.remove(groupId);
+ }
+
+ // Clean up the group id from group to segments mapping
+ for (Map<String, List<String>> groupToSegments : _levelToGroupToSegmentsMap.values()) {
+ groupToSegments.remove(groupId);
+ }
+
+ LOGGER.info("Group {} has been successfully removed.", groupId);
+ }
+
+ /**
+ * Construct a lineage tree and returns the root node
+ *
+ * @return a root node for lineage tree
+ */
+ public SegmentGroup getMergeLineageRootSegmentGroup() {
+ // Create group nodes
+ Map<String, SegmentGroup> groupNodes = new HashMap<>();
+ for (Map.Entry<Integer, Map<String, List<String>>> groupEntryForLevel : _levelToGroupToSegmentsMap.entrySet()) {
+ Integer level = groupEntryForLevel.getKey();
+ Map<String, List<String>> groupToSegmentsForLevel = groupEntryForLevel.getValue();
+ for (Map.Entry<String, List<String>> entry : groupToSegmentsForLevel.entrySet()) {
+ String groupId = entry.getKey();
+ List<String> segments = entry.getValue();
+ SegmentGroup groupNode = new SegmentGroup();
+ groupNode.setGroupId(groupId);
+ groupNode.setSegments(new HashSet<>(segments));
+ groupNode.setGroupLevel(level);
+ groupNodes.put(groupId, groupNode);
+ }
+ }
+
+ // Add edges by updating children & parent group information
+ for (Map.Entry<String, List<String>> lineageEntry : _parentGroupToChildrenGroupsMap.entrySet()) {
+ String parentGroupId = lineageEntry.getKey();
+ List<String> childrenGroupIds = lineageEntry.getValue();
+ List<SegmentGroup> childrenGroups = new ArrayList<>();
+ SegmentGroup parentNode = groupNodes.get(parentGroupId);
+ for (String groupId : childrenGroupIds) {
+ SegmentGroup childNode = groupNodes.get(groupId);
+ if (childNode != null) {
+ childrenGroups.add(childNode);
+ childNode.setParentGroup(parentNode);
+ }
+ }
+ parentNode.setChildrenGroups(childrenGroups);
+ }
+
+ // Create a root node
+ SegmentGroup root = new SegmentGroup();
+ root.setGroupId(ROOT_NODE_GROUP_ID);
+ List<SegmentGroup> childrenForRoot = new ArrayList<>();
+ for (SegmentGroup group : groupNodes.values()) {
+ if (group.getParentGroup() == null) {
+ group.setParentGroup(root);
+ childrenForRoot.add(group);
+ }
+ }
+ root.setChildrenGroups(childrenForRoot);
+
+ return root;
+ }
+
+ /**
+ * Get a list of segments for a given group id
+ *
+ * @param groupId a group id
+ * @return a list of segments that belongs to the given group id, null if the group does not exist
+ */
+ public List<String> getSegmentsForGroup(String groupId) {
+ for (Map<String, List<String>> groupToSegmentMap : _levelToGroupToSegmentsMap.values()) {
+ List<String> segments = groupToSegmentMap.get(groupId);
+ if (segments != null) {
+ return segments;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get a list of children group ids for a given group id
+ *
+ * @param groupId a group id
+ * @return a list of children groups that are covered by the given group id, null if the group does not exist
+ */
+ public List<String> getChildrenForGroup(String groupId) {
+ return _parentGroupToChildrenGroupsMap.get(groupId);
+ }
+
+ /**
+ * Get a list of all group levels
+ *
+ * @return a list of all group levels
+ */
+ public List<Integer> getAllGroupLevels() {
+ List<Integer> groupLevels = new ArrayList<>(_levelToGroupToSegmentsMap.keySet());
+ Collections.sort(groupLevels);
+ return groupLevels;
+ }
+
+ /**
+ * Get a list of group ids for a given group level
+ *
+ * @param groupLevel a group level
+ * @return a list of group ids that belongs to the given group level, null if the group level does not exist
+ */
+ public List<String> getGroupIdsForGroupLevel(int groupLevel) {
+ Map<String, List<String>> groupToSegmentsMap = _levelToGroupToSegmentsMap.get(groupLevel);
+ if (groupToSegmentsMap != null) {
+ return new ArrayList<>(groupToSegmentsMap.keySet());
+ }
+ return null;
+ }
+
+ /**
+ * Helper function to compute group level given children groups
+ *
+ * @param childrenGroups a list of children group ids
+ * @return group level
+ */
+ private Integer getGroupLevel(List<String> childrenGroups) throws InvalidConfigException {
+ // If no children exists, the group belongs to the base level.
+ if (childrenGroups == null || childrenGroups.isEmpty()) {
+ return DEFAULT_GROUP_LEVEL;
+ }
+
+ for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+ Integer currentLevel = entry.getKey();
+ Map<String, List<String>> currentLevelGroupToSegmentsMap = entry.getValue();
+ if (currentLevelGroupToSegmentsMap.keySet().containsAll(childrenGroups)) {
+ return currentLevel + 1;
+ }
+ }
+
+ // At this point, not all children groups are covered, cannot add group
+ throw new InvalidConfigException("Cannot compute group level because not all children groups exist "
+ + "in the segment merge lineage, children groups: " + childrenGroups);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (EqualityUtils.isSameReference(this, o)) {
+ return true;
+ }
+
+ if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+ return false;
+ }
+
+ SegmentMergeLineage that = (SegmentMergeLineage) o;
+
+ return EqualityUtils.isEqual(_tableNameWithType, that._tableNameWithType) && EqualityUtils.isEqual(
+ _parentGroupToChildrenGroupsMap, that._parentGroupToChildrenGroupsMap) && EqualityUtils.isEqual(
+ _levelToGroupToSegmentsMap, that._levelToGroupToSegmentsMap);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = EqualityUtils.hashCodeOf(_tableNameWithType);
+ result = EqualityUtils.hashCodeOf(result, _parentGroupToChildrenGroupsMap);
+ result = EqualityUtils.hashCodeOf(result, _levelToGroupToSegmentsMap);
+ return result;
+ }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
new file mode 100644
index 0000000..3f76c7c
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.common.utils.retry.RetryPolicy;
+import java.util.List;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Class to help to read, write segment merge lineage
+ */
+public class SegmentMergeLineageAccessHelper {
+
+ /**
+ * Read the segment merge lineage from the property store. Whenever we need the version
+ *
+ * @param propertyStore a property store
+ * @param tableNameWithType a table name with type
+ * @return a ZNRecord of segment merge lineage
+ */
+ public static ZNRecord getSegmentMergeLineageZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+ String tableNameWithType) {
+ String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+ Stat stat = new Stat();
+ ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+ if (segmentMergeLineageZNRecord != null) {
+ segmentMergeLineageZNRecord.setVersion(stat.getVersion());
+ }
+ return segmentMergeLineageZNRecord;
+ }
+
+ /**
+ * Read the segment merge lineage from the property store
+ *
+ * @param propertyStore a property store
+ * @param tableNameWithType a table name with type
+ * @return a segment merge lineage
+ */
+ public static SegmentMergeLineage getSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+ String tableNameWithType) {
+ String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+ ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+ return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+ }
+
+ /**
+ * Write the segment merge lineage to the property store
+ *
+ * @param propertyStore a property store
+ * @param segmentMergeLineage a segment merge lineage
+ * @return true if update is successful. false otherwise.
+ */
+ public static boolean writeSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+ SegmentMergeLineage segmentMergeLineage, int expectedVersion) {
+ String tableNameWithType = segmentMergeLineage.getTableName();
+ String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+ return propertyStore.set(path, segmentMergeLineage.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
+ }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
index fe55a0c..3bc2d56 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
@@ -53,6 +53,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE";
private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
+ private static final String PROPERTYSTORE_SEGMENT_MERGE_LINEAGE = "/SEGMENT_MERGE_LINEAGE";
public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
ZNRecord znRecord) {
@@ -106,6 +107,10 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX, controllerConfigKey);
}
+ public static String constructPropertyStorePathForSegmentMergeLineage(String tableNameWithType) {
+ return StringUtil.join("/", PROPERTYSTORE_SEGMENT_MERGE_LINEAGE, tableNameWithType);
+ }
+
public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
String segmentName) {
return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
@@ -251,7 +256,7 @@ public class ZKMetadataProvider {
* Get the schema associated with the given table name.
*
* @param propertyStore Helix property store
- * @param tableName Table name with or without type suffix.
+ * @param tableName Table name with or without type suffix.FtaskTypeConfigsMap
* @return Schema associated with the given table name.
*/
@Nullable
diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
new file mode 100644
index 0000000..923fbfc
--- /dev/null
+++ b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentMergeLineageTest {
+
+ @Test
+ public void testSegmentMergeLineage() throws Exception {
+ SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+ String groupId1 = "G1";
+ List<String> groupSegments1 = Arrays.asList(new String[]{"segment1", "segment2", "segment3"});
+ segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+ Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+ String groupId2 = "G2";
+ List<String> groupSegments2 = Arrays.asList(new String[]{"segment4", "segment5"});
+ segmentMergeLineage.addSegmentGroup(groupId2, groupSegments2, null);
+ Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId2), groupSegments2);
+
+ String groupId3 = "G3";
+ List<String> groupSegments3 = Arrays.asList(new String[]{"segment6"});
+ segmentMergeLineage.addSegmentGroup(groupId3, groupSegments3, Arrays.asList(new String[]{groupId1, groupId2}));
+ Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId3), groupSegments3);
+
+ // Check available APIs
+ Assert.assertEquals(segmentMergeLineage.getTableName(), "test_OFFLINE");
+ Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId3),
+ Arrays.asList(new String[]{groupId1, groupId2}));
+ Assert.assertEquals(segmentMergeLineage.getAllGroupLevels(), Arrays.asList(new Integer[]{0, 1}));
+ Assert.assertTrue(segmentMergeLineage.equals(SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord())));
+ Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(0),
+ Arrays.asList(new String[]{groupId1, groupId2}));
+ Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(1),
+ Arrays.asList(new String[]{groupId3}));
+ validateSegmentGroup(segmentMergeLineage);
+
+ // Check ZNRecord conversion
+ Assert.assertEquals(segmentMergeLineage, SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord()));
+
+ // Test removing groups
+ segmentMergeLineage.removeSegmentGroup(groupId1);
+ Assert.assertNull(segmentMergeLineage.getChildrenForGroup(groupId1));
+ Assert.assertNull(segmentMergeLineage.getSegmentsForGroup(groupId1));
+ Assert.assertFalse(segmentMergeLineage.getGroupIdsForGroupLevel(0).contains(groupId1));
+ }
+
+ @Test(expectedExceptions = InvalidConfigException.class)
+ public void testUpdateWithDuplicateGroupId() throws Exception {
+ SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+ String groupId1 = "G1";
+ List<String> groupSegments1 = Arrays.asList(new String[]{"segment1, segment2, segment3"});
+ segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+ Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+ List<String> groupSegments2 = Arrays.asList(new String[]{"segment4, segment5, segment6"});
+ segmentMergeLineage.addSegmentGroup(groupId1, groupSegments2, null);
+ }
+
+ private void validateSegmentGroup(SegmentMergeLineage segmentMergeLineage) {
+ SegmentGroup segmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+ for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+ validateSegmentGroupNode(child, segmentMergeLineage);
+ }
+ }
+
+ private void validateSegmentGroupNode(SegmentGroup segmentGroup, SegmentMergeLineage segmentMergeLineage) {
+ String groupId = segmentGroup.getGroupId();
+ Assert.assertEquals(segmentGroup.getSegments(), new HashSet<>(segmentMergeLineage.getSegmentsForGroup(groupId)));
+ Assert.assertTrue(segmentMergeLineage.getGroupIdsForGroupLevel(segmentGroup.getGroupLevel()).contains(groupId));
+
+ List<SegmentGroup> childrenGroups = segmentGroup.getChildrenGroups();
+ if (childrenGroups != null) {
+ List<String> childrenGroupIds = new ArrayList<>();
+ for (SegmentGroup child : childrenGroups) {
+ childrenGroupIds.add(child.getGroupId());
+ }
+ Assert.assertEquals(childrenGroupIds, segmentMergeLineage.getChildrenForGroup(groupId));
+
+ for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+ validateSegmentGroupNode(child, segmentMergeLineage);
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org