You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2018/12/01 00:28:21 UTC
[incubator-pinot] 01/02: initial commit
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch improve-merge-command
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit cc3b86a7e97e0face9fe828fff3e049c4cc43a8e
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Mon Nov 26 16:24:48 2018 -0800
initial commit
---
.../routing/HelixExternalViewBasedRouting.java | 22 +-
.../BasePartitionAwareRoutingTableBuilder.java | 6 +-
.../routing/builder/BaseRoutingTableBuilder.java | 11 +-
.../builder/DefaultOfflineRoutingTableBuilder.java | 5 +-
.../DefaultRealtimeRoutingTableBuilder.java | 11 +-
.../routing/builder/RoutingTableBuilder.java | 3 +-
.../routing/selector/DefaultSegmentSelector.java | 42 +++
.../routing/selector/MergedSegmentSelector.java | 92 ++++++
.../broker/routing/selector/SegmentSelector.java | 46 +++
.../routing/selector/SegmentSelectorFactory.java | 40 +++
.../broker/routing/RoutingTableBenchmark.java | 122 ++++++++
.../pinot/broker/routing/RoutingTableTest.java | 1 +
.../BalancedRandomRoutingTableBuilderTest.java | 2 +-
.../HighLevelConsumerRoutingTableBuilderTest.java | 2 +-
...rtitionAwareOfflineRoutingTableBuilderTest.java | 9 +-
...titionAwareRealtimeRoutingTableBuilderTest.java | 11 +-
.../broker/selector/MergeSegmentSelectorTest.java | 77 +++++
.../{routing => util}/FakePropertyStore.java | 13 +-
pinot-common/pom.xml | 4 +
.../common/config/MultiLevelRollupSetting.java | 77 +++++
.../linkedin/pinot/common/config/RollupConfig.java | 59 ++++
.../pinot/common/config/SegmentMergeConfig.java | 137 +++++++++
.../linkedin/pinot/common/config/TableConfig.java | 84 ++++--
.../pinot/common/lineage/SegmentGroup.java | 72 +++++
.../pinot/common/lineage/SegmentMergeLineage.java | 333 +++++++++++++++++++++
.../lineage/SegmentMergeLineageAccessHelper.java | 82 +++++
.../pinot/common/metadata/ZKMetadataProvider.java | 5 +
.../common/utils/FileUploadDownloadClient.java | 51 +++-
.../com/linkedin/pinot/common/utils/UUIDUtils.java | 29 ++
.../common/lineage/SegmentMergeLineageTest.java | 106 +++++++
.../PinotSegmentUploadRestletResource.java | 9 +-
.../api/resources/PinotTableRestletResource.java | 48 +++
.../pinot/controller/api/upload/ZKOperator.java | 53 +++-
.../helix/ControllerRequestURLBuilder.java | 5 +
.../helix/core/PinotHelixResourceManager.java | 68 ++++-
.../helix/core/minion/ClusterInfoProvider.java | 6 +
.../generator/SegmentMergeRollupTaskGenerator.java | 313 +++++++++++++++++++
.../minion/generator/TaskGeneratorRegistry.java | 1 +
.../pinot/core/common/MinionConstants.java | 8 +
.../data/readers/PinotSegmentRecordReader.java | 3 +
.../tests/SimpleMinionClusterIntegrationTest.java | 2 +
.../BaseMultipleSegmentsConversionExecutor.java | 13 +-
.../minion/executor/SegmentConversionUtils.java | 50 ++++
.../executor/SegmentMergeRollupTaskExecutor.java | 105 +++++++
.../SegmentMergeRollupTaskExecutorFactory.java | 23 ++
.../executor/TaskExecutorFactoryRegistry.java | 2 +
pom.xml | 5 +
47 files changed, 2189 insertions(+), 79 deletions(-)
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
index c3db57d..287eeea 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/HelixExternalViewBasedRouting.java
@@ -17,6 +17,8 @@ package com.linkedin.pinot.broker.routing;
import com.google.common.collect.Sets;
import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelectorFactory;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -67,6 +69,7 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
private final Map<String, Map<String, InstanceConfig>> _lastKnownInstanceConfigsForTable = new ConcurrentHashMap<>();
private final Map<String, InstanceConfig> _lastKnownInstanceConfigs = new ConcurrentHashMap<>();
private final Map<String, Set<String>> _tablesForInstance = new ConcurrentHashMap<>();
+ private final Map<String, SegmentSelector> _segmentSelectorMap = new ConcurrentHashMap<>();
private final HelixExternalViewBasedTimeBoundaryService _timeBoundaryService;
private final HelixManager _helixManager;
@@ -76,25 +79,25 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
private Configuration _configuration;
- private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-
private RoutingTableBuilderFactory _routingTableBuilderFactory;
+ private SegmentSelectorFactory _segmentSelectorFactory;
+
public HelixExternalViewBasedRouting(ZkHelixPropertyStore<ZNRecord> propertyStore, HelixManager helixManager,
Configuration configuration) {
- _propertyStore = propertyStore;
_configuration = configuration;
_timeBoundaryService = new HelixExternalViewBasedTimeBoundaryService(propertyStore);
_routingTableBuilderMap = new HashMap<>();
_helixManager = helixManager;
_routingTableBuilderFactory = new RoutingTableBuilderFactory(_configuration, propertyStore);
+ _segmentSelectorFactory = new SegmentSelectorFactory(propertyStore);
}
@Override
public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
String tableName = request.getTableName();
RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableName);
- return routingTableBuilder.getRoutingTable(request);
+ return routingTableBuilder.getRoutingTable(request, _segmentSelectorMap.get(tableName));
}
@Override
@@ -116,6 +119,11 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
tableName);
_routingTableBuilderMap.put(tableName, routingTableBuilder);
+ // Initialize segment selector
+ SegmentSelector segmentSelector = _segmentSelectorFactory.createSegmentSelector(tableConfig);
+ LOGGER.info("Initialized segmentSelector: {} for table {}", segmentSelector.getClass().getName(), tableName);
+ _segmentSelectorMap.put(tableName, segmentSelector);
+
// Build the routing table
if (externalView == null) {
// It is possible for us to get a request to serve a table for which there is no external view. In this case, just
@@ -223,8 +231,11 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
int externalViewRecordVersion = externalView.getRecord().getVersion();
_lastKnownExternalViewVersionMap.put(tableNameWithType, externalViewRecordVersion);
+
+
RoutingTableBuilder routingTableBuilder = _routingTableBuilderMap.get(tableNameWithType);
- if (routingTableBuilder == null) {
+ SegmentSelector segmentSelector = _segmentSelectorMap.get(tableNameWithType);
+ if (routingTableBuilder == null || segmentSelector == null) {
//TODO: warn
return;
}
@@ -237,6 +248,7 @@ public class HelixExternalViewBasedRouting implements RoutingTable {
Map<String, InstanceConfig> relevantInstanceConfigs = new HashMap<>();
routingTableBuilder.computeOnExternalViewChange(tableNameWithType, externalView, instanceConfigs);
+ segmentSelector.computeOnExternalViewChange();
// Keep track of the instance configs that are used in that routing table
updateInstanceConfigsMapFromExternalView(relevantInstanceConfigs, instanceConfigs, externalView);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
index 8d1f568..732c636 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.broker.routing.builder;
import com.linkedin.pinot.broker.pruner.SegmentPrunerContext;
import com.linkedin.pinot.broker.pruner.SegmentZKMetadataPrunerService;
import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadata;
import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -82,7 +83,7 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
}
@Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+ public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
// Copy the reference for the current segment to replica to server mapping for snapshot
Map<String, Map<Integer, String>> segmentToReplicaToServerMap = _segmentToReplicaToServerMap;
@@ -90,6 +91,9 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa
Set<String> segmentsToQuery = segmentToReplicaToServerMap.keySet();
// TODO: add the selection logic here
+ if (segmentSelector != null) {
+ segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery);
+ }
Map<String, List<String>> routingTable = new HashMap<>();
SegmentPrunerContext prunerContext = new SegmentPrunerContext(request.getBrokerRequest());
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
index c00d30d..1d59e89 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/BaseRoutingTableBuilder.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.broker.routing.builder;
import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
import com.linkedin.pinot.common.config.RoutingConfig;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.metrics.BrokerMeter;
@@ -123,16 +124,16 @@ public abstract class BaseRoutingTableBuilder implements RoutingTableBuilder {
}
}
- @Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+ public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
if (_enableDynamicComputing) {
// Copy the pointer for snapshot since the pointer for segment to servers map can change at anytime
Map<String, List<String>> segmentToServersMap = _segmentToServersMap;
- // Get all existing segments
- Set<String> segmentsToQuery = segmentToServersMap.keySet();
-
// TODO: add the selection logic here
+ Set<String> segmentsToQuery = segmentToServersMap.keySet();
+ if (segmentSelector != null) {
+ segmentsToQuery = segmentSelector.selectSegments(request, segmentsToQuery);
+ }
// Compute the final routing table
return computeDynamicRoutingTable(segmentToServersMap, segmentsToQuery);
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
index 07cea6b..64ba82f 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultOfflineRoutingTableBuilder.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.broker.routing.builder;
import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.metrics.BrokerMetrics;
import java.util.HashSet;
@@ -121,8 +122,8 @@ public class DefaultOfflineRoutingTableBuilder implements RoutingTableBuilder {
}
@Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
- return _routingTableBuilder.getRoutingTable(request);
+ public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
+ return _routingTableBuilder.getRoutingTable(request, segmentSelector);
}
@Override
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
index d78de04..72a9a79 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/DefaultRealtimeRoutingTableBuilder.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.broker.routing.builder;
import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.metrics.BrokerMetrics;
import com.linkedin.pinot.common.utils.SegmentName;
@@ -68,7 +69,7 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
}
@Override
- public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request) {
+ public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector) {
boolean forceLLC = false;
boolean forceHLC = false;
for (String routingOption : request.getRoutingOptions()) {
@@ -85,14 +86,14 @@ public class DefaultRealtimeRoutingTableBuilder implements RoutingTableBuilder {
}
if (forceLLC) {
- return _realtimeLLCRoutingTableBuilder.getRoutingTable(request);
+ return _realtimeLLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
} else if (forceHLC) {
- return _realtimeHLCRoutingTableBuilder.getRoutingTable(request);
+ return _realtimeHLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
} else {
if (_hasLLC) {
- return _realtimeLLCRoutingTableBuilder.getRoutingTable(request);
+ return _realtimeLLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
} else if (_hasHLC) {
- return _realtimeHLCRoutingTableBuilder.getRoutingTable(request);
+ return _realtimeHLCRoutingTableBuilder.getRoutingTable(request, segmentSelector);
} else {
return Collections.emptyMap();
}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
index 5a9c320..3fa2bee 100644
--- a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/builder/RoutingTableBuilder.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.broker.routing.builder;
import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.metrics.BrokerMetrics;
import java.util.List;
@@ -47,7 +48,7 @@ public interface RoutingTableBuilder {
/**
* Get the routing table based on the given lookup request.
*/
- Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request);
+ Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest request, SegmentSelector segmentSelector);
/**
* Get all pre-computed routing tables.
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java
new file mode 100644
index 0000000..b8105f6
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/DefaultSegmentSelector.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Pass-through segment selector: applies no filtering and routes to every segment it is given.
+ */
+public class DefaultSegmentSelector implements SegmentSelector {
+
+  /** Nothing to set up; this selector keeps no state. */
+  @Override
+  public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    // intentionally empty
+  }
+
+  /** Nothing to recompute; the selection does not depend on the external view. */
+  @Override
+  public void computeOnExternalViewChange() {
+    // intentionally empty
+  }
+
+  /** Returns {@code segments} itself (the same instance, unmodified). */
+  @Override
+  public Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segments) {
+    return segments;
+  }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java
new file mode 100644
index 0000000..7d74146
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/MergedSegmentSelector.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.lineage.SegmentGroup;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Segment selector for merged segments
+ */
+public class MergedSegmentSelector implements SegmentSelector {
+ private String _tableNameWithType;
+ private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private volatile SegmentGroup _rootSegmentGroup;
+
+ @Override
+ public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ _tableNameWithType = tableConfig.getTableName();
+ _propertyStore = propertyStore;
+ }
+
+ @Override
+ public void computeOnExternalViewChange() {
+ SegmentMergeLineage segmentMergeLineage = SegmentMergeLineageAccessHelper.getSegmentMergeLineage(_propertyStore,
+ _tableNameWithType);
+ _rootSegmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+ }
+
+ @Override
+ public Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segmentsToQuery) {
+ Set<String> selectedSegments = new HashSet<>(segmentsToQuery);
+ for (SegmentGroup segmentGroup : _rootSegmentGroup.getChildrenGroups()) {
+ computeSelectionProcessForSegmentGroup(segmentGroup, selectedSegments, segmentsToQuery);
+ }
+ return selectedSegments;
+ }
+
+ private void computeSelectionProcessForSegmentGroup(SegmentGroup segmentGroup, Set<String> selectedSegments,
+ Set<String> availableSegments) {
+ Set<String> segmentsForGroup = segmentGroup.getSegments();
+
+ if (availableSegments.containsAll(segmentsForGroup)) {
+ // If we pick the current group node, we delete all segments covered by children groups
+ if (segmentGroup.getChildrenGroups() == null || segmentGroup.getChildrenGroups().isEmpty()) {
+ return;
+ }
+ for (SegmentGroup child: segmentGroup.getChildrenGroups()) {
+ removeSegmentsForSegmentGroup(child, selectedSegments);
+ }
+ } else {
+ // If the current group is not picked, we compute the selection recursively for children nodes
+ selectedSegments.removeAll(segmentsForGroup);
+ for (SegmentGroup child: segmentGroup.getChildrenGroups()) {
+ computeSelectionProcessForSegmentGroup(child, selectedSegments, availableSegments);
+ }
+ }
+ }
+
+ private void removeSegmentsForSegmentGroup(SegmentGroup segmentGroup, Set<String> selectedSegments) {
+ Set<String> segmentsForGroup = segmentGroup.getSegments();
+ selectedSegments.removeAll(segmentsForGroup);
+
+ if (segmentGroup.getChildrenGroups() == null || segmentGroup.getChildrenGroups().isEmpty()) {
+ return;
+ }
+
+ for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+ removeSegmentsForSegmentGroup(child, selectedSegments);
+ }
+ }
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java
new file mode 100644
index 0000000..f7bb19d
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelector.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
+import com.linkedin.pinot.common.config.TableConfig;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Selects the subset of segments a routing table lookup should route to.
+ *
+ * <p>A selector is initialized via {@link #init}, refreshed via {@link #computeOnExternalViewChange()} and
+ * consulted via {@link #selectSegments} when a routing table is computed.
+ */
+public interface SegmentSelector {
+
+  /**
+   * Initializes the segment selector.
+   *
+   * @param tableConfig config of the table this selector serves
+   * @param propertyStore Helix property store the selector may read from
+   */
+  void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore);
+
+  /**
+   * Computes and updates the information required by the selector.
+   * <p>Should be called whenever there is an external view change.
+   */
+  void computeOnExternalViewChange();
+
+  /**
+   * Selects the segments to query among the given candidates.
+   *
+   * <p>Implementations must NOT modify {@code segmentsToQuery}. Callers may pass a live key-set view of their
+   * segment-to-server map, so removing elements from it would remove routing entries. Return either the input set
+   * itself or a new set.
+   *
+   * @param request routing table lookup request being served
+   * @param segmentsToQuery candidate segments (read-only)
+   * @return the segments to query
+   */
+  Set<String> selectSegments(RoutingTableLookupRequest request, Set<String> segmentsToQuery);
+}
diff --git a/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java
new file mode 100644
index 0000000..1033bc9
--- /dev/null
+++ b/pinot-broker/src/main/java/com/linkedin/pinot/broker/routing/selector/SegmentSelectorFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing.selector;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+
+
+/**
+ * Creates and initializes the {@link SegmentSelector} used for a table.
+ */
+public class SegmentSelectorFactory {
+
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  /**
+   * @param propertyStore property store handed to every selector created by this factory
+   */
+  public SegmentSelectorFactory(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _propertyStore = propertyStore;
+  }
+
+  /**
+   * Returns a new, initialized segment selector for the given table.
+   */
+  public SegmentSelector createSegmentSelector(TableConfig tableConfig) {
+    // TODO: pick the selector from the table's segment merge config once that config is available.
+    // Until then every table gets a MergedSegmentSelector.
+    MergedSegmentSelector selector = new MergedSegmentSelector();
+    selector.init(tableConfig, _propertyStore);
+    return selector;
+  }
+}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java
new file mode 100644
index 0000000..046cb69
--- /dev/null
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableBenchmark.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.routing;
+
+
+import com.linkedin.pinot.common.config.RoutingConfig;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.io.IOUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+
+
+/**
+ * Manual benchmark comparing routing table lookups with pre-computed vs. dynamically computed routing.
+ *
+ * <p>NOTE(review): the whole benchmark body below is commented out, so this class currently does nothing and
+ * {@code NUM_ROUNDS} is unused. TODO: restore it or delete it. Before restoring, confirm that constructing
+ * {@code HelixExternalViewBasedRouting} with a null property store still works, since that store is now handed to
+ * the segment selectors it creates.
+ */
+public class RoutingTableBenchmark {
+ // Number of timed lookups per routing mode (only referenced by the commented-out benchmark)
+ public static final int NUM_ROUNDS = 10000;
+
// public static void main(String[] args) throws Exception {
//// URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("SampleExternalView.json");
//// URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("SampleRealtimeExternalView.json");
// URL resourceUrl = RoutingTableBenchmark.class.getClassLoader().getResource("AdStatistics.json");
//
// Assert.assertNotNull(resourceUrl);
// String fileName = resourceUrl.getFile();
//
// byte[] externalViewBytes = IOUtils.toByteArray(new FileInputStream(fileName));
// ExternalView externalView = new ExternalView((ZNRecord) new ZNRecordSerializer().deserialize(externalViewBytes));
// String tableName = externalView.getResourceName();
// List<InstanceConfig> instanceConfigs = getInstanceConfigs(externalView);
//
// TableConfig preComputeTableConfig = generatePreComputeTableConfig(tableName);
// TableConfig dynamicTableConfig = generateDynamicTableConfig(tableName);
//
// HelixExternalViewBasedRouting precomputeRouting = new HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
// precomputeRouting.markDataResourceOnline(preComputeTableConfig, externalView, instanceConfigs);
//
// HelixExternalViewBasedRouting dynamicRouting = new HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
// dynamicRouting.markDataResourceOnline(dynamicTableConfig, externalView, instanceConfigs);
//
//// System.out.println("Pre-compute Routing table builder");
//// for (int i = 0; i < NUM_ROUNDS; i++) {
//// long start = System.nanoTime();
//// Map<String, List<String>> routingTable = precomputeRouting.getRoutingTable(new RoutingTableLookupRequest(tableName));
//// long end = System.nanoTime();
//// System.out.println("round " + i + ": " + (end - start) / 1000000.0 + " ms");
//// }
//// System.out.println("");
//
// System.out.println("Dynamic Routing table builder");
// for (int i = 0; i < NUM_ROUNDS; i++) {
// long start = System.nanoTime();
// Map<String, List<String>> routingTable = dynamicRouting.getRoutingTable(new RoutingTableLookupRequest(tableName));
// long end = System.nanoTime();
// System.out.println("round " + i + ": " + (end - start) / 1000000.0 + " ms");
// }
// }
//
// private static List<InstanceConfig> getInstanceConfigs(ExternalView externalView) {
// List<InstanceConfig> instanceConfigs = new ArrayList<>();
// Set<String> instances = new HashSet<>();
//
// // Collect all unique instances
// for (String partitionName : externalView.getPartitionSet()) {
// for (String instance : externalView.getStateMap(partitionName).keySet()) {
// if (!instances.contains(instance)) {
// instanceConfigs.add(new InstanceConfig(instance));
// instances.add(instance);
// }
// }
// }
//
// return instanceConfigs;
// }
//
// private static TableConfig generatePreComputeTableConfig(String tableName) throws Exception {
// CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
// TableConfig.Builder builder = new TableConfig.Builder(tableType);
// builder.setTableName(tableName);
// TableConfig tableConfig = builder.build();
//// tableConfig.getRoutingConfig().setRoutingTableBuilderName("KafkaLowLevel");
// return tableConfig;
// }
//
// private static TableConfig generateDynamicTableConfig(String tableName) throws Exception {
// CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
// TableConfig.Builder builder = new TableConfig.Builder(tableType);
// builder.setTableName(tableName);
// TableConfig tableConfig = builder.build();
// Map<String, String> option = new HashMap<>();
// option.put(RoutingConfig.ENABLE_DYNAMIC_COMPUTING_KEY, "true");
// tableConfig.getRoutingConfig().setRoutingTableBuilderOptions(option);
//// tableConfig.getRoutingConfig().setRoutingTableBuilderName("KafkaLowLevel");
// return tableConfig;
// }
+
+}
+
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
index 25600d3..3413711 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/RoutingTableTest.java
@@ -17,6 +17,7 @@ package com.linkedin.pinot.broker.routing;
import com.linkedin.pinot.broker.routing.builder.HighLevelConsumerBasedRoutingTableBuilder;
import com.linkedin.pinot.broker.routing.builder.RoutingTableBuilder;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableConfig.Builder;
import com.linkedin.pinot.common.config.TableNameBuilder;
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
index 7bae71d..46ca40c 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/BalancedRandomRoutingTableBuilderTest.java
@@ -79,7 +79,7 @@ public class BalancedRandomRoutingTableBuilderTest {
// Build routing table
routingTableBuilder.computeOnExternalViewChange("dummy", externalView, instanceConfigList);
RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
- Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request);
+ Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
Set<String> segmentsInRoutingTable = new HashSet<>();
for (List<String> segments : routingTable.values()) {
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
index 74f53c3..c2ef4e4 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/HighLevelConsumerRoutingTableBuilderTest.java
@@ -95,7 +95,7 @@ public class HighLevelConsumerRoutingTableBuilderTest {
// Check if the routing table result is correct
for (int run = 0; run < MAX_NUM_GROUPS * 10; run++) {
RoutingTableLookupRequest request = new RoutingTableLookupRequest(tableNameWithType);
- Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request);
+ Map<String, List<String>> routingTable = routingTableBuilder.getRoutingTable(request, null);
Set<String> coveredSegments = new HashSet<>();
for (List<String> segmentsForServer : routingTable.values()) {
coveredSegments.addAll(segmentsForServer);
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
index c110b6e..5a44a28 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
@@ -15,7 +15,7 @@
*/
package com.linkedin.pinot.broker.routing.builder;
-import com.linkedin.pinot.broker.routing.FakePropertyStore;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
import com.linkedin.pinot.common.config.ReplicaGroupStrategyConfig;
import com.linkedin.pinot.common.config.RoutingConfig;
@@ -50,7 +50,6 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
private static final String PARTITION_COLUMN = "memberId";
private static final Pql2Compiler COMPILER = new Pql2Compiler();
-
private static final Random RANDOM = new Random();
private int NUM_REPLICA;
@@ -114,7 +113,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
// Check the query that requires to scan all segment.
String countStarQuery = "select count(*) from myTable";
Map<String, List<String>> routingTable =
- routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+ routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
// Check that the number of servers picked are always equal or less than the number of servers
// from a single replica group.
@@ -134,7 +133,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
for (int queryPartition = 0; queryPartition < 100; queryPartition++) {
String filterQuery = "select count(*) from myTable where " + PARTITION_COLUMN + " = " + queryPartition;
routingTable =
- routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery));
+ routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery), null);
// Check that the number of servers picked are always equal or less than the number of servers
// in a single replica group.
@@ -218,7 +217,7 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
for (int i = 0; i < 100; i++) {
String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME;
Map<String, List<String>> routingTable =
- routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+ routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
Assert.assertEquals(routingTable.keySet().size(), 1);
servers.add(routingTable.keySet().iterator().next());
}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
index 7f349f8..93f2c9a 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
@@ -15,7 +15,8 @@
*/
package com.linkedin.pinot.broker.routing.builder;
-import com.linkedin.pinot.broker.routing.FakePropertyStore;
+import com.linkedin.pinot.broker.routing.selector.SegmentSelector;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
import com.linkedin.pinot.broker.routing.RoutingTableLookupRequest;
import com.linkedin.pinot.common.config.ColumnPartitionConfig;
import com.linkedin.pinot.common.config.RoutingConfig;
@@ -102,7 +103,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
// Check the query that requires to scan all segment.
String countStarQuery = "select count(*) from myTable";
Map<String, List<String>> routingTable =
- routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+ routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
// Check that all segments are covered exactly for once.
Set<String> assignedSegments = new HashSet<>();
@@ -117,7 +118,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
// Check the broker side server and segment pruning.
for (int queryPartition = 0; queryPartition < 100; queryPartition++) {
String filterQuery = "select count(*) from myTable where " + PARTITION_COLUMN + " = " + queryPartition;
- routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery));
+ routingTable = routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(filterQuery), null);
int partition = queryPartition % NUM_PARTITION;
assignedSegments = new HashSet<>();
@@ -178,7 +179,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
// Check the query that requires to scan all segment.
String countStarQuery = "select count(*) from myTable";
Map<String, List<String>> routingTable =
- routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+ routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
// Check that all segments are covered exactly for once.
Set<String> assignedSegments = new HashSet<>();
@@ -253,7 +254,7 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
for (int i = 0; i < 100; i++) {
String countStarQuery = "select count(*) from " + REALTIME_TABLE_NAME;
Map<String, List<String>> routingTable =
- routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery));
+ routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null);
Assert.assertEquals(routingTable.keySet().size(), 1);
servers.addAll(routingTable.keySet());
}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java
new file mode 100644
index 0000000..738e943
--- /dev/null
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/selector/MergeSegmentSelectorTest.java
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.broker.selector;
+
+import com.fasterxml.uuid.Generators;
+import com.linkedin.pinot.broker.routing.selector.MergedSegmentSelector;
+import com.linkedin.pinot.broker.util.FakePropertyStore;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests that {@link MergedSegmentSelector} reads the segment merge lineage and routes queries to merged segments
+ * instead of the segments they were merged from.
+ */
+public class MergeSegmentSelectorTest {
+  private static final String TEST_TABLE_NAME = "test_OFFLINE";
+
+  @Test
+  public void testMergeSegmentSelector() throws Exception {
+    MergedSegmentSelector segmentSelector = new MergedSegmentSelector();
+
+    TableConfig tableConfig =
+        new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_NAME).build();
+    ZkHelixPropertyStore<ZNRecord> fakePropertyStore = new FakePropertyStore();
+    segmentSelector.init(tableConfig, fakePropertyStore);
+
+    SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(fakePropertyStore, generateFakeMergeLineage(), 0);
+    segmentSelector.computeOnExternalViewChange();
+
+    Set<String> segmentsToQuery = new HashSet<>(
+        Arrays.asList("segment0", "segment1", "segment2", "segment3", "merged_segment0", "merged_segment1"));
+
+    // segment0..segment2 were merged into merged_segment0/merged_segment1, so only the merged segments and the
+    // never-merged segment3 should be queried. Copy into a HashSet so the comparison ignores ordering.
+    Set<String> expectedSegments = new HashSet<>(Arrays.asList("merged_segment0", "merged_segment1", "segment3"));
+    Assert.assertEquals(new HashSet<>(segmentSelector.selectSegments(null, segmentsToQuery)), expectedSegments);
+  }
+
+  /**
+   * Builds a lineage with three single-segment groups (segment0..segment2) that are the children of one merged group
+   * (merged_segment0, merged_segment1), plus a standalone, never-merged group containing segment3.
+   */
+  private SegmentMergeLineage generateFakeMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage(TEST_TABLE_NAME);
+
+    List<String> childrenGroups = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      String groupId = Generators.timeBasedGenerator().generate().toString();
+      segmentMergeLineage.addSegmentGroup(groupId, Arrays.asList("segment" + i), null);
+      childrenGroups.add(groupId);
+    }
+
+    String mergedGroupId = Generators.timeBasedGenerator().generate().toString();
+    segmentMergeLineage.addSegmentGroup(mergedGroupId, Arrays.asList("merged_segment0", "merged_segment1"),
+        childrenGroups);
+
+    String unmergedSegmentGroupId = Generators.timeBasedGenerator().generate().toString();
+    segmentMergeLineage.addSegmentGroup(unmergedSegmentGroupId, Arrays.asList("segment3"), null);
+
+    return segmentMergeLineage;
+  }
+}
diff --git a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java b/pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
similarity index 88%
rename from pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java
rename to pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
index 6bb8322..e18c9e2 100644
--- a/pinot-broker/src/test/java/com/linkedin/pinot/broker/routing/FakePropertyStore.java
+++ b/pinot-broker/src/test/java/com/linkedin/pinot/broker/util/FakePropertyStore.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.linkedin.pinot.broker.routing;
+package com.linkedin.pinot.broker.util;
import java.util.HashMap;
import java.util.Map;
@@ -42,6 +42,17 @@ public class FakePropertyStore extends ZkHelixPropertyStore<ZNRecord> {
}
@Override
+ public boolean set(String path, ZNRecord stat, int expectedVersion, int options) {
+ // Fake store: expectedVersion and options are ignored, so the record is written unconditionally.
+ try {
+ setContents(path, stat);
+ return true;
+ } catch (Exception e) {
+ // Failures from setContents are reported through the boolean result instead of being rethrown.
+ return false;
+ }
+ }
+
+
+ @Override
public boolean set(String path, ZNRecord stat, int options) {
try {
setContents(path, stat);
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 95e4fc8..6f59e95 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -228,6 +228,10 @@
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
+ <groupId>com.fasterxml.uuid</groupId>
+ <artifactId>java-uuid-generator</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java
new file mode 100644
index 0000000..d9da264
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/MultiLevelRollupSetting.java
@@ -0,0 +1,84 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+
+/**
+ * Settings for one level of a multi-level rollup (see {@link RollupConfig#getMultiLevelRollupSettings()}).
+ *
+ * <p>Fields are private and exposed only through getters/setters. As public fields, Jackson would auto-detect each
+ * of them as an extra property under its raw name (e.g. {@code _timeInputFormat}) next to the getter-based one
+ * ({@code timeInputFormat}), so every value would be written twice when the table config is serialized.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MultiLevelRollupSetting {
+  @ConfigKey("timeInputFormat")
+  private String _timeInputFormat;
+
+  @ConfigKey("timeOutputFormat")
+  private String _timeOutputFormat;
+
+  @ConfigKey("timeOutputGranularity")
+  private String _timeOutputGranularity;
+
+  @ConfigKey("minNumSegments")
+  private int _minNumSegments;
+
+  @ConfigKey("minNumTotalDocs")
+  private int _minNumTotalDocs;
+
+  public String getTimeInputFormat() {
+    return _timeInputFormat;
+  }
+
+  public void setTimeInputFormat(String timeInputFormat) {
+    _timeInputFormat = timeInputFormat;
+  }
+
+  public String getTimeOutputFormat() {
+    return _timeOutputFormat;
+  }
+
+  public void setTimeOutputFormat(String timeOutputFormat) {
+    _timeOutputFormat = timeOutputFormat;
+  }
+
+  public String getTimeOutputGranularity() {
+    return _timeOutputGranularity;
+  }
+
+  public void setTimeOutputGranularity(String timeOutputGranularity) {
+    _timeOutputGranularity = timeOutputGranularity;
+  }
+
+  public int getMinNumSegments() {
+    return _minNumSegments;
+  }
+
+  public void setMinNumSegments(int minNumSegments) {
+    _minNumSegments = minNumSegments;
+  }
+
+  public int getMinNumTotalDocs() {
+    return _minNumTotalDocs;
+  }
+
+  public void setMinNumTotalDocs(int minNumTotalDocs) {
+    _minNumTotalDocs = minNumTotalDocs;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java
new file mode 100644
index 0000000..c86099f
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/RollupConfig.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import java.util.List;
+import java.util.Map;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+
+/**
+ * Rollup settings referenced from {@link SegmentMergeConfig}: the rollup type, the pre-aggregation types and an
+ * optional list of {@link MultiLevelRollupSetting}s.
+ *
+ * <p>Fields are private and {@code _}-prefixed like the other config classes; Jackson serializes this class through
+ * its public getters/setters, so the JSON property names are unchanged.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RollupConfig {
+
+  @ConfigKey("rollupType")
+  private String _rollupType;
+
+  // NOTE(review): keys are presumably column names and values aggregation function names -- confirm.
+  @ConfigKey("preAggregateType")
+  @UseChildKeyHandler(SimpleMapChildKeyHandler.class)
+  private Map<String, String> _preAggregateType;
+
+  @ConfigKey("multiLevelRollupSettings")
+  private List<MultiLevelRollupSetting> _multiLevelRollupSettings;
+
+  public String getRollupType() {
+    return _rollupType;
+  }
+
+  public void setRollupType(String rollupType) {
+    _rollupType = rollupType;
+  }
+
+  public Map<String, String> getPreAggregateType() {
+    return _preAggregateType;
+  }
+
+  public void setPreAggregateType(Map<String, String> preAggregateType) {
+    _preAggregateType = preAggregateType;
+  }
+
+  public List<MultiLevelRollupSetting> getMultiLevelRollupSettings() {
+    return _multiLevelRollupSettings;
+  }
+
+  public void setMultiLevelRollupSettings(List<MultiLevelRollupSetting> multiLevelRollupSettings) {
+    _multiLevelRollupSettings = multiLevelRollupSettings;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java
new file mode 100644
index 0000000..d96952f
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/SegmentMergeConfig.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.config;
+
+import javax.annotation.Nonnull;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Table-level segment merge configuration: merge type and strategy, optional {@link RollupConfig}, the
+ * {@code minNumSegments} / {@code minNumTotalDocs} thresholds and whether old segments are cleaned up.
+ *
+ * <p>{@link TableConfig} stores this class through Jackson; {@link #toJSON()} produces an org.json view that uses
+ * the same keys.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SegmentMergeConfig {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeConfig.class);
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  // Config/JSON keys, shared by the @ConfigKey annotations and toJSON() so that the two cannot drift apart.
+  private static final String MERGE_TYPE = "mergeType";
+  private static final String MERGE_STRATEGY = "mergeStrategy";
+  private static final String MIN_NUM_SEGMENTS = "minNumSegments";
+  private static final String MIN_NUM_TOTAL_DOCS = "minNumTotalDocs";
+  private static final String ROLLUP_CONFIG = "rollupConfig";
+  private static final String OLD_SEGMENT_CLEANUP = "oldSegmentCleanup";
+
+  @ConfigKey(value = MERGE_TYPE)
+  private String _mergeType;
+
+  @ConfigKey(value = MERGE_STRATEGY)
+  private String _mergeStrategy;
+
+  @ConfigKey(value = ROLLUP_CONFIG)
+  private RollupConfig _rollupConfig;
+
+  @ConfigKey(value = MIN_NUM_SEGMENTS)
+  private int _minNumSegments;
+
+  @ConfigKey(value = MIN_NUM_TOTAL_DOCS)
+  private long _minNumTotalDocs;
+
+  @ConfigKey(value = OLD_SEGMENT_CLEANUP)
+  private boolean _oldSegmentCleanup;
+
+  public String getMergeType() {
+    return _mergeType;
+  }
+
+  public void setMergeType(String mergeType) {
+    _mergeType = mergeType;
+  }
+
+  public String getMergeStrategy() {
+    return _mergeStrategy;
+  }
+
+  public void setMergeStrategy(String mergeStrategy) {
+    _mergeStrategy = mergeStrategy;
+  }
+
+  public int getMinNumSegments() {
+    return _minNumSegments;
+  }
+
+  public void setMinNumSegments(int minNumSegments) {
+    _minNumSegments = minNumSegments;
+  }
+
+  public long getMinNumTotalDocs() {
+    return _minNumTotalDocs;
+  }
+
+  public void setMinNumTotalDocs(long minNumTotalDocs) {
+    _minNumTotalDocs = minNumTotalDocs;
+  }
+
+  public boolean getOldSegmentCleanup() {
+    return _oldSegmentCleanup;
+  }
+
+  public void setOldSegmentCleanup(boolean oldSegmentCleanup) {
+    _oldSegmentCleanup = oldSegmentCleanup;
+  }
+
+  public RollupConfig getRollupConfig() {
+    return _rollupConfig;
+  }
+
+  public void setRollupConfig(RollupConfig rollupConfig) {
+    _rollupConfig = rollupConfig;
+  }
+
+  /**
+   * Converts this config to an org.json {@link JSONObject}.
+   *
+   * <p>Best effort: if a conversion step throws, the error is logged and the keys added so far are returned. The
+   * {@code rollupConfig} key is omitted when no rollup config is set.
+   *
+   * @return JSON representation of this config; never {@code null}
+   */
+  @Nonnull
+  public JSONObject toJSON() {
+    JSONObject mergeRollupConfigJsonObject = new JSONObject();
+    try {
+      mergeRollupConfigJsonObject.put(MERGE_TYPE, _mergeType);
+      mergeRollupConfigJsonObject.put(MERGE_STRATEGY, _mergeStrategy);
+      mergeRollupConfigJsonObject.put(MIN_NUM_SEGMENTS, _minNumSegments);
+      mergeRollupConfigJsonObject.put(MIN_NUM_TOTAL_DOCS, _minNumTotalDocs);
+      mergeRollupConfigJsonObject.put(OLD_SEGMENT_CLEANUP, _oldSegmentCleanup);
+      if (_rollupConfig != null) {
+        // JSONObject.put() would keep the POJO as-is and render it as its quoted toString(); serialize it through
+        // Jackson so it becomes a nested JSON object. Done last so a failure here does not drop the scalar keys.
+        mergeRollupConfigJsonObject.put(ROLLUP_CONFIG,
+            new JSONObject(OBJECT_MAPPER.writeValueAsString(_rollupConfig)));
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to convert segment merge config to json", e);
+    }
+    return mergeRollupConfigJsonObject;
+  }
+
+  /**
+   * Returns {@link #toJSON()} pretty-printed with an indent of 2, or the exception's string form if printing fails.
+   */
+  public String toJSONString() {
+    try {
+      return toJSON().toString(2);
+    } catch (Exception e) {
+      return e.toString();
+    }
+  }
+
+  /**
+   * Debug entry point that prints a sample config. NOTE(review): consider moving this into a unit test.
+   */
+  public static void main(String[] args) {
+    SegmentMergeConfig config = new SegmentMergeConfig();
+    config.setMergeType("CONCATENATE");
+    System.out.println(config.toJSONString());
+    System.out.println(config.toJSON());
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
index 960c7f0..b4fa08a 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/config/TableConfig.java
@@ -47,6 +47,7 @@ public class TableConfig {
private static final String QUOTA_CONFIG_KEY = "quota";
private static final String TASK_CONFIG_KEY = "task";
private static final String ROUTING_CONFIG_KEY = "routing";
+ private static final String SEGMENT_MERGE_CONFIG_KEY = "segmentMergeConfig";
@ConfigKey("name")
@ConfigDoc(value = "The name for the table.", mandatory = true, exampleValue = "myTable")
@@ -78,6 +79,9 @@ public class TableConfig {
@NestedConfig
private RoutingConfig _routingConfig;
+ @NestedConfig
+ private SegmentMergeConfig _segmentMergeConfig;
+
public TableConfig() {
// TODO: currently these 2 fields are annotated as non-null. Revisit to see whether that's necessary
_tenantConfig = new TenantConfig();
@@ -87,7 +91,8 @@ public class TableConfig {
private TableConfig(@Nonnull String tableName, @Nonnull TableType tableType,
@Nonnull SegmentsValidationAndRetentionConfig validationConfig, @Nonnull TenantConfig tenantConfig,
@Nonnull IndexingConfig indexingConfig, @Nonnull TableCustomConfig customConfig,
- @Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig) {
+ @Nullable QuotaConfig quotaConfig, @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig routingConfig,
+ @Nullable SegmentMergeConfig segmentMergeConfig) {
_tableName = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
_tableType = tableType;
_validationConfig = validationConfig;
@@ -97,19 +102,18 @@ public class TableConfig {
_quotaConfig = quotaConfig;
_taskConfig = taskConfig;
_routingConfig = routingConfig;
+ _segmentMergeConfig = segmentMergeConfig;
}
// For backward compatible
@Deprecated
@Nonnull
- public static TableConfig init(@Nonnull String jsonConfigString)
- throws IOException, JSONException {
+ public static TableConfig init(@Nonnull String jsonConfigString) throws IOException, JSONException {
return fromJSONConfig(new JSONObject(jsonConfigString));
}
@Nonnull
- public static TableConfig fromJSONConfig(@Nonnull JSONObject jsonConfig)
- throws IOException, JSONException {
+ public static TableConfig fromJSONConfig(@Nonnull JSONObject jsonConfig) throws IOException, JSONException {
TableType tableType = TableType.valueOf(jsonConfig.getString(TABLE_TYPE_KEY).toUpperCase());
String tableName = TableNameBuilder.forType(tableType).tableNameWithType(jsonConfig.getString(TABLE_NAME_KEY));
SegmentsValidationAndRetentionConfig validationConfig =
@@ -136,13 +140,17 @@ public class TableConfig {
OBJECT_MAPPER.readValue(jsonConfig.getJSONObject(ROUTING_CONFIG_KEY).toString(), RoutingConfig.class);
}
+ SegmentMergeConfig segmentMergeConfig = null;
+ if (jsonConfig.has(SEGMENT_MERGE_CONFIG_KEY)) {
+ segmentMergeConfig = OBJECT_MAPPER.readValue(jsonConfig.getJSONObject(SEGMENT_MERGE_CONFIG_KEY).toString(), SegmentMergeConfig.class);
+ }
+
return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
- quotaConfig, taskConfig, routingConfig);
+ quotaConfig, taskConfig, routingConfig, segmentMergeConfig);
}
@Nonnull
- public static JSONObject toJSONConfig(@Nonnull TableConfig tableConfig)
- throws IOException, JSONException {
+ public static JSONObject toJSONConfig(@Nonnull TableConfig tableConfig) throws IOException, JSONException {
JSONObject jsonConfig = new JSONObject();
jsonConfig.put(TABLE_NAME_KEY, tableConfig._tableName);
jsonConfig.put(TABLE_TYPE_KEY, tableConfig._tableType.toString());
@@ -160,12 +168,14 @@ public class TableConfig {
if (tableConfig._routingConfig != null) {
jsonConfig.put(ROUTING_CONFIG_KEY, new JSONObject(OBJECT_MAPPER.writeValueAsString(tableConfig._routingConfig)));
}
+ if (tableConfig._segmentMergeConfig != null) {
+ jsonConfig.put(SEGMENT_MERGE_CONFIG_KEY, new JSONObject(OBJECT_MAPPER.writeValueAsString(tableConfig._segmentMergeConfig)));
+ }
return jsonConfig;
}
@Nonnull
- public static TableConfig fromZnRecord(@Nonnull ZNRecord znRecord)
- throws IOException, JSONException {
+ public static TableConfig fromZnRecord(@Nonnull ZNRecord znRecord) throws IOException, JSONException {
Map<String, String> simpleFields = znRecord.getSimpleFields();
TableType tableType = TableType.valueOf(simpleFields.get(TABLE_TYPE_KEY).toUpperCase());
String tableName = TableNameBuilder.forType(tableType).tableNameWithType(simpleFields.get(TABLE_NAME_KEY));
@@ -194,8 +204,14 @@ public class TableConfig {
routingConfig = OBJECT_MAPPER.readValue(routingConfigString, RoutingConfig.class);
}
+ String segmentMergeConfigString = simpleFields.get(SEGMENT_MERGE_CONFIG_KEY);
+ SegmentMergeConfig segmentMergeConfig = null;
+ if (segmentMergeConfigString != null) {
+ segmentMergeConfig = OBJECT_MAPPER.readValue(segmentMergeConfigString, SegmentMergeConfig.class);
+ }
+
return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
- quotaConfig, taskConfig, routingConfig);
+ quotaConfig, taskConfig, routingConfig, segmentMergeConfig);
}
@Nonnull
@@ -218,6 +234,9 @@ public class TableConfig {
if (tableConfig._routingConfig != null) {
simpleFields.put(ROUTING_CONFIG_KEY, OBJECT_MAPPER.writeValueAsString(tableConfig._routingConfig));
}
+ if (tableConfig._segmentMergeConfig != null) {
+ simpleFields.put(SEGMENT_MERGE_CONFIG_KEY, OBJECT_MAPPER.writeValueAsString(tableConfig._segmentMergeConfig));
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -305,9 +324,17 @@ public class TableConfig {
_routingConfig = routingConfig;
}
+ /**
+ * Returns the segment merge config, or {@code null} if none is configured (a {@code null} config is omitted when
+ * this table config is serialized to JSON or to a ZNRecord).
+ */
+ @Nullable
+ public SegmentMergeConfig getSegmentMergeConfig() {
+ return _segmentMergeConfig;
+ }
+
+ /**
+ * Sets the segment merge config; may be {@code null}.
+ * NOTE(review): equals() does not compare this field yet -- confirm whether it (and hashCode()) should.
+ */
+ public void setSegmentMergeConfig(SegmentMergeConfig segmentMergeConfig) {
+ _segmentMergeConfig = segmentMergeConfig;
+ }
+
@Nonnull
- public String toJSONConfigString()
- throws IOException, JSONException {
+ public String toJSONConfigString() throws IOException, JSONException {
return toJSONConfig(this).toString();
}
@@ -332,18 +359,14 @@ public class TableConfig {
TableConfig that = (TableConfig) o;
- return EqualityUtils.isEqual(_tableName, that._tableName) &&
- EqualityUtils.isEqual(_tableType, that._tableType) &&
- EqualityUtils.isEqual(_validationConfig, that._validationConfig) &&
- EqualityUtils.isEqual(_tenantConfig, that._tenantConfig) &&
- EqualityUtils.isEqual(_indexingConfig, that._indexingConfig) &&
- EqualityUtils.isEqual(_customConfig, that._customConfig) &&
- EqualityUtils.isEqual(_quotaConfig, that._quotaConfig) &&
- EqualityUtils.isEqual(_taskConfig, that._taskConfig) &&
- EqualityUtils.isEqual(_routingConfig, that._routingConfig);
+ return EqualityUtils.isEqual(_tableName, that._tableName) && EqualityUtils.isEqual(_tableType, that._tableType)
+ && EqualityUtils.isEqual(_validationConfig, that._validationConfig) && EqualityUtils.isEqual(_tenantConfig,
+ that._tenantConfig) && EqualityUtils.isEqual(_indexingConfig, that._indexingConfig) && EqualityUtils.isEqual(
+ _customConfig, that._customConfig) && EqualityUtils.isEqual(_quotaConfig, that._quotaConfig)
+ && EqualityUtils.isEqual(_taskConfig, that._taskConfig) && EqualityUtils.isEqual(_routingConfig,
+ that._routingConfig);
}
-
@Override
public int hashCode() {
int result = EqualityUtils.hashCodeOf(_tableName);
@@ -401,9 +424,11 @@ public class TableConfig {
private QuotaConfig _quotaConfig;
private TableTaskConfig _taskConfig;
private RoutingConfig _routingConfig;
+ private SegmentMergeConfig _segmentMergeConfig;
private HllConfig _hllConfig;
private StarTreeIndexSpec _starTreeIndexSpec;
+
public Builder(TableType tableType) {
_tableType = tableType;
}
@@ -549,8 +574,15 @@ public class TableConfig {
return this;
}
- public TableConfig build()
- throws IOException, JSONException {
+    /**
+     * Returns the segment merge config that {@link #build()} will pass to the table config, or {@code null}.
+     */
+    public SegmentMergeConfig getSegmentMergeConfig() {
+      return _segmentMergeConfig;
+    }
+
+    /**
+     * Sets the segment merge config for the table being built.
+     *
+     * @param segmentMergeConfig segment merge config, or {@code null} for none
+     * @return this builder, so the call can be chained like the other builder setters
+     */
+    public Builder setSegmentMergeConfig(SegmentMergeConfig segmentMergeConfig) {
+      _segmentMergeConfig = segmentMergeConfig;
+      return this;
+    }
+
+ public TableConfig build() throws IOException, JSONException {
// Validation config
SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
validationConfig.setTimeColumnName(_timeColumnName);
@@ -598,7 +630,7 @@ public class TableConfig {
}
return new TableConfig(_tableName, _tableType, validationConfig, tenantConfig, indexingConfig, _customConfig,
- _quotaConfig, _taskConfig, _routingConfig);
+ _quotaConfig, _taskConfig, _routingConfig, _segmentMergeConfig);
}
}
}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
new file mode 100644
index 0000000..fb28ac1
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * A node of the segment merge lineage tree.
+ *
+ * A node holds a group id, the group level, the set of segment names in the group, a link to its parent group and
+ * the list of its children groups. {@code SegmentMergeLineage#getMergeLineageRootSegmentGroup()} links nodes so that
+ * a parent is the group that was merged from its children. All properties are plain mutable bean properties and are
+ * {@code null} (or 0 for the level) until set.
+ */
+public class SegmentGroup {
+
+  private String _groupId;
+  private int _groupLevel;
+  private Set<String> _segments;
+  private SegmentGroup _parentGroup;
+  private List<SegmentGroup> _childrenGroups;
+
+  public String getGroupId() {
+    return this._groupId;
+  }
+
+  public void setGroupId(String groupId) {
+    this._groupId = groupId;
+  }
+
+  public int getGroupLevel() {
+    return this._groupLevel;
+  }
+
+  public void setGroupLevel(int groupLevel) {
+    this._groupLevel = groupLevel;
+  }
+
+  public Set<String> getSegments() {
+    return this._segments;
+  }
+
+  public void setSegments(Set<String> segments) {
+    this._segments = segments;
+  }
+
+  public SegmentGroup getParentGroup() {
+    return this._parentGroup;
+  }
+
+  public void setParentGroup(SegmentGroup parentGroup) {
+    this._parentGroup = parentGroup;
+  }
+
+  public List<SegmentGroup> getChildrenGroups() {
+    return this._childrenGroups;
+  }
+
+  public void setChildrenGroups(List<SegmentGroup> childrenGroups) {
+    this._childrenGroups = childrenGroups;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
new file mode 100644
index 0000000..f4a5bdf
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
@@ -0,0 +1,333 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import com.linkedin.pinot.common.utils.EqualityUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to represent segment merge lineage information.
+ *
+ * Segment merge lineage information is serialized into a znode and stored in a helix property store (zookeeper). This
+ * information will be used by the broker, segment merge task generator, and retention manager.
+ *
+ * For each segment group, we are storing the following information:
+ * 1. group id
+ *  - group identifier (will be stored in time based uuid format)
+ * 2. group level
+ *  - group level allows us to have a hierarchical representation of the segment lineage. When we assign the merge
+ *    task, we will only merge/roll-up segments with the same level.
+ *    (e.g. If hourly segment groups are in level 0, daily segment groups will belong to level 1)
+ * 3. segments
+ *  - segments that belong to a particular segment group
+ * 4. lineage information
+ *  - If a segment group is created by merging multiple children segment groups, we write the lineage information
+ *    (e.g. segment group C is merged from segment group A, B)
+ *
+ * ZNRecord layout (see {@link #toZNRecord()}): the lineage (parent group id -> children group ids) is stored in the
+ * list fields; the segments of each group are stored in the map fields under the key "level_<groupLevel>", as a
+ * comma-joined string per group id. Segment names therefore must not contain ','.
+ */
+public class SegmentMergeLineage {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeLineage.class);
+
+  private static final String LEVEL_KEY_PREFIX = "level_";
+  private static final String ROOT_NODE_GROUP_ID = "root";
+  private static final String SEGMENT_DELIMITER = ",";
+  private static final int DEFAULT_GROUP_LEVEL = 0;
+
+  private final String _tableNameWithType;
+
+  // Mapping of parent group id to the ids of the children groups it was merged from
+  private final Map<String, List<String>> _parentGroupToChildrenGroupsMap;
+
+  // Mapping of group level -> group id -> segments that belong to the group.
+  // Groups without children live in level 0; a group merged from level-N groups lives in level N + 1.
+  private final Map<Integer, Map<String, List<String>>> _levelToGroupToSegmentsMap;
+
+  public SegmentMergeLineage(String tableNameWithType) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = new HashMap<>();
+    _levelToGroupToSegmentsMap = new HashMap<>();
+  }
+
+  public SegmentMergeLineage(String tableNameWithType, Map<String, List<String>> segmentGroupLineageMap,
+      Map<Integer, Map<String, List<String>>> levelToGroupToSegmentMap) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = segmentGroupLineageMap;
+    _levelToGroupToSegmentsMap = levelToGroupToSegmentMap;
+  }
+
+  public String getTableName() {
+    return _tableNameWithType;
+  }
+
+  /**
+   * Deserialize a segment merge lineage from its ZNRecord representation (inverse of {@link #toZNRecord()}).
+   *
+   * The returned lineage holds its own copies of the record's data, so later updates to either object do not
+   * affect the other.
+   *
+   * @param record ZNRecord whose id is the table name with type
+   * @return the deserialized segment merge lineage
+   */
+  public static SegmentMergeLineage fromZNRecord(ZNRecord record) {
+    String tableNameWithType = record.getId();
+
+    Map<String, List<String>> segmentGroupLineageMap = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : record.getListFields().entrySet()) {
+      List<String> childrenGroups = entry.getValue();
+      segmentGroupLineageMap.put(entry.getKey(),
+          childrenGroups == null ? new ArrayList<>() : new ArrayList<>(childrenGroups));
+    }
+
+    Map<Integer, Map<String, List<String>>> levelToGroupToSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : record.getMapFields().entrySet()) {
+      int level = Integer.parseInt(entry.getKey().substring(LEVEL_KEY_PREFIX.length()));
+      Map<String, List<String>> groupToSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, String> groupEntry : entry.getValue().entrySet()) {
+        groupToSegmentsForLevel.put(groupEntry.getKey(), splitSegments(groupEntry.getValue()));
+      }
+      levelToGroupToSegmentsMap.put(level, groupToSegmentsForLevel);
+    }
+    return new SegmentMergeLineage(tableNameWithType, segmentGroupLineageMap, levelToGroupToSegmentsMap);
+  }
+
+  /**
+   * Serialize this segment merge lineage into a ZNRecord (inverse of {@link #fromZNRecord(ZNRecord)}).
+   *
+   * The record receives copies of the internal maps, so updating this lineage afterwards does not change the record.
+   *
+   * @return a ZNRecord whose id is the table name with type
+   */
+  public ZNRecord toZNRecord() {
+    ZNRecord record = new ZNRecord(_tableNameWithType);
+
+    Map<String, List<String>> lineageMap = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : _parentGroupToChildrenGroupsMap.entrySet()) {
+      lineageMap.put(entry.getKey(), new ArrayList<>(entry.getValue()));
+    }
+    record.setListFields(lineageMap);
+
+    Map<String, Map<String, String>> groupToSegmentsMap = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      String key = LEVEL_KEY_PREFIX + entry.getKey();
+      Map<String, String> groupSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, List<String>> groupEntry : entry.getValue().entrySet()) {
+        groupSegmentsForLevel.put(groupEntry.getKey(), String.join(SEGMENT_DELIMITER, groupEntry.getValue()));
+      }
+      groupToSegmentsMap.put(key, groupSegmentsForLevel);
+    }
+    record.setMapFields(groupToSegmentsMap);
+
+    return record;
+  }
+
+  /**
+   * Add segment merge lineage information
+   *
+   * All validation happens before any state is modified, so a rejected call leaves the lineage unchanged.
+   *
+   * @param groupId a group id
+   * @param currentGroupSegments a list of segments that belongs to the group
+   * @param childrenGroups a list of children groups that the current group covers (null or empty for a base group)
+   * @throws InvalidConfigException if the group id already exists at any level, if a segment name contains the
+   *         serialization delimiter ',', or if the children groups do not all exist within a single level
+   */
+  public void addSegmentGroup(String groupId, List<String> currentGroupSegments, List<String> childrenGroups)
+      throws InvalidConfigException {
+    if (containsGroup(groupId)) {
+      throw new InvalidConfigException("Group id : " + groupId + " already exists.");
+    }
+    // Segments are persisted as a comma-joined string; a ',' inside a name would corrupt the stored lineage
+    for (String segment : currentGroupSegments) {
+      if (segment.contains(SEGMENT_DELIMITER)) {
+        throw new InvalidConfigException(
+            "Segment name : " + segment + " contains the reserved delimiter '" + SEGMENT_DELIMITER + "'.");
+      }
+    }
+    int groupLevel = getGroupLevel(childrenGroups);
+
+    // Update group to segments map
+    _levelToGroupToSegmentsMap.computeIfAbsent(groupLevel, k -> new HashMap<>())
+        .put(groupId, new ArrayList<>(currentGroupSegments));
+
+    // Update segment group lineage map (only merged groups have children)
+    if (groupLevel > DEFAULT_GROUP_LEVEL) {
+      _parentGroupToChildrenGroupsMap.put(groupId, new ArrayList<>(childrenGroups));
+    }
+
+    LOGGER.info("New group has been added successfully to the segment lineage. (groupId: {}, currentGroupSegments: {}, "
+        + "childrenGroups: {})", groupId, currentGroupSegments, childrenGroups);
+  }
+
+  /**
+   * Remove segment merge information given a group id
+   *
+   * The group is removed from the lineage map (both as a parent and from every children list) and from the segments
+   * map of every level.
+   *
+   * @param groupId a group id
+   */
+  public void removeSegmentGroup(String groupId) {
+    // Clean up the group id from parent to children group mapping
+    _parentGroupToChildrenGroupsMap.remove(groupId);
+    for (List<String> childrenGroups : _parentGroupToChildrenGroupsMap.values()) {
+      childrenGroups.remove(groupId);
+    }
+
+    // Clean up the group id from group to segments mapping
+    for (Map<String, List<String>> groupToSegments : _levelToGroupToSegmentsMap.values()) {
+      groupToSegments.remove(groupId);
+    }
+
+    LOGGER.info("Group {} has been successfully removed.", groupId);
+  }
+
+  /**
+   * Construct a lineage tree and returns the root node
+   *
+   * Every group that has no parent becomes a child of a synthetic root node whose group id is "root".
+   *
+   * @return a root node for lineage tree
+   */
+  public SegmentGroup getMergeLineageRootSegmentGroup() {
+    // Create one node per group
+    Map<String, SegmentGroup> groupNodes = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, List<String>>> groupEntryForLevel : _levelToGroupToSegmentsMap.entrySet()) {
+      int level = groupEntryForLevel.getKey();
+      for (Map.Entry<String, List<String>> entry : groupEntryForLevel.getValue().entrySet()) {
+        SegmentGroup groupNode = new SegmentGroup();
+        groupNode.setGroupId(entry.getKey());
+        groupNode.setSegments(new HashSet<>(entry.getValue()));
+        groupNode.setGroupLevel(level);
+        groupNodes.put(entry.getKey(), groupNode);
+      }
+    }
+
+    // Add edges by updating children & parent group information
+    for (Map.Entry<String, List<String>> lineageEntry : _parentGroupToChildrenGroupsMap.entrySet()) {
+      String parentGroupId = lineageEntry.getKey();
+      SegmentGroup parentNode = groupNodes.get(parentGroupId);
+      if (parentNode == null) {
+        // The lineage map and the segments map can be supplied independently (public constructor / znode), so a
+        // parent without a segments entry is possible; skip it instead of failing with a NullPointerException
+        LOGGER.warn("Skipping lineage entry for group {} which has no segments entry (table: {})", parentGroupId,
+            _tableNameWithType);
+        continue;
+      }
+      List<SegmentGroup> childrenGroups = new ArrayList<>();
+      for (String childGroupId : lineageEntry.getValue()) {
+        SegmentGroup childNode = groupNodes.get(childGroupId);
+        if (childNode != null) {
+          childrenGroups.add(childNode);
+          childNode.setParentGroup(parentNode);
+        }
+      }
+      parentNode.setChildrenGroups(childrenGroups);
+    }
+
+    // Create a root node that adopts every group without a parent
+    SegmentGroup root = new SegmentGroup();
+    root.setGroupId(ROOT_NODE_GROUP_ID);
+    List<SegmentGroup> childrenForRoot = new ArrayList<>();
+    for (SegmentGroup group : groupNodes.values()) {
+      if (group.getParentGroup() == null) {
+        group.setParentGroup(root);
+        childrenForRoot.add(group);
+      }
+    }
+    root.setChildrenGroups(childrenForRoot);
+
+    return root;
+  }
+
+  /**
+   * Get a list of segments for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of segments that belongs to the given group id, null if the group does not exist
+   */
+  public List<String> getSegmentsForGroup(String groupId) {
+    for (Map<String, List<String>> groupToSegmentMap : _levelToGroupToSegmentsMap.values()) {
+      List<String> segments = groupToSegmentMap.get(groupId);
+      if (segments != null) {
+        return segments;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get a list of children group ids for a given group id
+   *
+   * @param groupId a group id
+   * @return a list of children groups that are covered by the given group id, null if the group does not exist
+   */
+  public List<String> getChildrenForGroup(String groupId) {
+    return _parentGroupToChildrenGroupsMap.get(groupId);
+  }
+
+  /**
+   * Get a list of all group levels
+   *
+   * @return a list of all group levels, sorted in ascending order
+   */
+  public List<Integer> getAllGroupLevels() {
+    List<Integer> groupLevels = new ArrayList<>(_levelToGroupToSegmentsMap.keySet());
+    Collections.sort(groupLevels);
+    return groupLevels;
+  }
+
+  /**
+   * Get a list of group ids for a given group level
+   *
+   * @param groupLevel a group level
+   * @return a list of group ids that belongs to the given group level, null if the group level does not exist
+   */
+  public List<String> getGroupIdsForGroupLevel(int groupLevel) {
+    Map<String, List<String>> groupToSegmentsMap = _levelToGroupToSegmentsMap.get(groupLevel);
+    if (groupToSegmentsMap != null) {
+      return new ArrayList<>(groupToSegmentsMap.keySet());
+    }
+    return null;
+  }
+
+  /**
+   * Helper function to check whether a group id is already known, at any level or in the lineage map
+   *
+   * @param groupId a group id
+   * @return true if the group id is already used
+   */
+  private boolean containsGroup(String groupId) {
+    if (_parentGroupToChildrenGroupsMap.containsKey(groupId)) {
+      return true;
+    }
+    for (Map<String, List<String>> groupToSegments : _levelToGroupToSegmentsMap.values()) {
+      if (groupToSegments.containsKey(groupId)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Helper function to split a serialized (comma-joined) segment list; an empty string yields an empty list
+   *
+   * @param segmentsString comma-joined segment names, possibly empty or null
+   * @return a mutable list of segment names
+   */
+  private static List<String> splitSegments(String segmentsString) {
+    List<String> segments = new ArrayList<>();
+    if (segmentsString != null && !segmentsString.isEmpty()) {
+      segments.addAll(Arrays.asList(segmentsString.split(SEGMENT_DELIMITER)));
+    }
+    return segments;
+  }
+
+  /**
+   * Helper function to compute group level given children groups
+   *
+   * @param childrenGroups a list of children group ids
+   * @return group level: the base level when there are no children, otherwise one above the level that contains
+   *         all children
+   * @throws InvalidConfigException if no single level contains all the children groups
+   */
+  private int getGroupLevel(List<String> childrenGroups) throws InvalidConfigException {
+    // If no children exists, the group belongs to the base level.
+    if (childrenGroups == null || childrenGroups.isEmpty()) {
+      return DEFAULT_GROUP_LEVEL;
+    }
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      if (entry.getValue().keySet().containsAll(childrenGroups)) {
+        return entry.getKey() + 1;
+      }
+    }
+
+    // At this point, not all children groups are covered, cannot add group
+    throw new InvalidConfigException("Cannot compute group level because not all children groups exist "
+        + "in the segment merge lineage, children groups: " + childrenGroups);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    SegmentMergeLineage that = (SegmentMergeLineage) o;
+
+    return EqualityUtils.isEqual(_tableNameWithType, that._tableNameWithType) && EqualityUtils.isEqual(
+        _parentGroupToChildrenGroupsMap, that._parentGroupToChildrenGroupsMap) && EqualityUtils.isEqual(
+        _levelToGroupToSegmentsMap, that._levelToGroupToSegmentsMap);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_tableNameWithType);
+    result = EqualityUtils.hashCodeOf(result, _parentGroupToChildrenGroupsMap);
+    result = EqualityUtils.hashCodeOf(result, _levelToGroupToSegmentsMap);
+    return result;
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
new file mode 100644
index 0000000..00eb7c3
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.common.utils.retry.RetryPolicy;
+import java.util.List;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Class to help to read, write segment merge lineage
+ */
+public class SegmentMergeLineageAccessHelper {
+
+  /**
+   * Read the segment merge lineage ZNRecord from the property store
+   *
+   * The znode version is copied into the returned record so that it can be passed back as the expected version of
+   * {@link #writeSegmentMergeLineage(ZkHelixPropertyStore, SegmentMergeLineage, int)}.
+   *
+   * @param propertyStore a property store
+   * @param tableNameWithType a table name with type
+   * @return a ZNRecord of segment merge lineage, null if no lineage is stored for the table
+   */
+  public static ZNRecord getSegmentMergeLineageZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    Stat stat = new Stat();
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+    if (segmentMergeLineageZNRecord != null) {
+      segmentMergeLineageZNRecord.setVersion(stat.getVersion());
+    }
+    return segmentMergeLineageZNRecord;
+  }
+
+  /**
+   * Read the segment merge lineage from the property store
+   *
+   * @param propertyStore a property store
+   * @param tableNameWithType a table name with type
+   * @return a segment merge lineage, null if no lineage is stored for the table
+   */
+  public static SegmentMergeLineage getSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    ZNRecord segmentMergeLineageZNRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
+    // A missing znode yields a null record; fromZNRecord would fail with a NullPointerException on it
+    if (segmentMergeLineageZNRecord == null) {
+      return null;
+    }
+    return SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+  }
+
+  /**
+   * Write the segment merge lineage to the property store
+   *
+   * @param propertyStore a property store
+   * @param segmentMergeLineage a segment merge lineage
+   * @param expectedVersion znode version the write is conditioned on (e.g. the version of the record returned by
+   *        {@link #getSegmentMergeLineageZNRecord(ZkHelixPropertyStore, String)}); passed through to the property
+   *        store's versioned set
+   * @return true if update is successful. false otherwise.
+   */
+  public static boolean writeSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      SegmentMergeLineage segmentMergeLineage, int expectedVersion) {
+    String tableNameWithType = segmentMergeLineage.getTableName();
+    String path = ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType);
+    return propertyStore.set(path, segmentMergeLineage.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
+  }
+}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
index fe55a0c..e9d6b63 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
@@ -53,6 +53,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE";
private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
+ private static final String PROPERTYSTORE_SEGMENT_MERGE_LINEAGE = "/SEGMENT_MERGE_LINEAGE";
public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
ZNRecord znRecord) {
@@ -106,6 +107,10 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX, controllerConfigKey);
}
+  /**
+   * Builds the property store path of the segment merge lineage znode for a table, by joining the
+   * "/SEGMENT_MERGE_LINEAGE" prefix and the table name with "/".
+   *
+   * @param tableNameWithType table name with type suffix
+   * @return property store path of the table's segment merge lineage
+   */
+  public static String constructPropertyStorePathForSegmentMergeLineage(String tableNameWithType) {
+    final String lineageRootPath = PROPERTYSTORE_SEGMENT_MERGE_LINEAGE;
+    return StringUtil.join("/", lineageRootPath, tableNameWithType);
+  }
+
public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
String segmentName) {
return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
index ca4b423..c8b15f2 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
@@ -52,6 +53,7 @@ import org.apache.http.entity.mime.content.InputStreamBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
+import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +72,7 @@ public class FileUploadDownloadClient implements Closeable {
public static final String DOWNLOAD_URI = "DOWNLOAD_URI";
public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
public static final String CRYPTER = "CRYPTER";
+ public static final String MERGED_SEGMENT_PUSH = "MERGED_SEGMENT_PUSH";
}
public static class QueryParameters {
@@ -94,6 +97,7 @@ public class FileUploadDownloadClient implements Closeable {
private static final String SEGMENT_PATH = "/v2/segments";
private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
private static final String TABLES_PATH = "/tables";
+ private static final String LINEAGE_PATH = "/lineage";
private static final String TYPE_DELIMITER = "?type=";
private static final String SLASH = "/";
@@ -137,6 +141,12 @@ public class FileUploadDownloadClient implements Closeable {
return getURI(HTTPS, host, port, SCHEMA_PATH);
}
+  /**
+   * Returns the HTTP URI of the controller endpoint that updates the segment merge lineage of a table
+   * (path {@code /tables/<tableName>/lineage?type=<type>}).
+   */
+  public static URI getUpdateSegmentMergeLineageHttpURI(String host, int port, String tableName, String type)
+      throws URISyntaxException {
+    return getURI(HTTP, host, port, getUpdateSegmentMergeLineagePath(tableName, type));
+  }
+
+  /**
+   * Returns the HTTPS URI of the controller endpoint that updates the segment merge lineage of a table
+   * (path {@code /tables/<tableName>/lineage?type=<type>}).
+   */
+  public static URI getUpdateSegmentMergeLineageHttpsURI(String host, int port, String tableName, String type)
+      throws URISyntaxException {
+    return getURI(HTTPS, host, port, getUpdateSegmentMergeLineagePath(tableName, type));
+  }
+
+  // Shared path builder so that the HTTP and HTTPS variants cannot drift apart
+  private static String getUpdateSegmentMergeLineagePath(String tableName, String type) {
+    return TABLES_PATH + SLASH + tableName + LINEAGE_PATH + TYPE_DELIMITER + type;
+  }
+
/**
* This method calls the old segment upload endpoint. We will deprecate this behavior soon. Please call
* getUploadSegmentHttpURI to construct your request.
@@ -154,7 +164,7 @@ public class FileUploadDownloadClient implements Closeable {
public static URI getOldUploadSegmentHttpsURI(String host, int port) throws URISyntaxException {
return getURI(HTTPS, host, port, OLD_SEGMENT_PATH);
}
-
+
public static URI getUploadSegmentHttpURI(String host, int port) throws URISyntaxException {
return getURI(HTTP, host, port, SEGMENT_PATH);
}
@@ -222,6 +232,29 @@ public class FileUploadDownloadClient implements Closeable {
parameters, socketTimeoutMs);
}
+  /**
+   * Builds the POST request that updates the segment merge lineage.
+   *
+   * Request body: {@code {"childrenGroupIds": [...], "segments": [...]}}; a null list is sent as an empty array.
+   *
+   * @param uri lineage endpoint URI
+   * @param childrenGroupIds ids of the groups merged into the new group, may be null
+   * @param segments segments of the new group, may be null
+   * @param socketTimeoutMs socket timeout in milliseconds
+   * @return the HTTP request
+   */
+  private static HttpUriRequest getUpdateSegmentMergeLineageRequest(URI uri, List<String> childrenGroupIds,
+      List<String> segments, int socketTimeoutMs) {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put("childrenGroupIds", toJsonArray(childrenGroupIds));
+    jsonObject.put("segments", toJsonArray(segments));
+
+    RequestBuilder requestBuilder = RequestBuilder.post(uri)
+        .setVersion(HttpVersion.HTTP_1_1)
+        .setHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+        .setEntity(new StringEntity(jsonObject.toString(), ContentType.APPLICATION_JSON));
+    setTimeout(requestBuilder, socketTimeoutMs);
+    return requestBuilder.build();
+  }
+
+  // Converts a (possibly null) list of strings into a JSON array; null becomes an empty array
+  private static JSONArray toJsonArray(@Nullable List<String> values) {
+    JSONArray jsonArray = new JSONArray();
+    if (values != null) {
+      for (String value : values) {
+        jsonArray.put(value);
+      }
+    }
+    return jsonArray;
+  }
+
private static HttpUriRequest getSendSegmentUriRequest(URI uri, String downloadUri, @Nullable List<Header> headers,
@Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
RequestBuilder requestBuilder = RequestBuilder.post(uri)
@@ -398,6 +431,11 @@ public class FileUploadDownloadClient implements Closeable {
return sendRequest(getUploadSegmentMetadataRequest(uri, segmentName, segmentMetadataFile, headers, parameters, socketTimeoutMs));
}
+  /**
+   * Sends a segment merge lineage update (children group ids + segments of the new group) to the given endpoint.
+   *
+   * @param uri lineage endpoint URI, e.g. from getUpdateSegmentMergeLineageHttpURI
+   * @param childrenGroupIds ids of the groups merged into the new group
+   * @param segments segments of the new group
+   * @param socketTimeoutMs socket timeout in milliseconds
+   * @return the HTTP response
+   */
+  public SimpleHttpResponse updateSegmentMergeLineage(URI uri, List<String> childrenGroupIds, List<String> segments,
+      int socketTimeoutMs)
+      throws IOException, HttpErrorStatusException {
+    HttpUriRequest lineageRequest =
+        getUpdateSegmentMergeLineageRequest(uri, childrenGroupIds, segments, socketTimeoutMs);
+    return sendRequest(lineageRequest);
+  }
+
/**
* Upload segment with segment file.
@@ -595,4 +633,15 @@ public class FileUploadDownloadClient implements Closeable {
public void close() throws IOException {
_httpClient.close();
}
+
+  /**
+   * Prints how {@link JSONObject} serializes a {@link List} value ({"a": ["aa", "bb"]} style output) to stdout.
+   *
+   * NOTE(review): this looks like leftover debugging code from developing the lineage request body; consider
+   * removing it before merging.
+   */
+  public static void main(String[] args) {
+    List<String> values = new ArrayList<>();
+    for (String value : new String[]{"aa", "bb"}) {
+      values.add(value);
+    }
+
+    JSONObject json = new JSONObject();
+    json.put("a", values);
+    System.out.println(json);
+  }
}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java
new file mode 100644
index 0000000..d54a579
--- /dev/null
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/UUIDUtils.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.utils;
+
+import java.util.UUID;
+
+
+/**
+ * Utility methods for {@link UUID}s.
+ */
+public class UUIDUtils {
+  // Mimics the implementation from Hector's TimeUUIDUtils class:
+  // https://github.com/rantav/hector/blob/master/core/src/main/java/me/prettyprint/cassandra/utils/TimeUUIDUtils.java
+  // Number of 100-nanosecond intervals between the UUID epoch (1582-10-15T00:00Z) and the Unix epoch (1970-01-01T00:00Z)
+  private static final long NUM_100NS_INTERVALS_SINCE_UUID_EPOCH = 0x01b21dd213814000L;
+  private static final long NUM_100NS_INTERVALS_PER_MILLISECOND = 10_000L;
+
+  /**
+   * Returns the Unix time in milliseconds encoded in a time-based (version 1) UUID.
+   *
+   * @param uuid a version 1 UUID
+   * @return milliseconds since the Unix epoch, rounded down
+   * @throws UnsupportedOperationException if the UUID is not a version 1 UUID (raised by {@link UUID#timestamp()})
+   */
+  public static long getUnixTimeFromUUID(UUID uuid) {
+    // floorDiv instead of '/' so that instants before the Unix epoch round down instead of towards zero
+    return Math.floorDiv(uuid.timestamp() - NUM_100NS_INTERVALS_SINCE_UUID_EPOCH,
+        NUM_100NS_INTERVALS_PER_MILLISECOND);
+  }
+}
diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
new file mode 100644
index 0000000..923fbfc
--- /dev/null
+++ b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link SegmentMergeLineage}: group bookkeeping, level computation, ZNRecord round trip and removal.
+ */
+public class SegmentMergeLineageTest {
+
+  @Test
+  public void testSegmentMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList("segment1", "segment2", "segment3");
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    String groupId2 = "G2";
+    List<String> groupSegments2 = Arrays.asList("segment4", "segment5");
+    segmentMergeLineage.addSegmentGroup(groupId2, groupSegments2, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId2), groupSegments2);
+
+    // G3 is merged from G1 and G2, so it belongs to the next group level
+    String groupId3 = "G3";
+    List<String> groupSegments3 = Arrays.asList("segment6");
+    segmentMergeLineage.addSegmentGroup(groupId3, groupSegments3, Arrays.asList(groupId1, groupId2));
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId3), groupSegments3);
+
+    // Check available APIs
+    Assert.assertEquals(segmentMergeLineage.getTableName(), "test_OFFLINE");
+    Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId3), Arrays.asList(groupId1, groupId2));
+    Assert.assertEquals(segmentMergeLineage.getAllGroupLevels(), Arrays.asList(0, 1));
+    // Group ids within a level come from a hash map, so compare them without relying on iteration order
+    Assert.assertEquals(new HashSet<>(segmentMergeLineage.getGroupIdsForGroupLevel(0)),
+        new HashSet<>(Arrays.asList(groupId1, groupId2)));
+    Assert.assertEquals(segmentMergeLineage.getGroupIdsForGroupLevel(1), Arrays.asList(groupId3));
+    validateSegmentGroup(segmentMergeLineage);
+
+    // Check ZNRecord conversion
+    Assert.assertEquals(SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord()), segmentMergeLineage);
+
+    // Test removing groups
+    segmentMergeLineage.removeSegmentGroup(groupId1);
+    Assert.assertNull(segmentMergeLineage.getChildrenForGroup(groupId1));
+    Assert.assertNull(segmentMergeLineage.getSegmentsForGroup(groupId1));
+    Assert.assertFalse(segmentMergeLineage.getGroupIdsForGroupLevel(0).contains(groupId1));
+    // The removed group must also disappear from its parent's children list
+    Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId3), Arrays.asList(groupId2));
+    validateSegmentGroup(segmentMergeLineage);
+  }
+
+  @Test(expectedExceptions = InvalidConfigException.class)
+  public void testUpdateWithDuplicateGroupId() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList("segment1", "segment2", "segment3");
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    // Adding a second group with the same id must be rejected
+    List<String> groupSegments2 = Arrays.asList("segment4", "segment5", "segment6");
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments2, null);
+  }
+
+  private void validateSegmentGroup(SegmentMergeLineage segmentMergeLineage) {
+    SegmentGroup segmentGroup = segmentMergeLineage.getMergeLineageRootSegmentGroup();
+    for (SegmentGroup child : segmentGroup.getChildrenGroups()) {
+      validateSegmentGroupNode(child, segmentMergeLineage);
+    }
+  }
+
+  // Recursively checks that a tree node agrees with the flat lineage accessors
+  private void validateSegmentGroupNode(SegmentGroup segmentGroup, SegmentMergeLineage segmentMergeLineage) {
+    String groupId = segmentGroup.getGroupId();
+    Assert.assertEquals(segmentGroup.getSegments(), new HashSet<>(segmentMergeLineage.getSegmentsForGroup(groupId)));
+    Assert.assertTrue(segmentMergeLineage.getGroupIdsForGroupLevel(segmentGroup.getGroupLevel()).contains(groupId));
+
+    List<SegmentGroup> childrenGroups = segmentGroup.getChildrenGroups();
+    if (childrenGroups != null) {
+      List<String> childrenGroupIds = new ArrayList<>();
+      for (SegmentGroup child : childrenGroups) {
+        childrenGroupIds.add(child.getGroupId());
+      }
+      Assert.assertEquals(childrenGroupIds, segmentMergeLineage.getChildrenForGroup(groupId));
+
+      for (SegmentGroup child : childrenGroups) {
+        validateSegmentGroupNode(child, segmentMergeLineage);
+      }
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 418c2ef..af600e3 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -254,6 +254,9 @@ public class PinotSegmentUploadRestletResource {
// Get URI of current segment location
String currentSegmentLocationURI = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
+ String mergedSegmentPushStr = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.MERGED_SEGMENT_PUSH);
+ boolean mergedSegmentPush = Boolean.valueOf(mergedSegmentPushStr);
+
File tempEncryptedFile = null;
File tempDecryptedFile = null;
File tempSegmentDir = null;
@@ -315,7 +318,7 @@ public class PinotSegmentUploadRestletResource {
// Zk operations
completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, segmentMetadata,
- segmentName, zkDownloadUri, moveSegmentToFinalLocation);
+ segmentName, zkDownloadUri, moveSegmentToFinalLocation, mergedSegmentPush);
return new SuccessResponse("Successfully uploaded segment: " + segmentMetadata.getName() + " of table: "
+ segmentMetadata.getTableName());
@@ -373,7 +376,7 @@ public class PinotSegmentUploadRestletResource {
private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File tempDecryptedFile,
FileUploadPathProvider provider, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
- boolean moveSegmentToFinalLocation)
+ boolean moveSegmentToFinalLocation, boolean mergedSegmentPush)
throws Exception {
String finalSegmentPath =
StringUtil.join("/", provider.getBaseDataDirURI().toString(), segmentMetadata.getTableName(),
@@ -381,7 +384,7 @@ public class PinotSegmentUploadRestletResource {
URI finalSegmentLocationURI = new URI(finalSegmentPath);
ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
zkOperator.completeSegmentOperations(segmentMetadata, finalSegmentLocationURI, tempDecryptedFile,
- enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation);
+ enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, mergedSegmentPush);
}
private void decryptFile(String crypterClassHeader, File tempEncryptedFile, File tempDecryptedFile) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
index ae98133..38aef03 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.controller.api.resources;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
import com.linkedin.pinot.common.config.TableConfig;
@@ -516,4 +517,51 @@ public class PinotTableRestletResource {
}
return ResourceUtils.convertToJsonString(result);
}
+
+  /**
+   * JSON request body of the segment merge lineage update endpoint: the segments forming the new group and the
+   * ids of the child groups it is built from.
+   */
+  public static class LineageParameters {
+    @JsonProperty("segments") List<String> segments;
+
+    @JsonProperty("childrenGroupIds") List<String> childrenGroupIds;
+  }
+
+  /**
+   * Adds a new segment group to the segment merge lineage of a table.
+   *
+   * <p>Responds with 400 for an unknown or missing table type or a request body without {@code segments}, 404 if
+   * the table of the given type does not exist, and 500 if the lineage could not be written.
+   */
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/segments/lineage")
+  @ApiOperation(value = "Updates segment merge lineage information", notes = "Updates segment merge lineage information")
+  public SuccessResponse updateSegmentMergeLineage(
+      @ApiParam(value = "Name of table to update segment merge lineage", required = true) @Nonnull @PathParam(value = "tableName") String tableName,
+      @ApiParam(value = "offline|realtime") @Nonnull @QueryParam("type") String tableType,
+      LineageParameters lineageParameters) {
+    // An absent query parameter is injected as null; reject it as a client error instead of failing with an NPE
+    if (tableType == null || !EnumUtils.isValidEnum(CommonConstants.Helix.TableType.class, tableType.toUpperCase())) {
+      throw new ControllerApplicationException(LOGGER, "Illegal table type " + tableType, Response.Status.BAD_REQUEST);
+    }
+    // Same for a missing request body or a body without the segment list
+    if (lineageParameters == null || lineageParameters.segments == null) {
+      throw new ControllerApplicationException(LOGGER, "Request body must contain 'segments'",
+          Response.Status.BAD_REQUEST);
+    }
+
+    TableType type = TableType.valueOf(tableType.toUpperCase());
+    String tableNameWithType;
+    if (type == TableType.OFFLINE && _pinotHelixResourceManager.hasOfflineTable(tableName)) {
+      tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+    } else if (type == TableType.REALTIME && _pinotHelixResourceManager.hasRealtimeTable(tableName)) {
+      tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    } else {
+      throw new ControllerApplicationException(LOGGER, "Table " + tableName + ", type " + type + " does not exist",
+          Response.Status.NOT_FOUND);
+    }
+
+    boolean success =
+        _pinotHelixResourceManager.updateSegmentMergeLineage(tableNameWithType, lineageParameters.segments,
+            lineageParameters.childrenGroupIds);
+
+    if (success) {
+      return new SuccessResponse(
+          "Segment merge lineage has been updated successfully. (table: " + tableNameWithType + ", segments: "
+              + lineageParameters.segments + ", childrenGroups: " + lineageParameters.childrenGroupIds + ")");
+    } else {
+      throw new ControllerApplicationException(LOGGER, "Failed to update segment merge lineage for table: "
+          + tableNameWithType, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
index f1c6ad4..abd3698 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/upload/ZKOperator.java
@@ -15,12 +15,15 @@
*/
package com.linkedin.pinot.controller.api.upload;
+import com.linkedin.pinot.common.config.SegmentMergeConfig;
+import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import com.linkedin.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import com.linkedin.pinot.common.metrics.ControllerMeter;
import com.linkedin.pinot.common.metrics.ControllerMetrics;
import com.linkedin.pinot.common.segment.SegmentMetadata;
+import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.api.resources.ControllerApplicationException;
@@ -29,6 +32,7 @@ import com.linkedin.pinot.filesystem.PinotFS;
import com.linkedin.pinot.filesystem.PinotFSFactory;
import java.io.File;
import java.net.URI;
+import java.util.Collections;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.helix.ZNRecord;
@@ -55,8 +59,7 @@ public class ZKOperator {
public void completeSegmentOperations(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
- boolean moveSegmentToFinalLocation)
- throws Exception {
+ boolean moveSegmentToFinalLocation, boolean mergedSegmentPush) throws Exception {
String rawTableName = segmentMetadata.getTableName();
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
String segmentName = segmentMetadata.getName();
@@ -68,19 +71,21 @@ public class ZKOperator {
LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
String crypter = headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.CRYPTER);
processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
- rawTableName, segmentName, moveSegmentToFinalLocation);
+ rawTableName, segmentName, moveSegmentToFinalLocation, mergedSegmentPush);
return;
}
LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName);
processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
- enableParallelPushProtection, headers, zkDownloadURI, offlineTableName, segmentName, znRecord, moveSegmentToFinalLocation);
+ enableParallelPushProtection, headers, zkDownloadURI, offlineTableName, segmentName, znRecord,
+ moveSegmentToFinalLocation);
}
private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
- String offlineTableName, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation) throws Exception {
+ String offlineTableName, String segmentName, ZNRecord znRecord, boolean moveSegmentToFinalLocation)
+ throws Exception {
OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord);
long existingCrc = existingSegmentZKMetadata.getCrc();
@@ -160,9 +165,11 @@ public class ZKOperator {
newCrc, existingCrc, segmentName);
if (moveSegmentToFinalLocation) {
moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI);
- LOGGER.info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
+ LOGGER.info("Moved segment {} from temp location {} to {}", segmentName,
+ currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
} else {
- LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, offlineTableName, zkDownloadURI);
+ LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, offlineTableName,
+ zkDownloadURI);
}
_pinotHelixResourceManager.refreshSegment(segmentMetadata, existingSegmentZKMetadata);
@@ -196,19 +203,40 @@ public class ZKOperator {
private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
File currentSegmentLocation, String zkDownloadURI, String crypter, String rawTableName, String segmentName,
- boolean moveSegmentToFinalLocation) {
+      boolean moveSegmentToFinalLocation, boolean mergedSegmentPush) {
+
+    // Only regular pushes are recorded here, each as a new base group holding just this segment. Merged segment
+    // pushes are skipped; presumably their lineage is written by the producer of the merge (verify).
+    if (!mergedSegmentPush) {
+      // Fetch table config; without one there is no merge config, hence nothing to record
+      TableConfig tableConfig =
+          _pinotHelixResourceManager.getTableConfig(rawTableName, CommonConstants.Helix.TableType.OFFLINE);
+      SegmentMergeConfig segmentMergeConfig = (tableConfig != null) ? tableConfig.getSegmentMergeConfig() : null;
+
+      // Update lineage information for base segment
+      if (segmentMergeConfig != null) {
+        boolean lineageUpdated;
+        try {
+          lineageUpdated = _pinotHelixResourceManager.updateSegmentMergeLineage(tableConfig.getTableName(),
+              Collections.singletonList(segmentName), null);
+        } catch (Exception e) {
+          LOGGER.error("Cannot update segment merge lineage information for table {}, segment {}", rawTableName,
+              segmentName, e);
+          throw new RuntimeException(e);
+        }
+        // updateSegmentMergeLineage() catches its own exceptions and reports failure through its return value, so
+        // the result must be checked; otherwise the segment would be added without any lineage entry.
+        if (!lineageUpdated) {
+          LOGGER.error("Cannot update segment merge lineage information for table {}, segment {}", rawTableName,
+              segmentName);
+          throw new RuntimeException(
+              "Failed to update segment merge lineage for table: " + rawTableName + ", segment: " + segmentName);
+        }
+      }
+    }
+
// For v1 segment uploads, we will not move the segment
if (moveSegmentToFinalLocation) {
try {
moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI);
- LOGGER.info("Moved segment {} from temp location {} to {}", segmentName, currentSegmentLocation.getAbsolutePath(),
- finalSegmentLocationURI.getPath());
+ LOGGER.info("Moved segment {} from temp location {} to {}", segmentName,
+ currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath());
} catch (Exception e) {
LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName, rawTableName, e);
throw new RuntimeException(e);
}
} else {
- LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName, zkDownloadURI);
+ LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, rawTableName,
+ zkDownloadURI);
}
_pinotHelixResourceManager.addNewSegment(segmentMetadata, zkDownloadURI, crypter);
}
@@ -218,7 +246,8 @@ public class ZKOperator {
PinotFS pinotFS = PinotFSFactory.create(finalSegmentLocationURI.getScheme());
// Overwrites current segment file
- LOGGER.info("Copying segment from {} to {}", currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.toString());
+ LOGGER.info("Copying segment from {} to {}", currentSegmentLocation.getAbsolutePath(),
+ finalSegmentLocationURI.toString());
pinotFS.copyFromLocalFile(currentSegmentLocation, finalSegmentLocationURI);
}
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
index 9a79cc3..f45f533 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -239,7 +239,12 @@ public class ControllerRequestURLBuilder {
public String forSegmentListAPIWithTableType(String tableName, String tableType) {
return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName + "?type=" + tableType);
}
+
public String forSegmentListAPI(String tableName) {
return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName);
}
+
+  /**
+   * Builds the URL of the segment merge lineage update endpoint
+   * ({@code POST /tables/{tableName}/segments/lineage?type={tableType}}).
+   *
+   * @param tableName table name as expected by the endpoint's {@code tableName} path parameter
+   * @param tableType value of the {@code type} query parameter ({@code offline} or {@code realtime})
+   */
+  public String forSegmentMergeLineagePostAPI(String tableName, String tableType) {
+    // Append the query string after joining: as a separate join element it would get a '/' before the type value
+    return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments", "lineage")
+        + "?type=" + tableType;
+  }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index c12f70e..aa08350 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.controller.helix.core;
+import com.fasterxml.uuid.Generators;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -36,6 +37,8 @@ import com.linkedin.pinot.common.config.TenantConfig;
import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.common.exception.InvalidConfigException;
import com.linkedin.pinot.common.exception.TableNotFoundException;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineageAccessHelper;
import com.linkedin.pinot.common.messages.SegmentRefreshMessage;
import com.linkedin.pinot.common.messages.SegmentReloadMessage;
import com.linkedin.pinot.common.messages.TimeboundaryRefreshMessage;
@@ -82,6 +85,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
@@ -145,8 +149,8 @@ public class PinotHelixResourceManager {
public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
@Nonnull String controllerInstanceId, @Nonnull String dataDir) {
- this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS,
- false, false);
+ this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
+ false);
}
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
@@ -1084,6 +1088,10 @@ public class PinotHelixResourceManager {
_propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType),
new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
+ // Add segment merge lineage znode if required
+ _propertyStore.create(ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType),
+ new ZNRecord(tableNameWithType), AccessOption.PERSISTENT);
+
// Update replica group partition assignment to the property store if applicable
updateReplicaGroupPartitionAssignment(tableConfig);
break;
@@ -1524,7 +1532,7 @@ public class PinotHelixResourceManager {
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
- rawTableName, refreshMessage, recipientCriteria);
+ rawTableName, refreshMessage, recipientCriteria);
// Helix sets the timeoutMs argument specified in 'send' call as the processing timeout of the message.
int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, null, timeoutMs);
if (nMsgsSent > 0) {
@@ -1534,7 +1542,7 @@ public class PinotHelixResourceManager {
// May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
// the latest time boundary info.
LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
- offlineTableName, nMsgsSent);
+ offlineTableName, nMsgsSent);
}
}
@@ -2192,6 +2200,58 @@ public class PinotHelixResourceManager {
return endpointToInstance;
}
+  /**
+   * Reads the segment merge lineage ZNRecord of a table from the property store and converts it with
+   * {@link SegmentMergeLineage#fromZNRecord}.
+   *
+   * @param tableNameWithType table name with type suffix
+   */
+  public SegmentMergeLineage getSegmentMergeLineage(String tableNameWithType) {
+    return SegmentMergeLineage.fromZNRecord(
+        SegmentMergeLineageAccessHelper.getSegmentMergeLineageZNRecord(getPropertyStore(), tableNameWithType));
+  }
+
+  /**
+   * Appends a new segment group (with a fresh time-based UUID as its id) to the segment merge lineage of a table.
+   *
+   * <p>The lineage ZNRecord is read, modified and written back with a version check. Every failure inside an
+   * attempt makes it return {@code false}, so {@code DEFAULT_RETRY_POLICY} can retry the read-modify-write, e.g.
+   * after a concurrent update bumped the version.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param segments segments of the new group
+   * @param childrenGroupIds ids of existing groups the new group is built from, or {@code null}
+   * @return {@code true} if the lineage was written, {@code false} if all attempts failed (this method does not
+   *         throw)
+   */
+  public boolean updateSegmentMergeLineage(String tableNameWithType, List<String> segments,
+      List<String> childrenGroupIds) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch segment merge lineage from the property store
+        ZNRecord segmentMergeLineageZNRecord =
+            SegmentMergeLineageAccessHelper.getSegmentMergeLineageZNRecord(getPropertyStore(), tableNameWithType);
+
+        // The znode may be absent (presumably for tables created before it was added at table creation). Then there
+        // is no version to read; -1 is the ZooKeeper "match any version" value.
+        // NOTE(review): confirm writeSegmentMergeLineage() creates the znode when called with -1.
+        SegmentMergeLineage segmentMergeLineage = null;
+        int expectedVersion = -1;
+        if (segmentMergeLineageZNRecord != null) {
+          segmentMergeLineage = SegmentMergeLineage.fromZNRecord(segmentMergeLineageZNRecord);
+          expectedVersion = segmentMergeLineageZNRecord.getVersion();
+        }
+        if (segmentMergeLineage == null) {
+          segmentMergeLineage = new SegmentMergeLineage(tableNameWithType);
+        }
+
+        // Add a new segment group
+        String groupId = Generators.timeBasedGenerator().generate().toString();
+        segmentMergeLineage.addSegmentGroup(groupId, segments, childrenGroupIds);
+
+        try {
+          if (SegmentMergeLineageAccessHelper.writeSegmentMergeLineage(getPropertyStore(), segmentMergeLineage,
+              expectedVersion)) {
+            return true;
+          }
+          LOGGER.warn("Failed to update segment merge lineage for table: {}", tableNameWithType);
+          return false;
+        } catch (ZkBadVersionException e) {
+          LOGGER.warn("Version changed while updating segment merge lineage for table: {}", tableNameWithType, e);
+          return false;
+        } catch (Exception e) {
+          LOGGER.warn("Caught exception while updating segment merge lineage for table: {}", tableNameWithType, e);
+          return false;
+        }
+      });
+
+      LOGGER.info("Segment merge lineage has been updated successfully. (tableNameWithType: {}, segments: {}, "
+          + "childrenGroupIds: {})", tableNameWithType, segments, childrenGroupIds);
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Caught Exception while updating segment merge lineage for table: {}", tableNameWithType, e);
+      return false;
+    }
+  }
+
+
/*
* Uncomment and use for testing on a real cluster
public static void main(String[] args) throws Exception {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
index 750d8a2..765b4e3 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/ClusterInfoProvider.java
@@ -20,6 +20,7 @@ import com.linkedin.pinot.common.config.PinotTaskConfig;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -128,4 +129,9 @@ public class ClusterInfoProvider {
public String getVipUrl() {
return _controllerConf.generateVipUrl();
}
+
+
+ public SegmentMergeLineage getSegmentMergeLineage(String tableNameWithType) {
+ return _pinotHelixResourceManager.getSegmentMergeLineage(tableNameWithType);
+ }
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java
new file mode 100644
index 0000000..028a53f
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/SegmentMergeRollupTaskGenerator.java
@@ -0,0 +1,313 @@
+package com.linkedin.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.PinotTaskConfig;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.config.TableTaskConfig;
+import com.linkedin.pinot.common.data.Segment;
+import com.linkedin.pinot.common.lineage.SegmentGroup;
+import com.linkedin.pinot.common.lineage.SegmentMergeLineage;
+import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import com.linkedin.pinot.common.segment.SegmentMetadata;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import com.linkedin.pinot.controller.helix.core.util.ZKMetadataUtils;
+import com.linkedin.pinot.core.common.MinionConstants;
+import com.linkedin.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformer;
+import com.linkedin.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory;
+import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SegmentMergeRollupTaskGenerator implements PinotTaskGenerator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeRollupTaskGenerator.class);
+
+ private final ClusterInfoProvider _clusterInfoProvider;
+
+ public SegmentMergeRollupTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
+ _clusterInfoProvider = clusterInfoProvider;
+ }
+
+ @Nonnull
+ @Override
+ public String getTaskType() {
+ return MinionConstants.SegmentMergeRollupTask.TASK_TYPE;
+ }
+
+ @Nonnull
+ @Override
+ public List<PinotTaskConfig> generateTasks(@Nonnull List<TableConfig> tableConfigs) {
+ List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+ // Get the segments that are being converted so that we don't submit them again
+ Set<Segment> runningSegments =
+ TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, _clusterInfoProvider);
+
+ for (TableConfig tableConfig : tableConfigs) {
+ // Only generate tasks for OFFLINE tables
+ String offlineTableName = tableConfig.getTableName();
+ if (tableConfig.getTableType() != CommonConstants.Helix.TableType.OFFLINE) {
+ LOGGER.warn("Skip generating SegmentMergeRollup for non-OFFLINE table: {}", offlineTableName);
+ continue;
+ }
+
+ TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+ Preconditions.checkNotNull(tableTaskConfig);
+ Map<String, String> taskConfigs =
+ tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentMergeRollupTask.TASK_TYPE);
+ Preconditions.checkNotNull(tableConfigs);
+
+ // Get max number of tasks for this table
+ int tableMaxNumTasks;
+ String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+ if (tableMaxNumTasksConfig != null) {
+ try {
+ tableMaxNumTasks = Integer.valueOf(tableMaxNumTasksConfig);
+ } catch (Exception e) {
+ tableMaxNumTasks = Integer.MAX_VALUE;
+ }
+ } else {
+ tableMaxNumTasks = Integer.MAX_VALUE;
+ }
+
+ // Generate tasks
+ int tableNumTasks = 0;
+
+ List<OfflineSegmentZKMetadata> metadataList = _clusterInfoProvider.getOfflineSegmentsMetadata(offlineTableName);
+
+ SegmentMergeLineage segmentMergeLineage = _clusterInfoProvider.getSegmentMergeLineage(offlineTableName);
+
+ // Get a list of original segments covered by merged segments
+
+ List<Integer> groupLevelList = segmentMergeLineage.getAllGroupLevels();
+ Set<String> uncoveredGroups = new HashSet<>();
+ Set<String> coveredGroups = new HashSet<>();
+ for (Integer groupLevel : groupLevelList) {
+ List<String> groupIdsForGroupLevel;
+ groupIdsForGroupLevel = segmentMergeLineage.getGroupIdsForGroupLevel(groupLevel);
+ uncoveredGroups.addAll(groupIdsForGroupLevel);
+
+ for (String groupId : groupIdsForGroupLevel) {
+ List<String> coveredChildrenForGroupId = segmentMergeLineage.getChildrenForGroup(groupId);
+ if (coveredChildrenForGroupId != null && !coveredChildrenForGroupId.isEmpty()) {
+ coveredGroups.addAll(coveredChildrenForGroupId);
+ }
+ }
+ }
+
+
+ uncoveredGroups.removeAll(coveredGroups);
+
+ Map<String, OfflineSegmentZKMetadata> segmentNameToZkMetadataMap = new HashMap<>();
+ for (OfflineSegmentZKMetadata segmentZKMetadata : metadataList) {
+ segmentNameToZkMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata);
+ }
+
+ LOGGER.info("uncovered groups: ", uncoveredGroups);
+
+ List<OfflineSegmentZKMetadata> segmentsToMerge = new ArrayList<>();
+ for (String groupId : uncoveredGroups) {
+ for (String segmentName : segmentMergeLineage.getSegmentsForGroup(groupId)) {
+ segmentsToMerge.add(segmentNameToZkMetadataMap.get(segmentName));
+ }
+ }
+
+// List<OfflineSegmentZKMetadata> segmentsToMerge = new ArrayList<>();
+// for (OfflineSegmentZKMetadata segmentZKMetadata: metadataList) {
+// String segmentName = segmentZKMetadata.getSegmentName();
+// if (!coveredSegments.contains(segmentName)) {
+// segmentsToMerge.add(segmentZKMetadata);
+// }
+// }
+// if (!segmentsToRemove.isEmpty()) {
+// LOGGER.info("Removing Segments since they are covered by merged segments: " + String.join(",", segmentsToRemove));
+// _clusterInfoProvider.removeSegments(offlineTableName, segmentsToRemove);
+// }
+
+ if (segmentsToMerge.size() > 1) {
+ String downloadUrls = segmentsToMerge.stream()
+ .map(OfflineSegmentZKMetadata::getDownloadUrl)
+ .collect(Collectors.joining(MinionConstants.URL_SEPARATOR));
+
+ String segmentNames = segmentsToMerge.stream()
+ .map(OfflineSegmentZKMetadata::getSegmentName)
+ .collect(Collectors.joining(","));
+
+
+ String crcList = segmentsToMerge.stream()
+ .map(OfflineSegmentZKMetadata::getCrc)
+ .map((crc) -> Long.toString(crc))
+ .collect(Collectors.joining(","));
+
+ String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
+ String mergedSegmentName = computeMergedSegmentName(rawTableName, segmentsToMerge);
+ String groupsToCover = String.join(",", uncoveredGroups);
+
+
+ Map<String, String> config = new HashMap<>();
+ config.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
+ config.put(MinionConstants.SEGMENT_NAME_KEY, segmentNames);
+ config.put(MinionConstants.DOWNLOAD_URL_KEY, downloadUrls);
+ config.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoProvider.getVipUrl() + "/segments");
+ config.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, crcList);
+
+ config.put(MinionConstants.SegmentMergeRollupTask.MERGE_TYPE, "CONCATENATE");
+
+
+ config.put(MinionConstants.SegmentMergeRollupTask.MERGED_SEGEMNT_NAME_KEY, mergedSegmentName);
+ config.put(MinionConstants.SegmentMergeRollupTask.GROUPS_TO_COVER_KEY, groupsToCover);
+ config.put(MinionConstants.CONTROLLER_API_URL, _clusterInfoProvider.getVipUrl());
+ pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentMergeRollupTask.TASK_TYPE, config));
+
+ }
+// for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : _clusterInfoProvider.getOfflineSegmentsMetadata(
+// offlineTableName)) {
+// // Generate up to tableMaxNumTasks tasks each time for each table
+// if (tableNumTasks == tableMaxNumTasks) {
+// break;
+// }
+//
+// // Skip segments that are already submitted
+// String segmentName = offlineSegmentZKMetadata.getSegmentName();
+// if (runningSegments.contains(new Segment(offlineTableName, segmentName))) {
+// continue;
+// }
+//
+// // Only submit segments that have not been converted
+// Map<String, String> customMap = offlineSegmentZKMetadata.getCustomMap();
+// if (customMap == null || !customMap.containsKey(
+// MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY + MinionConstants.TASK_TIME_SUFFIX)) {
+// Map<String, String> configs = new HashMap<>();
+// configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
+// configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
+// configs.put(MinionConstants.DOWNLOAD_URL_KEY, offlineSegmentZKMetadata.getDownloadUrl());
+// configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoProvider.getVipUrl() + "/segments");
+// configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(offlineSegmentZKMetadata.getCrc()));
+// if (columnsToConvertConfig != null) {
+// configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY, columnsToConvertConfig);
+// }
+//
+// pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentMergeRollupTask.TASK_TYPE, configs));
+// tableNumTasks++;
+// }
+// }
+ }
+
+ return pinotTaskConfigs;
+ }
+
+ private String computeMergedSegmentName(String tableName, List<OfflineSegmentZKMetadata> metadataList) {
+ long minStartTime = Long.MAX_VALUE;
+ long maxEndTime = Long.MIN_VALUE;
+
+ for (OfflineSegmentZKMetadata metadata : metadataList) {
+ long currentSegmentStartTime = metadata.getStartTime();
+ long currentSegmentEndTime = metadata.getEndTime();
+
+ if (currentSegmentStartTime < minStartTime) {
+ minStartTime = currentSegmentStartTime;
+ }
+
+ if (currentSegmentEndTime > maxEndTime) {
+ maxEndTime = currentSegmentEndTime;
+ }
+ }
+ return "merged_" + tableName + "_" + minStartTime + "_" + maxEndTime + "_" + System.currentTimeMillis();
+ }
+
+ @Override
+ public int getNumConcurrentTasksPerInstance() {
+ return DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
+ }
+
+ @Override
+ public void nonLeaderCleanUp() {
+
+ }
+
+ private void addCoveredSegments(SegmentGroup node, Set<String> coveredSegments) {
+ Set<String> currentSegments = node.getSegments();
+ if (currentSegments != null && !currentSegments.isEmpty()) {
+ coveredSegments.addAll(node.getSegments());
+ }
+ if (node.getChildrenGroups() != null) {
+ for (SegmentGroup children : node.getChildrenGroups()) {
+ addCoveredSegments(children, coveredSegments);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ File inputDir = new File("/Users/snlee/data/careers_comms_processing_hourly_additive/extracted");
+ try {
+ int segmentNum = 0;
+ List<OfflineSegmentZKMetadata> zkMetadataList = new ArrayList<>();
+ for (File index : inputDir.listFiles()) {
+ SegmentMetadata segmentMetadata = new SegmentMetadataImpl(index);
+ OfflineSegmentZKMetadata zkMetadata = new OfflineSegmentZKMetadata();
+ ZKMetadataUtils.updateSegmentMetadata(zkMetadata, segmentMetadata);
+ zkMetadataList.add(zkMetadata);
+ }
+
+
+ Map<Long, List<String>> grouping = getGrouping(zkMetadataList);
+
+ produceResultStat(grouping);
+
+
+ } finally {
+// FileUtils.deleteQuietly(tmpDir);
+ }
+
+ }
+
+ public static Map<Long, List<String>> getGrouping(List<OfflineSegmentZKMetadata> zkMetadataList) {
+ Map<Long, List<String>> result = new TreeMap<>();
+
+ for (OfflineSegmentZKMetadata zkMetadata : zkMetadataList) {
+ long endTime = zkMetadata.getEndTime();
+ TimeUnit timeUnit = zkMetadata.getTimeUnit();
+ TimeUnitTransformer timeUnitTransformer =
+ TimeUnitTransformerFactory.getTimeUnitTransformer(timeUnit, "months");
+ long[] input = new long[1];
+ input[0] = endTime;
+ long[] output = new long[1];
+ timeUnitTransformer.transform(input, output, 1);
+
+ List<String> group = result.computeIfAbsent(output[0], k -> new ArrayList<>());
+ group.add(zkMetadata.getSegmentName());
+ }
+
+ return result;
+ }
+
+
+ public static void produceResultStat(Map<Long, List<String>> result) {
+ System.out.println("size breakdown: ");
+ for (Map.Entry<Long, List<String>> entry : result.entrySet()) {
+ System.out.println(entry.getKey() + ": " + entry.getValue().size());
+ }
+
+ System.out.println("segments: ");
+ for (Map.Entry<Long, List<String>> entry : result.entrySet()) {
+ List<String> segmentList = entry.getValue();
+ Collections.sort(segmentList);
+ System.out.println(entry.getKey() + ": " + segmentList.get(0) + ", " + segmentList.get(segmentList.size()-1));
+ }
+
+ }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index 49bc3d4..4098607 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -32,6 +32,7 @@ public class TaskGeneratorRegistry {
public TaskGeneratorRegistry(@Nonnull ClusterInfoProvider clusterInfoProvider) {
registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoProvider));
+ registerTaskGenerator(new SegmentMergeRollupTaskGenerator(clusterInfoProvider));
}
/**
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
index d8a0b9f..640b946 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/common/MinionConstants.java
@@ -25,6 +25,7 @@ public class MinionConstants {
public static final String SEGMENT_NAME_KEY = "segmentName";
public static final String DOWNLOAD_URL_KEY = "downloadURL";
public static final String UPLOAD_URL_KEY = "uploadURL";
+ public static final String CONTROLLER_API_URL = "controllerApiUrl";
public static final String URL_SEPARATOR = ",";
/**
@@ -48,6 +49,13 @@ public class MinionConstants {
public static final String COLUMNS_TO_CONVERT_KEY = "columnsToConvert";
}
+  /** Task type and task config keys for the segment merge/rollup Minion task. */
+  public static class SegmentMergeRollupTask {
+    public static final String TASK_TYPE = "SegmentMergeRollupTask";
+    /** Config key for the merge type passed to the merge/rollup segment converter. */
+    public static final String MERGE_TYPE = "mergeType";
+    /** Config key for the name to give the merged segment. */
+    public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
+    /**
+     * Misspelled alias of {@link #MERGED_SEGMENT_NAME_KEY}, kept so existing references keep compiling.
+     *
+     * @deprecated use {@link #MERGED_SEGMENT_NAME_KEY}
+     */
+    @Deprecated
+    public static final String MERGED_SEGEMNT_NAME_KEY = MERGED_SEGMENT_NAME_KEY;
+    /** Config key for the comma-separated ids of the segment groups that the merged segments cover. */
+    public static final String GROUPS_TO_COVER_KEY = "groupsToCoverKey";
+  }
+
// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
index 4712f36..f80850e 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -16,6 +16,7 @@
package com.linkedin.pinot.core.data.readers;
import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.data.DimensionFieldSpec;
import com.linkedin.pinot.common.data.FieldSpec;
import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.common.segment.ReadMode;
@@ -26,6 +27,8 @@ import com.linkedin.pinot.core.data.readers.sort.SegmentSorter;
import com.linkedin.pinot.core.indexsegment.immutable.ImmutableSegment;
import com.linkedin.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index a2cdb1a..dc4eb63 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -94,6 +94,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
Map<String, MinionEventObserverFactory> eventObserverFactoryRegistry =
Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new TestEventObserverFactory());
startMinions(NUM_MINIONS, taskExecutorFactoryRegistry, eventObserverFactoryRegistry);
+
+ Thread.sleep(1000000000);
}
@Test
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index 06cd811..dc40bd0 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
+import org.apache.http.Header;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
@@ -59,6 +60,12 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
protected abstract List<SegmentConversionResult> convert(@Nonnull PinotTaskConfig pinotTaskConfig,
@Nonnull List<File> originalIndexDir, @Nonnull File workingDir) throws Exception;
+  /**
+   * Hook invoked after all segments are converted and tarred, and before any converted segment is uploaded.
+   *
+   * <p>The default implementation does nothing. It is deliberately not abstract, so existing subclasses are not
+   * forced to implement it. Subclasses override it to run work that must finish before the upload, such as
+   * updating metadata on the controller.
+   *
+   * @param pinotTaskConfig config of the running task
+   * @param conversionResults results returned by {@link #convert}
+   * @throws Exception if the step fails; the exception propagates out of task execution before any upload
+   */
+  protected void preprocessBeforeUpload(@Nonnull PinotTaskConfig pinotTaskConfig,
+      List<SegmentConversionResult> conversionResults) throws Exception {
+  }
+
+  /**
+   * Returns extra HTTP headers to send with every segment upload request of this task.
+   *
+   * @return headers to attach, or {@code null} (the default) to send none
+   */
+  protected List<Header> getHttpHeaders() {
+    return null;
+  }
+
@Override
public List<SegmentConversionResult> executeTask(@Nonnull PinotTaskConfig pinotTaskConfig) throws Exception {
String taskType = pinotTaskConfig.getTaskType();
@@ -122,6 +129,10 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}
+ preprocessBeforeUpload(pinotTaskConfig, segmentConversionResults);
+
+ List<Header> headers = getHttpHeaders();
+
// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
File convertedTarredSegmentFile = tarredSegmentFiles.get(i);
@@ -131,7 +142,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
List<NameValuePair> parameters = Collections.singletonList(
new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true"));
- SegmentConversionUtils.uploadSegment(configs, null, parameters, tableNameWithType, resultSegmentName, uploadURL,
+ SegmentConversionUtils.uploadSegment(configs, headers, parameters, tableNameWithType, resultSegmentName, uploadURL,
convertedTarredSegmentFile);
}
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
index d47fd0e..8cdb499 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentConversionUtils.java
@@ -17,18 +17,25 @@ package com.linkedin.pinot.minion.executor;
import com.linkedin.pinot.common.exception.HttpErrorStatusException;
import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
import com.linkedin.pinot.common.utils.SimpleHttpResponse;
+import com.linkedin.pinot.common.utils.retry.AttemptsExceededException;
+import com.linkedin.pinot.common.utils.retry.RetriableOperationException;
import com.linkedin.pinot.common.utils.retry.RetryPolicies;
import com.linkedin.pinot.common.utils.retry.RetryPolicy;
import com.linkedin.pinot.core.common.MinionConstants;
import com.linkedin.pinot.minion.MinionContext;
import java.io.File;
+import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
+import javax.validation.constraints.Min;
+import javax.ws.rs.HEAD;
import org.apache.http.Header;
import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
import org.apache.http.NameValuePair;
+import org.apache.http.client.methods.RequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,4 +98,47 @@ public class SegmentConversionUtils {
});
}
}
+
+ public static void uploadSegmentMergeLineage(Map<String, String> configs, String updateLineageUrl,
+ List<String> childrenSegmentGroups, List<String> segments) throws Exception {
+ String maxNumAttemptsConfig = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
+ int maxNumAttempts =
+ maxNumAttemptsConfig != null ? Integer.parseInt(maxNumAttemptsConfig) : DEFAULT_MAX_NUM_ATTEMPTS;
+ String initialRetryDelayMsConfig = configs.get(MinionConstants.INITIAL_RETRY_DELAY_MS_KEY);
+ long initialRetryDelayMs =
+ initialRetryDelayMsConfig != null ? Long.parseLong(initialRetryDelayMsConfig) : DEFAULT_INITIAL_RETRY_DELAY_MS;
+ String retryScaleFactorConfig = configs.get(MinionConstants.RETRY_SCALE_FACTOR_KEY);
+ double retryScaleFactor =
+ retryScaleFactorConfig != null ? Double.parseDouble(retryScaleFactorConfig) : DEFAULT_RETRY_SCALE_FACTOR;
+ RetryPolicy retryPolicy =
+ RetryPolicies.exponentialBackoffRetryPolicy(maxNumAttempts, initialRetryDelayMs, retryScaleFactor);
+
+ SSLContext sslContext = MinionContext.getInstance().getSSLContext();
+ try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
+ retryPolicy.attempt(() -> {
+ try {
+ SimpleHttpResponse response =
+ fileUploadDownloadClient.updateSegmentMergeLineage(new URI(updateLineageUrl), childrenSegmentGroups,
+ segments, FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ return true;
+ } catch (HttpErrorStatusException e) {
+ int statusCode = e.getStatusCode();
+ if (statusCode == HttpStatus.SC_CONFLICT || statusCode >= 500) {
+ // Temporary exception
+ return false;
+ } else {
+ // Permanent exception
+ throw e;
+ }
+ } catch (Exception e) {
+ return false;
+ }
+ });
+ }
+ }
+
+  /**
+   * Does nothing.
+   *
+   * <p>NOTE(review): leftover scratch entry point from development. It previously only built an unused
+   * request object. Delete it once nothing refers to it.
+   */
+  public static void main(String[] args) {
+  }
}
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java
new file mode 100644
index 0000000..a4ecd81
--- /dev/null
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutor.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.minion.executor;
+
+import com.linkedin.pinot.common.config.PinotTaskConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.utils.FileUploadDownloadClient;
+import com.linkedin.pinot.core.common.MinionConstants;
+import com.linkedin.pinot.core.minion.rollup.MergeRollupSegmentConverter;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Minion task executor for the segment merge/rollup task.
+ *
+ * <p>It merges the input segments with {@link MergeRollupSegmentConverter}. It then records the segment merge
+ * lineage on the controller, and finally lets the base class upload the merged segments, marked as merged-segment
+ * pushes.
+ */
+public class SegmentMergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMergeRollupTaskExecutor.class);
+
+  /**
+   * Runs {@link MergeRollupSegmentConverter} over the input segment directories.
+   *
+   * <p>The merge type, merged segment name and table name come from the task configs. Each output file's name is
+   * used as the resulting segment name.
+   */
+  @Override
+  protected List<SegmentConversionResult> convert(@Nonnull PinotTaskConfig pinotTaskConfig,
+      @Nonnull List<File> originalIndexDirs, @Nonnull File workingDir) throws Exception {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String mergeType = configs.get(MinionConstants.SegmentMergeRollupTask.MERGE_TYPE);
+    String mergedSegmentName = configs.get(MinionConstants.SegmentMergeRollupTask.MERGED_SEGEMNT_NAME_KEY);
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+    MergeRollupSegmentConverter rollupSegmentConverter = new MergeRollupSegmentConverter.Builder()
+        .setMergeType(mergeType)
+        .setTableName(tableNameWithType)
+        .setSegmentName(mergedSegmentName)
+        .setInputIndexDirs(originalIndexDirs)
+        .setWorkingDir(workingDir)
+        .build();
+    List<File> resultFiles = rollupSegmentConverter.convert();
+
+    List<SegmentConversionResult> results = new ArrayList<>(resultFiles.size());
+    for (File file : resultFiles) {
+      results.add(new SegmentConversionResult.Builder().setFile(file)
+          .setSegmentName(file.getName())
+          .setTableNameWithType(tableNameWithType)
+          .build());
+    }
+    return results;
+  }
+
+  /**
+   * Updates the segment merge lineage on the controller before the merged segments are uploaded.
+   *
+   * <p>It posts the names of the merged segments and the ids of the segment groups they cover to
+   * {@code <controllerApiUrl>/tables/<rawTableName>/segments/lineage?type=<tableType>}. The group ids come from
+   * the comma-separated {@code groupsToCoverKey} task config.
+   *
+   * @throws IllegalStateException if the groups-to-cover, controller API URL or table name config is missing
+   */
+  @Override
+  protected void preprocessBeforeUpload(@Nonnull PinotTaskConfig pinotTaskConfig,
+      List<SegmentConversionResult> conversionResults) throws Exception {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String groupsToCoverStr = getRequiredConfig(configs, MinionConstants.SegmentMergeRollupTask.GROUPS_TO_COVER_KEY);
+    String controllerApiUrl = getRequiredConfig(configs, MinionConstants.CONTROLLER_API_URL);
+    String tableNameWithType = getRequiredConfig(configs, MinionConstants.TABLE_NAME_KEY);
+
+    List<String> childrenGroupIds = new ArrayList<>();
+    for (String groupId : groupsToCoverStr.split(",")) {
+      String trimmedGroupId = groupId.trim();
+      // Skip blanks so stray or trailing commas do not send empty group ids to the controller.
+      if (!trimmedGroupId.isEmpty()) {
+        childrenGroupIds.add(trimmedGroupId);
+      }
+    }
+
+    List<String> segments = new ArrayList<>(conversionResults.size());
+    for (SegmentConversionResult conversionResult : conversionResults) {
+      segments.add(conversionResult.getSegmentName());
+    }
+
+    LOGGER.info("Updating segment merge lineage metadata. segments: {}, childrenGroupIds: {}", segments,
+        childrenGroupIds);
+
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    String tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString();
+    String requestUrl = controllerApiUrl + "/tables/" + rawTableName + "/segments/lineage?type=" + tableType;
+    SegmentConversionUtils.uploadSegmentMergeLineage(configs, requestUrl, childrenGroupIds, segments);
+  }
+
+  /**
+   * Marks every upload from this task with the merged-segment-push header.
+   *
+   * <p>The merge lineage for these segments has already been updated in {@link #preprocessBeforeUpload}, and
+   * this header tells the upload request so.
+   */
+  @Override
+  protected List<Header> getHttpHeaders() {
+    Header mergedSegmentPushHeader =
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.MERGED_SEGMENT_PUSH, "true");
+    return Collections.singletonList(mergedSegmentPushHeader);
+  }
+
+  /** Returns the value stored under {@code key}, failing fast with a descriptive error when it is absent. */
+  private static String getRequiredConfig(Map<String, String> configs, String key) {
+    String value = configs.get(key);
+    if (value == null) {
+      throw new IllegalStateException("Missing required task config: " + key);
+    }
+    return value;
+  }
+}
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java
new file mode 100644
index 0000000..dd97948
--- /dev/null
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/SegmentMergeRollupTaskExecutorFactory.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.minion.executor;
+
+public class SegmentMergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
+ @Override
+ public PinotTaskExecutor create() {
+ return new SegmentMergeRollupTaskExecutor();
+ }
+}
diff --git a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 91e1ea6..d316c6e 100644
--- a/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/com/linkedin/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -32,6 +32,8 @@ public class TaskExecutorFactoryRegistry {
registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
new ConvertToRawIndexTaskExecutorFactory());
registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory());
+ registerTaskExecutorFactory(MinionConstants.SegmentMergeRollupTask.TASK_TYPE,
+ new SegmentMergeRollupTaskExecutorFactory());
}
/**
diff --git a/pom.xml b/pom.xml
index 81a6dbb..e020a5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -470,6 +470,11 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.uuid</groupId>
+ <artifactId>java-uuid-generator</artifactId>
+ <version>3.1.5</version>
+ </dependency>
+ <dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.16</version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org