Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/01 17:42:30 UTC
[incubator-pinot] branch master updated: SegmentProcessorFramework Enhancement (#7092)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f809e50 SegmentProcessorFramework Enhancement (#7092)
f809e50 is described below
commit f809e50d678ddd0963350a6e59743788c31d13bd
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jul 1 10:42:11 2021 -0700
SegmentProcessorFramework Enhancement (#7092)
Enhance `SegmentProcessorFramework` with the following improvements:
- Refactor the reducer to:
  - Support the CONCAT/ROLLUP/DEDUP merge types (using off-heap memory)
  - Support null values
  - Generate a generic row file as the intermediate file
  - Minimize the number of intermediate files generated (0 for CONCAT, 1 for ROLLUP/DEDUP)
- Add `GenericRowFileRecordReader`, which can read a range of records directly from the generic row file to avoid creating extra intermediate files when generating segments
- Add support for a customized segment name prefix
- Simplify the `MergeRollupTaskExecutor` to directly use the `SegmentProcessorFramework`
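
As an illustration of the enhanced API, here is a minimal usage sketch assembled from the builder methods and entry points that appear in the diff below. It is a sketch only: the class name, the table config, schema, directories, and the "clicks" metric column are placeholder assumptions, and it presumes the `SegmentProcessorFramework(inputDir, config, outputDir)` constructor and `processSegments()`/`cleanup()` calls shown elsewhere in this change remain valid.

import java.io.File;
import java.util.Collections;
import org.apache.pinot.core.segment.processing.framework.MergeType;
import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;

public class SegmentProcessorFrameworkSketch {
  // Roll up the segments under inputSegmentsDir into outputSegmentsDir, aggregating the
  // hypothetical "clicks" metric with MAX and prefixing the generated segment names.
  public static void mergeSegments(TableConfig tableConfig, Schema schema, File inputSegmentsDir,
      File outputSegmentsDir)
      throws Exception {
    SegmentProcessorConfig config = new SegmentProcessorConfig.Builder()
        .setTableConfig(tableConfig)
        .setSchema(schema)
        .setMergeType(MergeType.ROLLUP)
        .setAggregationTypes(Collections.singletonMap("clicks", AggregationFunctionType.MAX))
        .setSegmentConfig(new SegmentConfig.Builder()
            .setMaxNumRecordsPerSegment(1_000_000)
            .setSegmentNamePrefix("merged")  // new: customized segment name prefix
            .build())
        .build();
    SegmentProcessorFramework framework =
        new SegmentProcessorFramework(inputSegmentsDir, config, outputSegmentsDir);
    try {
      framework.processSegments();
    } finally {
      framework.cleanup();
    }
  }
}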
---
.../pinot/core/minion/MergeRollupConverter.java | 186 -----
.../core/minion/rollup/RollupRecordAggregator.java | 11 +-
.../MaxValueAggregator.java | 2 +-
.../MinValueAggregator.java | 2 +-
.../SumValueAggregator.java | 2 +-
.../{collector => aggregator}/ValueAggregator.java | 2 +-
.../ValueAggregatorFactory.java | 26 +-
.../processing/collector/CollectorConfig.java | 109 ---
.../processing/collector/CollectorFactory.java | 61 --
.../processing/collector/ConcatCollector.java | 153 ----
.../processing/collector/GenericRowSorter.java | 95 ---
.../processing/collector/RollupCollector.java | 174 ----
.../Collector.java => framework/MergeType.java} | 34 +-
.../processing/framework/SegmentConfig.java | 26 +-
.../processing/framework/SegmentMapperConfig.java | 79 --
.../framework/SegmentProcessorConfig.java | 62 +-
.../framework/SegmentProcessorFramework.java | 94 ++-
.../processing/framework/SegmentReducer.java | 129 ---
.../processing/framework/SegmentReducerConfig.java | 61 --
.../genericrow/GenericRowDeserializer.java | 221 +++--
.../genericrow/GenericRowFileManager.java | 28 +-
.../genericrow/GenericRowFileReader.java | 34 +-
.../genericrow/GenericRowFileRecordReader.java | 127 +++
.../genericrow/GenericRowSerializer.java | 2 +
.../{framework => mapper}/SegmentMapper.java | 37 +-
.../ConcatReducer.java} | 24 +-
.../segment/processing/reducer/DedupReducer.java | 89 +++
.../ValueAggregator.java => reducer/Reducer.java} | 15 +-
.../segment/processing/reducer/ReducerFactory.java | 48 ++
.../segment/processing/reducer/RollupReducer.java | 172 ++++
.../processing/utils/SegmentProcessingUtils.java | 84 --
.../processing/utils/SegmentProcessorUtils.java | 98 +++
.../processing/utils/SortOrderComparator.java | 72 --
.../processing/framework/CollectorTest.java | 306 -------
.../processing/framework/GenericRowSorterTest.java | 77 --
.../segment/processing/framework/ReducerTest.java | 565 +++++++++++++
.../processing/framework/SegmentMapperTest.java | 166 ++--
.../framework/SegmentProcessingFrameworkTest.java | 551 -------------
.../framework/SegmentProcessorFrameworkTest.java | 885 +++++++++++++++++++++
.../processing/framework/SegmentReducerTest.java | 205 -----
.../processing/genericrow/GenericRowSerDeTest.java | 40 +-
.../merge_rollup/MergeRollupTaskExecutor.java | 63 +-
.../tasks/merge_rollup/MergeRollupTaskUtils.java | 40 +-
.../RealtimeToOfflineSegmentsTaskExecutor.java | 60 +-
.../merge_rollup/MergeRollupTaskUtilsTest.java | 16 +-
.../spi/creator/SegmentGeneratorConfig.java | 27 +-
.../creator/name/SimpleSegmentNameGenerator.java | 10 +-
.../apache/pinot/spi/data/readers/GenericRow.java | 6 +-
.../command/SegmentProcessorFrameworkCommand.java | 5 +-
.../processor/SegmentProcessorFrameworkSpec.java | 23 +-
50 files changed, 2622 insertions(+), 2782 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/MergeRollupConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/MergeRollupConverter.java
deleted file mode 100644
index d19e01b..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/MergeRollupConverter.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.minion;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
-import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
-import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
-import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-
-
-/**
- * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
- * the configuration.
- *
- * TODO:
- * 1. Add the support for roll-up
- * 2. Add the support to make result segments time aligned
- * 3. Add merge/roll-up prefixes for result segments
- */
-public class MergeRollupConverter {
- private List<File> _originalIndexDirs;
- private final File _workingDir;
- private final String _inputSegmentDir;
- private final String _outputSegmentDir;
- private final SegmentProcessorConfig _segmentProcessorConfig;
-
- private MergeRollupConverter(TableConfig tableConfig, Schema schema, String mergeType,
- Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs, int numRecordsPerSegment,
- List<File> originalIndexDirs, File workingDir, String inputSegmentDir, String outputSegmentDir) {
- SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
- new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
-
- // Aggregations using configured collector
- Set<String> schemaColumns = schema.getPhysicalColumnNames();
- List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
- CollectorConfig collectorConfig =
- getCollectorConfig(mergeType, aggregatorConfigs, schemaColumns, sortedColumns);
- Preconditions.checkState(collectorConfig.getCollectorType() == CollectorFactory.CollectorType.CONCAT,
- "Only 'CONCAT' mode is currently supported.");
- segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
-
- // Segment config
- SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
- segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
- _segmentProcessorConfig = segmentProcessorConfigBuilder.build();
-
- _originalIndexDirs = originalIndexDirs;
- _workingDir = workingDir;
- _inputSegmentDir = inputSegmentDir;
- _outputSegmentDir = outputSegmentDir;
- }
-
- public File[] convert()
- throws Exception {
- File inputSegmentsDir = new File(_workingDir, _inputSegmentDir);
- Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
- inputSegmentsDir.getAbsolutePath(), MinionConstants.MergeRollupTask.TASK_TYPE);
- for (File indexDir : _originalIndexDirs) {
- FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
- }
- File outputSegmentsDir = new File(_workingDir, _outputSegmentDir);
- Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
- outputSegmentsDir.getAbsolutePath(), MinionConstants.MergeRollupTask.TASK_TYPE);
-
- SegmentProcessorFramework segmentProcessorFramework =
- new SegmentProcessorFramework(inputSegmentsDir, _segmentProcessorConfig, outputSegmentsDir);
- try {
- segmentProcessorFramework.processSegments();
- } finally {
- segmentProcessorFramework.cleanup();
- }
- return outputSegmentsDir.listFiles();
- }
-
- /**
- * Construct a {@link CollectorConfig} using configured collector configs and sorted columns from table config
- */
- private CollectorConfig getCollectorConfig(String mergeTypeStr, Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregateConfigs,
- Set<String> schemaColumns, List<String> sortedColumns) {
- CollectorFactory.CollectorType collectorType = mergeTypeStr == null ? CollectorFactory.CollectorType.CONCAT
- : CollectorFactory.CollectorType.valueOf(mergeTypeStr.toUpperCase());
-
- if (sortedColumns != null) {
- for (String column : sortedColumns) {
- Preconditions
- .checkState(schemaColumns.contains(column), "Sorted column: %s is not a physical column in the schema",
- column);
- }
- }
- return new CollectorConfig.Builder().setCollectorType(collectorType).setAggregatorTypeMap(aggregateConfigs)
- .setSortOrder(sortedColumns).build();
- }
-
- private SegmentConfig getSegmentConfig(int numRecordsPerSegment) {
- return new SegmentConfig.Builder().setMaxNumRecordsPerSegment(numRecordsPerSegment).build();
- }
-
- public static class Builder {
- private TableConfig _tableConfig;
- private Schema _schema;
- private String _mergeType;
- private Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorConfigs;
- private int _numRecordsPerSegment;
- private List<File> _originalIndexDirs;
- private File _workingDir;
- private String _inputSegmentDir;
- private String _outputSegmentDir;
-
- public Builder setTableConfig(TableConfig tableConfig) {
- _tableConfig = tableConfig;
- return this;
- }
-
- public Builder setSchema(Schema schema) {
- _schema = schema;
- return this;
- }
-
- public Builder setMergeType(String mergeType) {
- _mergeType = mergeType;
- return this;
- }
-
- public Builder setAggregatorConfigs(Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs) {
- _aggregatorConfigs = aggregatorConfigs;
- return this;
- }
-
- public Builder setNumRecordsPerSegment(int numRecordsPerSegment) {
- _numRecordsPerSegment = numRecordsPerSegment;
- return this;
- }
-
- public Builder setOriginalIndexDirs(List<File> originalIndexDirs) {
- _originalIndexDirs = originalIndexDirs;
- return this;
- }
-
- public Builder setWorkingDir(File workingDir) {
- _workingDir = workingDir;
- return this;
- }
-
- public Builder setInputSegmentDir(String inputSegmentDir) {
- _inputSegmentDir = inputSegmentDir;
- return this;
- }
-
- public Builder setOutputSegmentDir(String outputSegmentDir) {
- _outputSegmentDir = outputSegmentDir;
- return this;
- }
-
- public MergeRollupConverter build() {
- return new MergeRollupConverter(_tableConfig, _schema, _mergeType, _aggregatorConfigs,
- _numRecordsPerSegment, _originalIndexDirs, _workingDir, _inputSegmentDir, _outputSegmentDir);
- }
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/RollupRecordAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/RollupRecordAggregator.java
index 87eabd6..384adbe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/RollupRecordAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/RollupRecordAggregator.java
@@ -23,8 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.minion.segment.RecordAggregator;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregator;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.core.segment.processing.aggregator.ValueAggregator;
+import org.apache.pinot.core.segment.processing.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -38,7 +39,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
* whose metric column values are aggregated based on the given aggregator functions.
*/
public class RollupRecordAggregator implements RecordAggregator {
- private static final String DEFAULT_VALUE_AGGREGATOR_TYPE = ValueAggregatorFactory.ValueAggregatorType.SUM.toString();
+ private static final String DEFAULT_VALUE_AGGREGATOR_TYPE = AggregationFunctionType.SUM.name();
private final Map<String, ValueAggregator> _valueAggregatorMap;
private final Schema _schema;
@@ -53,7 +54,9 @@ public class RollupRecordAggregator implements RecordAggregator {
if (!fieldSpec.isVirtualColumn() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
String metricName = fieldSpec.getName();
String aggregateType = aggregateTypes.getOrDefault(metricName, DEFAULT_VALUE_AGGREGATOR_TYPE);
- ValueAggregator valueAggregator = ValueAggregatorFactory.getValueAggregator(aggregateType, fieldSpec.getDataType());
+ ValueAggregator valueAggregator = ValueAggregatorFactory
+ .getValueAggregator(AggregationFunctionType.getAggregationFunctionType(aggregateType),
+ fieldSpec.getDataType());
_valueAggregatorMap.put(metricName, valueAggregator);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/MaxValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
similarity index 96%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/MaxValueAggregator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
index 7970691..6a231b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/MaxValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.aggregator;
import org.apache.pinot.spi.data.FieldSpec;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/MinValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
similarity index 96%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/MinValueAggregator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
index 10b0446..9352cc9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/MinValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.aggregator;
import org.apache.pinot.spi.data.FieldSpec;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SumValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
similarity index 96%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SumValueAggregator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
index b634c46..0570cca 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/SumValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.aggregator;
import org.apache.pinot.spi.data.FieldSpec;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
similarity index 94%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
index 80fc99b..016e0fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.aggregator;
/**
* Interface for value aggregator
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
similarity index 68%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregatorFactory.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index 9c3250d..68578d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -16,36 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.aggregator;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
* Factory class to create instances of value aggregator from the given name.
*/
public class ValueAggregatorFactory {
- public enum ValueAggregatorType {
- SUM, MAX, MIN
- }
-
private ValueAggregatorFactory() {
}
/**
- * Construct a ValueAggregator from the given aggregator type
+ * Constructs a ValueAggregator from the given aggregation type.
*/
- public static ValueAggregator getValueAggregator(String aggregatorTypeStr, FieldSpec.DataType dataType) {
- ValueAggregatorType aggregatorType = ValueAggregatorType.valueOf(aggregatorTypeStr.toUpperCase());
- switch (aggregatorType) {
- case SUM:
- return new SumValueAggregator(dataType);
- case MAX:
- return new MaxValueAggregator(dataType);
+ public static ValueAggregator getValueAggregator(AggregationFunctionType aggregationType, DataType dataType) {
+ switch (aggregationType) {
case MIN:
return new MinValueAggregator(dataType);
+ case MAX:
+ return new MaxValueAggregator(dataType);
+ case SUM:
+ return new SumValueAggregator(dataType);
default:
- throw new IllegalStateException("Unsupported value aggregator type : " + aggregatorTypeStr);
+ throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
}
}
}
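
For context, a minimal call sketch of the enum-based factory signature above; the class name, method name, and the choice of MAX over a LONG metric are illustrative assumptions, not part of this commit.

import org.apache.pinot.core.segment.processing.aggregator.ValueAggregator;
import org.apache.pinot.core.segment.processing.aggregator.ValueAggregatorFactory;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;

public class ValueAggregatorFactorySketch {
  public static ValueAggregator maxAggregatorForLongMetric() {
    // Resolve a MAX aggregator for a LONG metric column via the enum-based signature.
    return ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.MAX, DataType.LONG);
  }
}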
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
deleted file mode 100644
index 5e8f4db..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.collector;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-
-/**
- * Config for Collector
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class CollectorConfig {
- private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
-
- private final CollectorFactory.CollectorType _collectorType;
- private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
- private final List<String> _sortOrder;
-
- @JsonCreator
- private CollectorConfig(
- @JsonProperty(value = "collectorType", required = true) CollectorFactory.CollectorType collectorType,
- @JsonProperty(value = "aggregatorTypeMap") Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap,
- @JsonProperty(value = "sortOrder") List<String> sortOrder) {
- _collectorType = collectorType;
- _aggregatorTypeMap = aggregatorTypeMap;
- _sortOrder = sortOrder;
- }
-
- /**
- * The type of the Collector
- */
- @JsonProperty
- public CollectorFactory.CollectorType getCollectorType() {
- return _collectorType;
- }
-
- /**
- * Map containing aggregation types for the metrics
- */
- @JsonProperty
- @Nullable
- public Map<String, ValueAggregatorFactory.ValueAggregatorType> getAggregatorTypeMap() {
- return _aggregatorTypeMap;
- }
-
- /**
- * The columns on which to sort
- */
- @JsonProperty
- @Nullable
- public List<String> getSortOrder() {
- return _sortOrder;
- }
-
- /**
- * Builder for CollectorConfig
- */
- public static class Builder {
- private CollectorFactory.CollectorType _collectorType = DEFAULT_COLLECTOR_TYPE;
- private Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
- private List<String> _sortOrder;
-
- public Builder setCollectorType(CollectorFactory.CollectorType collectorType) {
- _collectorType = collectorType;
- return this;
- }
-
- public Builder setAggregatorTypeMap(Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap) {
- _aggregatorTypeMap = aggregatorTypeMap;
- return this;
- }
-
- public Builder setSortOrder(List<String> sortOrder) {
- _sortOrder = sortOrder;
- return this;
- }
-
- public CollectorConfig build() {
- return new CollectorConfig(_collectorType, _aggregatorTypeMap, _sortOrder);
- }
- }
-
- @Override
- public String toString() {
- return "CollectorConfig{" + "_collectorType=" + _collectorType + ", _aggregatorTypeMap=" + _aggregatorTypeMap
- + ", _sortOrder=" + _sortOrder + '}';
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorFactory.java
deleted file mode 100644
index f4a51ab..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.collector;
-
-import org.apache.pinot.spi.data.Schema;
-
-
-/**
- * Factory for constructing a Collector from CollectorConfig
- */
-public final class CollectorFactory {
-
- private CollectorFactory() {
-
- }
-
- public enum CollectorType {
- /**
- * Aggregate the metric values based on configured aggregation types on unique dimension + time column values
- */
- ROLLUP,
- /**
- * Append rows without doing any aggregations
- */
- CONCAT
- // TODO: add support for DEDUP
- }
-
- /**
- * Construct a Collector from the given CollectorConfig and schema
- */
- public static Collector getCollector(CollectorConfig collectorConfig, Schema pinotSchema) {
- Collector collector = null;
- switch (collectorConfig.getCollectorType()) {
-
- case ROLLUP:
- collector = new RollupCollector(collectorConfig, pinotSchema);
- break;
- case CONCAT:
- collector = new ConcatCollector(collectorConfig, pinotSchema);
- break;
- }
- return collector;
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
deleted file mode 100644
index 263a0ec..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.collector;
-
-import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.Arrays;
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
-import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
-import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
-import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
-import org.apache.pinot.core.segment.processing.utils.SortOrderComparator;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-
-
-/**
- * A Collector implementation for collecting and concatenating all incoming rows.
- */
-public class ConcatCollector implements Collector {
- private final int _numSortColumns;
- private final SortOrderComparator _sortOrderComparator;
- private final File _workingDir;
- private final GenericRowFileManager _recordFileManager;
-
- private GenericRowFileWriter _recordFileWriter;
- private GenericRowFileReader _recordFileReader;
- private int _numDocs;
-
- public ConcatCollector(CollectorConfig collectorConfig, Schema schema) {
- List<String> sortOrder = collectorConfig.getSortOrder();
- List<FieldSpec> fieldSpecs;
- if (CollectionUtils.isNotEmpty(sortOrder)) {
- fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
- _numSortColumns = sortOrder.size();
- _sortOrderComparator = SegmentProcessingUtils.getSortOrderComparator(fieldSpecs, _numSortColumns);
- } else {
- fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
- _numSortColumns = 0;
- _sortOrderComparator = null;
- }
-
- _workingDir =
- new File(FileUtils.getTempDirectory(), String.format("concat_collector_%d", System.currentTimeMillis()));
- Preconditions.checkState(_workingDir.mkdirs(), "Failed to create dir: %s for %s with config: %s",
- _workingDir.getAbsolutePath(), ConcatCollector.class.getSimpleName(), collectorConfig);
-
- // TODO: Pass 'includeNullFields' from the config
- _recordFileManager = new GenericRowFileManager(_workingDir, fieldSpecs, true);
- try {
- _recordFileWriter = _recordFileManager.getFileWriter();
- } catch (IOException e) {
- throw new RuntimeException("Caught exception while creating the file writer", e);
- }
- }
-
- @Override
- public void collect(GenericRow genericRow)
- throws IOException {
- _recordFileWriter.write(genericRow);
- _numDocs++;
- }
-
- @Override
- public Iterator<GenericRow> iterator()
- throws IOException {
- _recordFileManager.closeFileWriter();
- _recordFileReader = _recordFileManager.getFileReader();
-
- // TODO: A lot of this code can be made common across Collectors, once {@link RollupCollector} is also converted to off heap implementation
- if (_numSortColumns != 0) {
- int[] sortedDocIds = new int[_numDocs];
- for (int i = 0; i < _numDocs; i++) {
- sortedDocIds[i] = i;
- }
-
- Arrays.quickSort(0, _numDocs, (i1, i2) -> _sortOrderComparator
- .compare(_recordFileReader.partialRead(sortedDocIds[i1], _numSortColumns),
- _recordFileReader.partialRead(sortedDocIds[i2], _numSortColumns)), (i1, i2) -> {
- int temp = sortedDocIds[i1];
- sortedDocIds[i1] = sortedDocIds[i2];
- sortedDocIds[i2] = temp;
- });
- return createIterator(sortedDocIds);
- } else {
- return createIterator(null);
- }
- }
-
- private Iterator<GenericRow> createIterator(@Nullable int[] sortedDocIds) {
- return new Iterator<GenericRow>() {
- final GenericRow _reuse = new GenericRow();
- int _nextDocId = 0;
-
- @Override
- public boolean hasNext() {
- return _nextDocId < _numDocs;
- }
-
- @Override
- public GenericRow next() {
- int docId = sortedDocIds != null ? sortedDocIds[_nextDocId++] : _nextDocId++;
- return _recordFileReader.read(docId, _reuse);
- }
- };
- }
-
- @Override
- public int size() {
- return _numDocs;
- }
-
- @Override
- public void reset()
- throws IOException {
- _recordFileManager.cleanUp();
- _recordFileWriter = _recordFileManager.getFileWriter();
- _numDocs = 0;
- }
-
- @Override
- public void close()
- throws IOException {
- try {
- _recordFileManager.cleanUp();
- } finally {
- FileUtils.deleteQuietly(_workingDir);
- }
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java
deleted file mode 100644
index d728032..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.collector;
-
-import com.google.common.base.Preconditions;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ByteArray;
-
-
-/**
- * A sorter for GenericRows
- */
-public class GenericRowSorter {
-
- private final Comparator<GenericRow> _genericRowComparator;
-
- public GenericRowSorter(List<String> sortOrder, Schema schema) {
- int sortOrderSize = sortOrder.size();
- DataType[] storedTypes = new DataType[sortOrderSize];
- for (int i = 0; i < sortOrderSize; i++) {
- String column = sortOrder.get(i);
- FieldSpec fieldSpec = schema.getFieldSpecFor(column);
- Preconditions.checkState(fieldSpec != null, "Column in sort order: %s does not exist in schema", column);
- Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot use multi value column: %s for sorting", column);
- storedTypes[i] = fieldSpec.getDataType().getStoredType();
- }
- _genericRowComparator = (o1, o2) -> {
- for (int i = 0; i < sortOrderSize; i++) {
- String column = sortOrder.get(i);
- DataType dataType = storedTypes[i];
- Object value1 = o1.getValue(column);
- Object value2 = o2.getValue(column);
- int result;
- switch (dataType) {
- case INT:
- result = Integer.compare((int) value1, (int) value2);
- break;
- case LONG:
- result = Long.compare((long) value1, (long) value2);
- break;
- case FLOAT:
- result = Float.compare((float) value1, (float) value2);
- break;
- case DOUBLE:
- result = Double.compare((double) value1, (double) value2);
- break;
- case STRING:
- result = ((String) value1).compareTo((String) value2);
- break;
- case BYTES:
- result = ByteArray.compare((byte[]) value1, (byte[]) value2);
- break;
- default:
- throw new IllegalStateException("Cannot sort on column with dataType " + dataType);
- }
- if (result != 0) {
- return result;
- }
- }
- return 0;
- };
- }
-
- public Comparator<GenericRow> getGenericRowComparator() {
- return _genericRowComparator;
- }
-
- /**
- * Sorts the given list of GenericRow
- */
- public void sort(List<GenericRow> rows) {
- rows.sort(_genericRowComparator);
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
deleted file mode 100644
index 9291813..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.collector;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-
-
-/**
- * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
- * By default will use the SUM aggregation on metrics.
- * TODO: Change this to off heap implementation, similar to {@link ConcatCollector}
- * {@link RollupCollector} needs to additionally aggregate records after sorting, before returning iterator
- */
-public class RollupCollector implements Collector {
-
- private final Map<Record, GenericRow> _collection = new HashMap<>();
- private final GenericRowSorter _sorter;
-
- private final int _keySize;
- private final int _valueSize;
- private final String[] _keyColumns;
- private final String[] _valueColumns;
- private final ValueAggregator[] _valueAggregators;
-
- public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
- int keySize = 0;
- int valueSize = 0;
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
- valueSize ++;
- } else {
- keySize ++;
- }
- }
- }
- _keySize = keySize;
- _valueSize = valueSize;
- _keyColumns = new String[_keySize];
- _valueColumns = new String[_valueSize];
- _valueAggregators = new ValueAggregator[_valueSize];
-
- Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap = collectorConfig.getAggregatorTypeMap();
- if (aggregatorTypeMap == null) {
- aggregatorTypeMap = Collections.emptyMap();
- }
- int valIdx = 0;
- int keyIdx = 0;
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- String name = fieldSpec.getName();
- if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
- _valueColumns[valIdx] = name;
- ValueAggregatorFactory.ValueAggregatorType aggregatorType =
- aggregatorTypeMap.getOrDefault(name, ValueAggregatorFactory.ValueAggregatorType.SUM);
- _valueAggregators[valIdx] =
- ValueAggregatorFactory.getValueAggregator(aggregatorType.toString(), fieldSpec.getDataType());
- valIdx++;
- } else {
- _keyColumns[keyIdx++] = name;
- }
- }
- }
-
- List<String> sortOrder = collectorConfig.getSortOrder();
- if (CollectionUtils.isNotEmpty(sortOrder)) {
- _sorter = new GenericRowSorter(sortOrder, schema);
- } else {
- _sorter = null;
- }
- }
-
- /**
- * If a row already exists in the collection (based on dimension + time columns), rollup the metric values, else add the row
- */
- @Override
- public void collect(GenericRow genericRow) {
- Object[] key = new Object[_keySize];
- for (int i = 0; i < _keySize; i++) {
- key[i] = genericRow.getValue(_keyColumns[i]);
- }
- Record keyRecord = new Record(key);
- GenericRow prev = _collection.get(keyRecord);
- if (prev == null) {
- _collection.put(keyRecord, genericRow);
- } else {
- for (int i = 0; i < _valueSize; i++) {
- String valueColumn = _valueColumns[i];
- Object aggregate = _valueAggregators[i].aggregate(prev.getValue(valueColumn), genericRow.getValue(valueColumn));
- prev.putValue(valueColumn, aggregate);
- }
- }
- }
-
- @Override
- public Iterator<GenericRow> iterator() {
- Iterator<GenericRow> iterator;
- if (_sorter != null) {
- List<GenericRow> sortedRows = new ArrayList<>(_collection.values());
- _sorter.sort(sortedRows);
- iterator = sortedRows.iterator();
- } else {
- iterator = _collection.values().iterator();
- }
- return iterator;
- }
-
- @Override
- public int size() {
- return _collection.size();
- }
-
- @Override
- public void reset() {
- _collection.clear();
- }
-
- @Override
- public void close()
- throws IOException {
-
- }
-
- /**
- * A representation for the keys of the generic row
- * Note that the dimensions can have multi-value columns, and hence the equals and hashCode need deep array operations
- */
- private static class Record {
- private final Object[] _keyParts;
-
- public Record(Object[] keyParts) {
- _keyParts = keyParts;
- }
-
- // NOTE: Not check class for performance concern
- @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
- @Override
- public boolean equals(Object o) {
- return Arrays.deepEquals(_keyParts, ((Record) o)._keyParts);
- }
-
- @Override
- public int hashCode() {
- return Arrays.deepHashCode(_keyParts);
- }
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeType.java
similarity index 50%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeType.java
index ece32db..2c38b87 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/MergeType.java
@@ -16,41 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.pinot.spi.data.readers.GenericRow;
-
+package org.apache.pinot.core.segment.processing.framework;
/**
- * Collects and stores GenericRows
+ * The MergeType defines how the segments should be merged.
*/
-public interface Collector extends Closeable {
-
- /**
- * Collects the given GenericRow and stores it
- * @param genericRow the generic row to add to the collection
- */
- void collect(GenericRow genericRow)
- throws IOException;
+public enum MergeType {
/**
- * Finish any pre-exit processing and seal the collection for reading
- * Provide an iterator for the GenericRows in the collection
+ * Concatenates the rows.
*/
- Iterator<GenericRow> iterator()
- throws IOException;
+ CONCAT,
/**
- * The number of rows in the collection
+ * Aggregates the metric values based on the configured aggregation types on unique dimension + time column values.
*/
- int size();
+ ROLLUP,
/**
- * Resets the collection of this collector by deleting all existing GenericRows
+ * Deduplicates rows with the same values.
*/
- void reset()
- throws IOException;
+ DEDUP
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
index f46a967..40964c3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
/**
@@ -29,44 +30,57 @@ import com.google.common.base.Preconditions;
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class SegmentConfig {
-
private static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+
private final int _maxNumRecordsPerSegment;
- // TODO: more configs such as segment name prefix
+ private final String _segmentNamePrefix;
@JsonCreator
- private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment") int maxNumRecordsPerSegment) {
+ private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment", required = true) int maxNumRecordsPerSegment,
+ @JsonProperty("segmentNamePrefix") @Nullable String segmentNamePrefix) {
Preconditions.checkState(maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0");
_maxNumRecordsPerSegment = maxNumRecordsPerSegment;
+ _segmentNamePrefix = segmentNamePrefix;
}
/**
* The max number of records allowed per segment
*/
- @JsonProperty
public int getMaxNumRecordsPerSegment() {
return _maxNumRecordsPerSegment;
}
+ @Nullable
+ public String getSegmentNamePrefix() {
+ return _segmentNamePrefix;
+ }
+
/**
* Builder for SegmentConfig
*/
public static class Builder {
private int _maxNumRecordsPerSegment = DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
+ private String _segmentNamePrefix;
public Builder setMaxNumRecordsPerSegment(int maxNumRecordsPerSegment) {
_maxNumRecordsPerSegment = maxNumRecordsPerSegment;
return this;
}
+ public Builder setSegmentNamePrefix(String segmentNamePrefix) {
+ _segmentNamePrefix = segmentNamePrefix;
+ return this;
+ }
+
public SegmentConfig build() {
Preconditions.checkState(_maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0");
- return new SegmentConfig(_maxNumRecordsPerSegment);
+ return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix);
}
}
@Override
public String toString() {
- return "SegmentsConfig{" + "_maxNumRecordsPerSegment=" + _maxNumRecordsPerSegment + '}';
+ return "SegmentConfig{" + "_maxNumRecordsPerSegment=" + _maxNumRecordsPerSegment + ", _segmentNamePrefix='"
+ + _segmentNamePrefix + '\'' + '}';
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
deleted file mode 100644
index 96cf2e5..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperConfig.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.framework;
-
-import java.util.List;
-import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
-import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
-import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-
-
-/**
- * Config for the mapper phase of SegmentProcessorFramework
- */
-public class SegmentMapperConfig {
- private final TableConfig _tableConfig;
- private final Schema _schema;
- private final RecordTransformerConfig _recordTransformerConfig;
- private final RecordFilterConfig _recordFilterConfig;
- private final List<PartitionerConfig> _partitionerConfigs;
-
- public SegmentMapperConfig(TableConfig tableConfig, Schema schema, RecordTransformerConfig recordTransformerConfig,
- RecordFilterConfig recordFilterConfig, List<PartitionerConfig> partitionerConfigs) {
- _tableConfig = tableConfig;
- _schema = schema;
- _recordTransformerConfig = recordTransformerConfig;
- _recordFilterConfig = recordFilterConfig;
- _partitionerConfigs = partitionerConfigs;
- }
-
- public TableConfig getTableConfig() {
- return _tableConfig;
- }
-
- /**
- * The Pinot schema
- */
- public Schema getSchema() {
- return _schema;
- }
-
- /**
- * The RecordTransformerConfig for the mapper
- */
- public RecordTransformerConfig getRecordTransformerConfig() {
- return _recordTransformerConfig;
- }
-
- /**
- * The RecordFilterConfig for the mapper
- */
- public RecordFilterConfig getRecordFilterConfig() {
- return _recordFilterConfig;
- }
-
- /**
- * The PartitioningConfig for the mapper
- */
- public List<PartitionerConfig> getPartitionerConfigs() {
- return _partitionerConfigs;
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
index ba1661e..bb299bf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
@@ -20,12 +20,14 @@ package org.apache.pinot.core.segment.processing.framework;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -34,24 +36,28 @@ import org.apache.pinot.spi.data.Schema;
* Config for configuring the phases of {@link SegmentProcessorFramework}
*/
public class SegmentProcessorConfig {
+ private static final MergeType DEFAULT_MERGE_TYPE = MergeType.CONCAT;
private final TableConfig _tableConfig;
private final Schema _schema;
+ private final MergeType _mergeType;
private final RecordTransformerConfig _recordTransformerConfig;
private final RecordFilterConfig _recordFilterConfig;
private final List<PartitionerConfig> _partitionerConfigs;
- private final CollectorConfig _collectorConfig;
+ private final Map<String, AggregationFunctionType> _aggregationTypes;
private final SegmentConfig _segmentConfig;
- private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
+ private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, MergeType mergeType,
RecordTransformerConfig recordTransformerConfig, RecordFilterConfig recordFilterConfig,
- List<PartitionerConfig> partitionerConfigs, CollectorConfig collectorConfig, SegmentConfig segmentConfig) {
+ List<PartitionerConfig> partitionerConfigs, Map<String, AggregationFunctionType> aggregationTypes,
+ SegmentConfig segmentConfig) {
_tableConfig = tableConfig;
_schema = schema;
+ _mergeType = mergeType;
_recordTransformerConfig = recordTransformerConfig;
_recordFilterConfig = recordFilterConfig;
_partitionerConfigs = partitionerConfigs;
- _collectorConfig = collectorConfig;
+ _aggregationTypes = aggregationTypes;
_segmentConfig = segmentConfig;
}
@@ -70,6 +76,13 @@ public class SegmentProcessorConfig {
}
/**
+ * The merge type for the SegmentProcessorFramework
+ */
+ public MergeType getMergeType() {
+ return _mergeType;
+ }
+
+ /**
* The RecordTransformerConfig for the SegmentProcessorFramework's map phase
*/
public RecordTransformerConfig getRecordTransformerConfig() {
@@ -91,14 +104,14 @@ public class SegmentProcessorConfig {
}
/**
- * The CollectorConfig for the SegmentProcessorFramework's reduce phase
+ * The aggregator types for the SegmentProcessorFramework's reduce phase with ROLLUP merge type
*/
- public CollectorConfig getCollectorConfig() {
- return _collectorConfig;
+ public Map<String, AggregationFunctionType> getAggregationTypes() {
+ return _aggregationTypes;
}
/**
- * The SegmentConfig for the SegmentProcessorFramework's segment generation phase
+ * The SegmentConfig for the SegmentProcessorFramework's reduce phase
*/
public SegmentConfig getSegmentConfig() {
return _segmentConfig;
@@ -110,10 +123,11 @@ public class SegmentProcessorConfig {
public static class Builder {
private TableConfig _tableConfig;
private Schema _schema;
+ private MergeType _mergeType;
private RecordTransformerConfig _recordTransformerConfig;
private RecordFilterConfig _recordFilterConfig;
private List<PartitionerConfig> _partitionerConfigs;
- private CollectorConfig _collectorConfig;
+ private Map<String, AggregationFunctionType> _aggregationTypes;
private SegmentConfig _segmentConfig;
public Builder setTableConfig(TableConfig tableConfig) {
@@ -126,6 +140,11 @@ public class SegmentProcessorConfig {
return this;
}
+ public Builder setMergeType(MergeType mergeType) {
+ _mergeType = mergeType;
+ return this;
+ }
+
public Builder setRecordTransformerConfig(RecordTransformerConfig recordTransformerConfig) {
_recordTransformerConfig = recordTransformerConfig;
return this;
@@ -141,8 +160,8 @@ public class SegmentProcessorConfig {
return this;
}
- public Builder setCollectorConfig(CollectorConfig collectorConfig) {
- _collectorConfig = collectorConfig;
+ public Builder setAggregationTypes(Map<String, AggregationFunctionType> aggregationTypes) {
+ _aggregationTypes = aggregationTypes;
return this;
}
@@ -164,22 +183,25 @@ public class SegmentProcessorConfig {
if (CollectionUtils.isEmpty(_partitionerConfigs)) {
_partitionerConfigs = Lists.newArrayList(new PartitionerConfig.Builder().build());
}
- if (_collectorConfig == null) {
- _collectorConfig = new CollectorConfig.Builder().build();
+ if (_mergeType == null) {
+ _mergeType = DEFAULT_MERGE_TYPE;
+ }
+ if (_aggregationTypes == null) {
+ _aggregationTypes = Collections.emptyMap();
}
if (_segmentConfig == null) {
_segmentConfig = new SegmentConfig.Builder().build();
}
- return new SegmentProcessorConfig(_tableConfig, _schema, _recordTransformerConfig, _recordFilterConfig,
- _partitionerConfigs, _collectorConfig, _segmentConfig);
+ return new SegmentProcessorConfig(_tableConfig, _schema, _mergeType, _recordTransformerConfig,
+ _recordFilterConfig, _partitionerConfigs, _aggregationTypes, _segmentConfig);
}
}
@Override
public String toString() {
- return "SegmentProcessorConfig{" + "\n_tableConfig=" + _tableConfig + ", \n_schema=" + _schema
- .toSingleLineJsonString() + ", \n_recordFilterConfig=" + _recordFilterConfig + ", \n_recordTransformerConfig="
- + _recordTransformerConfig + ", \n_partitionerConfigs=" + _partitionerConfigs + ", \n_collectorConfig="
- + _collectorConfig + ", \n_segmentsConfig=" + _segmentConfig + "\n}";
+ return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", _schema=" + _schema + ", _mergeType="
+ + _mergeType + ", _recordTransformerConfig=" + _recordTransformerConfig + ", _recordFilterConfig="
+ + _recordFilterConfig + ", _partitionerConfigs=" + _partitionerConfigs + ", _aggregationTypes="
+ + _aggregationTypes + ", _segmentConfig=" + _segmentConfig + '}';
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 34c280a..c8df450 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -27,12 +27,18 @@ import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
+import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
+import org.apache.pinot.core.segment.processing.reducer.Reducer;
+import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
+import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,11 +122,8 @@ public class SegmentProcessorFramework {
.checkState(segmentFiles != null && segmentFiles.length > 0, "Failed to find segments under input dir: %s",
_inputSegmentsDir.getAbsolutePath());
- // Mapper phase.
- LOGGER.info("Beginning mapper phase. Processing segments: {}", Arrays.toString(_inputSegmentsDir.list()));
- SegmentMapperConfig mapperConfig =
- new SegmentMapperConfig(_tableConfig, _schema, _segmentProcessorConfig.getRecordTransformerConfig(),
- _segmentProcessorConfig.getRecordFilterConfig(), _segmentProcessorConfig.getPartitionerConfigs());
+ // Map phase
+ LOGGER.info("Beginning map phase on segments: {}", Arrays.toString(_inputSegmentsDir.list()));
List<RecordReader> recordReaders = new ArrayList<>(segmentFiles.length);
for (File indexDir : segmentFiles) {
String fileName = indexDir.getName();
@@ -139,56 +142,61 @@ public class SegmentProcessorFramework {
recordReader.init(indexDir, null, null, true);
recordReaders.add(recordReader);
}
- SegmentMapper mapper = new SegmentMapper(recordReaders, mapperConfig, _mapperOutputDir);
+ SegmentMapper mapper = new SegmentMapper(recordReaders, _segmentProcessorConfig, _mapperOutputDir);
Map<String, GenericRowFileManager> partitionToFileManagerMap = mapper.map();
for (RecordReader recordReader : recordReaders) {
recordReader.close();
}
+ FileUtils.deleteDirectory(_mapperInputDir);
// Check for mapper output files
- int numPartitions = partitionToFileManagerMap.size();
- Preconditions.checkState(numPartitions > 0, "No partition generated from mapper phase");
+ if (partitionToFileManagerMap.isEmpty()) {
+ LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
+ return;
+ }
- // Reducer phase.
- LOGGER.info("Beginning reducer phase on partitions: {}", partitionToFileManagerMap.keySet());
+ // Reduce phase
+ LOGGER.info("Beginning reduce phase on partitions: {}", partitionToFileManagerMap.keySet());
for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
- String partition = entry.getKey();
+ String partitionId = entry.getKey();
GenericRowFileManager fileManager = entry.getValue();
- // Set partition as reducerId
- SegmentReducerConfig reducerConfig =
- new SegmentReducerConfig(_schema, _segmentProcessorConfig.getCollectorConfig(),
- _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment());
- SegmentReducer reducer = new SegmentReducer(partition, fileManager, reducerConfig, _reducerOutputDir);
- reducer.reduce();
- reducer.cleanup();
- fileManager.cleanUp();
- }
-
- // Check for reducer output files
- File[] reducerOutputFiles = _reducerOutputDir.listFiles();
- if (reducerOutputFiles.length == 0) {
- throw new IllegalStateException(
- "No files found in reducer output directory: " + _reducerOutputDir.getAbsolutePath()
- + ". Exiting SegmentProcessorFramework.");
+ Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager, _segmentProcessorConfig, _reducerOutputDir);
+ entry.setValue(reducer.reduce());
}
- // Segment generation phase.
- LOGGER.info("Beginning segment generation phase. Processing files: {}", Arrays.toString(_reducerOutputDir.list()));
- // Reducer output directory will have 1 or more avro files
- int segmentNum = 0;
- for (File resultFile : reducerOutputFiles) {
- SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
- segmentGeneratorConfig.setTableName(_tableConfig.getTableName());
- segmentGeneratorConfig.setOutDir(_outputSegmentsDir.getAbsolutePath());
- segmentGeneratorConfig.setInputFilePath(resultFile.getAbsolutePath());
- segmentGeneratorConfig.setFormat(FileFormat.AVRO);
- segmentGeneratorConfig.setSequenceId(segmentNum++);
- SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig);
- driver.build();
+ // Segment creation phase
+ LOGGER.info("Beginning segment creation phase on partitions: {}", partitionToFileManagerMap.keySet());
+ SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
+ generatorConfig.setOutDir(_outputSegmentsDir.getPath());
+ // TODO: Use NormalizedDateSegmentNameGenerator
+ generatorConfig.setSegmentNamePrefix(_segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix());
+ int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
+ CompositeTransformer passThroughTransformer = CompositeTransformer.getPassThroughTransformer();
+ int sequenceId = 0;
+ for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
+ String partitionId = entry.getKey();
+ GenericRowFileManager fileManager = entry.getValue();
+ GenericRowFileReader fileReader = fileManager.getFileReader();
+ int numRows = fileReader.getNumRows();
+ int numSortFields = fileReader.getNumSortFields();
+ LOGGER.info("Start creating segments on partition: {}, numRows: {}, numSortFields: {}", partitionId, numRows,
+ numSortFields);
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment, sequenceId++) {
+ int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows);
+ LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId,
+ endRowId);
+ generatorConfig.setSequenceId(sequenceId);
+ GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
+ passThroughTransformer, null);
+ driver.build();
+ }
+ fileManager.cleanUp();
}
- LOGGER.info("Successfully converted segments from: {} to {}", _inputSegmentsDir,
+ LOGGER.info("Successfully converted segments from: {} to {}", Arrays.toString(_inputSegmentsDir.list()),
Arrays.toString(_outputSegmentsDir.list()));
}
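
Illustrative sketch (not part of this commit): the segment creation loop above slices each partition's rows into [startRowId, endRowId) ranges of at most maxNumRecordsPerSegment rows, one range per output segment. A self-contained example of that arithmetic with hypothetical numbers:

  public class RowRangeSlicingSketch {
    public static void main(String[] args) {
      int numRows = 25;                  // hypothetical partition size
      int maxNumRecordsPerSegment = 10;  // hypothetical per-segment limit
      int sequenceId = 0;
      for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment, sequenceId++) {
        int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows);
        // Prints: 0 -> [0, 10), 1 -> [10, 20), 2 -> [20, 25)
        System.out.println(sequenceId + " -> [" + startRowId + ", " + endRowId + ")");
      }
    }
  }
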
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
deleted file mode 100644
index 26a2be7..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.framework;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.pinot.core.segment.processing.collector.Collector;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
-import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
-import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Reducer phase of the SegmentProcessorFramework
- * Reads the avro files in the input directory and creates output avro files in the reducer output directory.
- * The avro files in the input directory are expected to contain data for only 1 partition
- * Performs operations on that partition data as follows:
- * - concatenation/rollup of records
- * - split
- * - TODO: dedup
- */
-public class SegmentReducer {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentReducer.class);
-
- private final String _reducerId;
- private final GenericRowFileManager _fileManager;
- private final File _reducerOutputDir;
-
- private final Schema _pinotSchema;
- private final org.apache.avro.Schema _avroSchema;
- private final Collector _collector;
- private final int _numRecordsPerPart;
-
- public SegmentReducer(String reducerId, GenericRowFileManager fileManager, SegmentReducerConfig reducerConfig,
- File reducerOutputDir) {
- _reducerId = reducerId;
- _fileManager = fileManager;
- _reducerOutputDir = reducerOutputDir;
-
- _pinotSchema = reducerConfig.getPinotSchema();
- _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
- _collector = CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
- _numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
- LOGGER.info("Initialized reducer with id: {}, output dir: {}, collector: {}, numRecordsPerPart: {}", _reducerId,
- _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
- }
-
- /**
- * Reads the avro files in the input directory.
- * Performs configured operations and outputs to other avro file(s) in the reducer output directory.
- */
- public void reduce()
- throws Exception {
- int part = 0;
- GenericRowFileReader fileReader = _fileManager.getFileReader();
- int numRows = fileReader.getNumRows();
- for (int i = 0; i < numRows; i++) {
- GenericRow next = fileReader.read(i, new GenericRow());
-
- // Aggregations
- _collector.collect(next);
-
- // Reached max records per part file. Flush
- if (_collector.size() == _numRecordsPerPart) {
- flushRecords(_collector, createReducerOutputFileName(_reducerId, part++));
- _collector.reset();
- }
- }
- _fileManager.closeFileReader();
- if (_collector.size() > 0) {
- flushRecords(_collector, createReducerOutputFileName(_reducerId, part));
- _collector.reset();
- }
- }
-
- /**
- * Flushes all records from the collector into a part files in the reducer output directory
- */
- private void flushRecords(Collector collector, String fileName)
- throws IOException {
- GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
- Iterator<GenericRow> collectionIt = collector.iterator();
- DataFileWriter<GenericData.Record> recordWriter = new DataFileWriter<>(new GenericDatumWriter<>(_avroSchema));
- recordWriter.create(_avroSchema, new File(_reducerOutputDir, fileName));
- while (collectionIt.hasNext()) {
- SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(collectionIt.next(), reusableRecord);
- recordWriter.append(reusableRecord);
- }
- recordWriter.close();
- }
-
- public static String createReducerOutputFileName(String reducerId, int part) {
- return "reducer_" + reducerId + "_" + part + ".avro";
- }
-
- /**
- * Cleans up reducer state
- */
- public void cleanup()
- throws IOException {
- _collector.close();
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerConfig.java
deleted file mode 100644
index d402003..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerConfig.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.framework;
-
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
-import org.apache.pinot.spi.data.Schema;
-
-
-/**
- * Config for the reducer phase of SegmentProcessorFramework
- */
-public class SegmentReducerConfig {
-
- private final Schema _pinotSchema;
- private final int _numRecordsPerPart;
- private final CollectorConfig _collectorConfig;
-
- public SegmentReducerConfig(Schema pinotSchema, CollectorConfig collectorConfig,
- int numRecordsPerPart) {
- _pinotSchema = pinotSchema;
- _numRecordsPerPart = numRecordsPerPart;
- _collectorConfig = collectorConfig;
- }
-
- /**
- * The Pinot schema
- */
- public Schema getPinotSchema() {
- return _pinotSchema;
- }
-
- /**
- * The number of records that a reducer should put in a single part file. This will directly control number of records per segment
- */
- public int getNumRecordsPerPart() {
- return _numRecordsPerPart;
- }
-
- /**
- * The CollectorConfig for the reducer
- */
- public CollectorConfig getCollectorConfig() {
- return _collectorConfig;
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowDeserializer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowDeserializer.java
index caee49b..fe52766 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowDeserializer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowDeserializer.java
@@ -18,12 +18,12 @@
*/
package org.apache.pinot.core.segment.processing.genericrow;
-import com.google.common.base.Preconditions;
import java.util.List;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.StringUtils;
@@ -58,32 +58,26 @@ public class GenericRowDeserializer {
/**
* Deserializes the {@link GenericRow} at the given offset.
*/
- public GenericRow deserialize(long offset, GenericRow reuse) {
- reuse.clear();
-
+ public void deserialize(long offset, GenericRow buffer) {
for (int i = 0; i < _numFields; i++) {
String fieldName = _fieldNames[i];
if (_isSingleValueFields[i]) {
switch (_storedTypes[i]) {
case INT:
- int intValue = _dataBuffer.getInt(offset);
- reuse.putValue(fieldName, intValue);
+ buffer.putValue(fieldName, _dataBuffer.getInt(offset));
offset += Integer.BYTES;
break;
case LONG:
- long longValue = _dataBuffer.getLong(offset);
- reuse.putValue(fieldName, longValue);
+ buffer.putValue(fieldName, _dataBuffer.getLong(offset));
offset += Long.BYTES;
break;
case FLOAT:
- float floatValue = _dataBuffer.getFloat(offset);
- reuse.putValue(fieldName, floatValue);
+ buffer.putValue(fieldName, _dataBuffer.getFloat(offset));
offset += Float.BYTES;
break;
case DOUBLE:
- double doubleValue = _dataBuffer.getDouble(offset);
- reuse.putValue(fieldName, doubleValue);
+ buffer.putValue(fieldName, _dataBuffer.getDouble(offset));
offset += Double.BYTES;
break;
case STRING: {
@@ -92,7 +86,7 @@ public class GenericRowDeserializer {
byte[] stringBytes = new byte[numBytes];
_dataBuffer.copyTo(offset, stringBytes);
offset += numBytes;
- reuse.putValue(fieldName, StringUtils.decodeUtf8(stringBytes));
+ buffer.putValue(fieldName, StringUtils.decodeUtf8(stringBytes));
break;
}
case BYTES: {
@@ -101,7 +95,7 @@ public class GenericRowDeserializer {
byte[] bytes = new byte[numBytes];
_dataBuffer.copyTo(offset, bytes);
offset += numBytes;
- reuse.putValue(fieldName, bytes);
+ buffer.putValue(fieldName, bytes);
break;
}
default:
@@ -151,7 +145,7 @@ public class GenericRowDeserializer {
throw new IllegalStateException("Unsupported MV stored type: " + _storedTypes[i]);
}
- reuse.putValue(fieldName, multiValue);
+ buffer.putValue(fieldName, multiValue);
}
}
@@ -160,64 +154,165 @@ public class GenericRowDeserializer {
int numNullFields = _dataBuffer.getInt(offset);
offset += Integer.BYTES;
for (int i = 0; i < numNullFields; i++) {
- reuse.addNullValueField(_fieldNames[_dataBuffer.getInt(offset)]);
+ buffer.addNullValueField(_fieldNames[_dataBuffer.getInt(offset)]);
offset += Integer.BYTES;
}
}
-
- return reuse;
}
/**
- * Deserializes the first several fields at the given offset. This method can be used to sort the generic rows without
- * fully deserialize the whole row for each comparison. The selected fields should all be single-valued.
+ * Compares the rows at the given offsets.
*/
- public Object[] partialDeserialize(long offset, int numFields) {
- Object[] values = new Object[numFields];
-
- for (int i = 0; i < numFields; i++) {
- Preconditions.checkState(_isSingleValueFields[i], "Partial deserialize should not be applied to MV column: %s",
- _fieldNames[i]);
- switch (_storedTypes[i]) {
- case INT:
- values[i] = _dataBuffer.getInt(offset);
- offset += Integer.BYTES;
- break;
- case LONG:
- values[i] = _dataBuffer.getLong(offset);
- offset += Long.BYTES;
- break;
- case FLOAT:
- values[i] = _dataBuffer.getFloat(offset);
- offset += Float.BYTES;
- break;
- case DOUBLE:
- values[i] = _dataBuffer.getDouble(offset);
- offset += Double.BYTES;
- break;
- case STRING: {
- int numBytes = _dataBuffer.getInt(offset);
- offset += Integer.BYTES;
- byte[] stringBytes = new byte[numBytes];
- _dataBuffer.copyTo(offset, stringBytes);
- offset += numBytes;
- values[i] = StringUtils.decodeUtf8(stringBytes);
- break;
+ public int compare(long offset1, long offset2, int numFieldsToCompare) {
+ for (int i = 0; i < numFieldsToCompare; i++) {
+ if (_isSingleValueFields[i]) {
+ switch (_storedTypes[i]) {
+ case INT: {
+ int result = Integer.compare(_dataBuffer.getInt(offset1), _dataBuffer.getInt(offset2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += Integer.BYTES;
+ offset2 += Integer.BYTES;
+ break;
+ }
+ case LONG: {
+ int result = Long.compare(_dataBuffer.getLong(offset1), _dataBuffer.getLong(offset2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += Long.BYTES;
+ offset2 += Long.BYTES;
+ break;
+ }
+ case FLOAT: {
+ int result = Float.compare(_dataBuffer.getFloat(offset1), _dataBuffer.getFloat(offset2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += Float.BYTES;
+ offset2 += Float.BYTES;
+ break;
+ }
+ case DOUBLE: {
+ int result = Double.compare(_dataBuffer.getDouble(offset1), _dataBuffer.getDouble(offset2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += Double.BYTES;
+ offset2 += Double.BYTES;
+ break;
+ }
+ case STRING: {
+ int numBytes1 = _dataBuffer.getInt(offset1);
+ offset1 += Integer.BYTES;
+ byte[] stringBytes1 = new byte[numBytes1];
+ _dataBuffer.copyTo(offset1, stringBytes1);
+ int numBytes2 = _dataBuffer.getInt(offset2);
+ offset2 += Integer.BYTES;
+ byte[] stringBytes2 = new byte[numBytes2];
+ _dataBuffer.copyTo(offset2, stringBytes2);
+ int result = StringUtils.decodeUtf8(stringBytes1).compareTo(StringUtils.decodeUtf8(stringBytes2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += numBytes1;
+ offset2 += numBytes2;
+ break;
+ }
+ case BYTES: {
+ int numBytes1 = _dataBuffer.getInt(offset1);
+ offset1 += Integer.BYTES;
+ byte[] bytes1 = new byte[numBytes1];
+ _dataBuffer.copyTo(offset1, bytes1);
+ int numBytes2 = _dataBuffer.getInt(offset2);
+ offset2 += Integer.BYTES;
+ byte[] bytes2 = new byte[numBytes2];
+ _dataBuffer.copyTo(offset2, bytes2);
+ int result = ByteArray.compare(bytes1, bytes2);
+ if (result != 0) {
+ return result;
+ }
+ offset1 += numBytes1;
+ offset2 += numBytes2;
+ break;
+ }
+ default:
+ throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]);
}
- case BYTES: {
- int numBytes = _dataBuffer.getInt(offset);
- offset += Integer.BYTES;
- byte[] bytes = new byte[numBytes];
- _dataBuffer.copyTo(offset, bytes);
- offset += numBytes;
- values[i] = bytes;
- break;
+ } else {
+ int numValues = _dataBuffer.getInt(offset1);
+ int numValues2 = _dataBuffer.getInt(offset2);
+ if (numValues != numValues2) {
+ return Integer.compare(numValues, numValues2);
+ }
+ offset1 += Integer.BYTES;
+ offset2 += Integer.BYTES;
+
+ switch (_storedTypes[i]) {
+ case INT:
+ for (int j = 0; j < numValues; j++) {
+ int result = Integer.compare(_dataBuffer.getInt(offset1), _dataBuffer.getInt(offset2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += Integer.BYTES;
+ offset2 += Integer.BYTES;
+ }
+ break;
+ case LONG:
+ for (int j = 0; j < numValues; j++) {
+ int result = Long.compare(_dataBuffer.getLong(offset1), _dataBuffer.getLong(offset2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += Long.BYTES;
+ offset2 += Long.BYTES;
+ }
+ break;
+ case FLOAT:
+ for (int j = 0; j < numValues; j++) {
+ int result = Float.compare(_dataBuffer.getFloat(offset1), _dataBuffer.getFloat(offset2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += Float.BYTES;
+ offset2 += Float.BYTES;
+ }
+ break;
+ case DOUBLE:
+ for (int j = 0; j < numValues; j++) {
+ int result = Double.compare(_dataBuffer.getDouble(offset1), _dataBuffer.getDouble(offset2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += Double.BYTES;
+ offset2 += Double.BYTES;
+ }
+ break;
+ case STRING:
+ for (int j = 0; j < numValues; j++) {
+ int numBytes1 = _dataBuffer.getInt(offset1);
+ offset1 += Integer.BYTES;
+ byte[] stringBytes1 = new byte[numBytes1];
+ _dataBuffer.copyTo(offset1, stringBytes1);
+ int numBytes2 = _dataBuffer.getInt(offset2);
+ offset2 += Integer.BYTES;
+ byte[] stringBytes2 = new byte[numBytes2];
+ _dataBuffer.copyTo(offset2, stringBytes2);
+ int result = StringUtils.decodeUtf8(stringBytes1).compareTo(StringUtils.decodeUtf8(stringBytes2));
+ if (result != 0) {
+ return result;
+ }
+ offset1 += numBytes1;
+ offset2 += numBytes2;
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unsupported MV stored type: " + _storedTypes[i]);
}
- default:
- throw new IllegalStateException("Unsupported SV stored type: " + _storedTypes[i]);
}
}
-
- return values;
+ return 0;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
index 9fd9401..605813d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileManager.java
@@ -37,15 +37,39 @@ public class GenericRowFileManager {
private final File _dataFile;
private final List<FieldSpec> _fieldSpecs;
private final boolean _includeNullFields;
+ private final int _numSortFields;
private GenericRowFileWriter _fileWriter;
private GenericRowFileReader _fileReader;
- public GenericRowFileManager(File outputDir, List<FieldSpec> fieldSpecs, boolean includeNullFields) {
+ public GenericRowFileManager(File outputDir, List<FieldSpec> fieldSpecs, boolean includeNullFields,
+ int numSortFields) {
_offsetFile = new File(outputDir, OFFSET_FILE_NAME);
_dataFile = new File(outputDir, DATA_FILE_NAME);
_fieldSpecs = fieldSpecs;
_includeNullFields = includeNullFields;
+ _numSortFields = numSortFields;
+ }
+
+ /**
+ * Returns the field specs for the files.
+ */
+ public List<FieldSpec> getFieldSpecs() {
+ return _fieldSpecs;
+ }
+
+ /**
+ * Returns {@code true} if null fields are included in the file, {@code false} otherwise.
+ */
+ public boolean isIncludeNullFields() {
+ return _includeNullFields;
+ }
+
+ /**
+ * Returns the number of sort fields.
+ */
+ public int getNumSortFields() {
+ return _numSortFields;
}
/**
@@ -80,7 +104,7 @@ public class GenericRowFileManager {
if (_fileReader == null) {
Preconditions.checkState(_offsetFile.exists(), "Record offset file: %s does not exist", _offsetFile);
Preconditions.checkState(_dataFile.exists(), "Record data file: %s does not exist", _dataFile);
- _fileReader = new GenericRowFileReader(_offsetFile, _dataFile, _fieldSpecs, _includeNullFields);
+ _fileReader = new GenericRowFileReader(_offsetFile, _dataFile, _fieldSpecs, _includeNullFields, _numSortFields);
}
return _fileReader;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
index 13102aa..d0328a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileReader.java
@@ -36,8 +36,10 @@ public class GenericRowFileReader implements Closeable {
private final PinotDataBuffer _offsetBuffer;
private final PinotDataBuffer _dataBuffer;
private final GenericRowDeserializer _deserializer;
+ private final int _numSortFields;
- public GenericRowFileReader(File offsetFile, File dataFile, List<FieldSpec> fieldSpecs, boolean includeNullFields)
+ public GenericRowFileReader(File offsetFile, File dataFile, List<FieldSpec> fieldSpecs, boolean includeNullFields,
+ int numSortFields)
throws IOException {
long offsetFileLength = offsetFile.length();
_numRows = (int) (offsetFileLength >>> 3); // offsetFileLength / Long.BYTES
@@ -46,6 +48,7 @@ public class GenericRowFileReader implements Closeable {
_dataBuffer = PinotDataBuffer
.mapFile(dataFile, true, 0L, dataFile.length(), PinotDataBuffer.NATIVE_ORDER, "GenericRow data buffer");
_deserializer = new GenericRowDeserializer(_dataBuffer, fieldSpecs, includeNullFields);
+ _numSortFields = numSortFields;
}
/**
@@ -56,19 +59,34 @@ public class GenericRowFileReader implements Closeable {
}
/**
- * Reads the data of the given row id into the given reusable row.
+ * Returns the number of sort fields.
*/
- public GenericRow read(int rowId, GenericRow reuse) {
+ public int getNumSortFields() {
+ return _numSortFields;
+ }
+
+ /**
+ * Reads the data of the given row id into the given buffer row.
+ */
+ public void read(int rowId, GenericRow buffer) {
long offset = _offsetBuffer.getLong((long) rowId << 3); // rowId * Long.BYTES
- return _deserializer.deserialize(offset, reuse);
+ _deserializer.deserialize(offset, buffer);
+ }
+
+ /**
+ * Compares the rows at the given row ids. Only the values of the sort fields are compared.
+ */
+ public int compare(int rowId1, int rowId2) {
+ long offset1 = _offsetBuffer.getLong((long) rowId1 << 3); // rowId1 * Long.BYTES
+ long offset2 = _offsetBuffer.getLong((long) rowId2 << 3); // rowId2 * Long.BYTES
+ return _deserializer.compare(offset1, offset2, _numSortFields);
}
/**
- * Reads the first several fields of the given row id.
+ * Returns a record reader for the rows within the file. Records are sorted if sort order is configured.
*/
- public Object[] partialRead(int rowId, int numFields) {
- long offset = _offsetBuffer.getLong((long) rowId << 3);
- return _deserializer.partialDeserialize(offset, numFields);
+ public GenericRowFileRecordReader getRecordReader() {
+ return new GenericRowFileRecordReader(this);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileRecordReader.java
new file mode 100644
index 0000000..f7ecd0f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileRecordReader.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import it.unimi.dsi.fastutil.Arrays;
+import java.io.File;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Record reader for the GenericRow file.
+ */
+public class GenericRowFileRecordReader implements RecordReader {
+ private final GenericRowFileReader _fileReader;
+ private final int _startRowId;
+ private final int _endRowId;
+ private final int[] _sortedRowIds;
+
+ private int _nextRowId;
+
+ public GenericRowFileRecordReader(GenericRowFileReader fileReader) {
+ _fileReader = fileReader;
+ int numRows = fileReader.getNumRows();
+ _startRowId = 0;
+ _endRowId = numRows;
+ if (fileReader.getNumSortFields() > 0) {
+ _sortedRowIds = new int[numRows];
+ for (int i = 0; i < numRows; i++) {
+ _sortedRowIds[i] = i;
+ }
+ Arrays
+ .quickSort(0, _endRowId, (i1, i2) -> _fileReader.compare(_sortedRowIds[i1], _sortedRowIds[i2]), (i1, i2) -> {
+ int temp = _sortedRowIds[i1];
+ _sortedRowIds[i1] = _sortedRowIds[i2];
+ _sortedRowIds[i2] = temp;
+ });
+ } else {
+ _sortedRowIds = null;
+ }
+ }
+
+ private GenericRowFileRecordReader(GenericRowFileReader fileReader, int startRowId, int endRowId,
+ @Nullable int[] sortedRowIds) {
+ _fileReader = fileReader;
+ _startRowId = startRowId;
+ _endRowId = endRowId;
+ _sortedRowIds = sortedRowIds;
+
+ _nextRowId = startRowId;
+ }
+
+ /**
+ * Returns a record reader for the given row id range.
+ */
+ public GenericRowFileRecordReader getRecordReaderForRange(int startRowId, int endRowId) {
+ return new GenericRowFileRecordReader(_fileReader, startRowId, endRowId, _sortedRowIds);
+ }
+
+ /**
+ * Reads the data of the given row id into the given buffer row.
+ */
+ public void read(int rowId, GenericRow buffer) {
+ if (_sortedRowIds != null) {
+ rowId = _sortedRowIds[rowId];
+ }
+ _fileReader.read(rowId, buffer);
+ }
+
+ /**
+ * Compares the records at the given row ids.
+ */
+ public int compare(int rowId1, int rowId2) {
+ assert _sortedRowIds != null;
+ return _fileReader.compare(_sortedRowIds[rowId1], _sortedRowIds[rowId2]);
+ }
+
+ @Override
+ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _nextRowId < _endRowId;
+ }
+
+ @Override
+ public GenericRow next() {
+ return next(new GenericRow());
+ }
+
+ @Override
+ public GenericRow next(GenericRow reuse) {
+ int rowId = _sortedRowIds != null ? _sortedRowIds[_nextRowId++] : _nextRowId++;
+ _fileReader.read(rowId, reuse);
+ return reuse;
+ }
+
+ @Override
+ public void rewind() {
+ _nextRowId = _startRowId;
+ }
+
+ @Override
+ public void close() {
+ }
+}
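
Illustrative sketch (not part of this commit): the range-based readers are how the framework avoids extra intermediate files, carving one GenericRowFileRecordReader per output segment out of a single (optionally sorted) file reader. The helper name and parameters below are hypothetical; getRecordReader(), getNumRows() and getRecordReaderForRange() are the methods added or used in this change.

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
  import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;

  public class SegmentSplitSketch {
    // Returns one record reader per output segment, each covering at most
    // maxNumRecordsPerSegment rows of the partition's generic row file.
    static List<GenericRowFileRecordReader> split(GenericRowFileReader fileReader,
        int maxNumRecordsPerSegment) {
      GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
      int numRows = fileReader.getNumRows();
      List<GenericRowFileRecordReader> readers = new ArrayList<>();
      for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment) {
        int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows);
        readers.add(recordReader.getRecordReaderForRange(startRowId, endRowId));
      }
      return readers;
    }
  }
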
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerializer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerializer.java
index 7549670..b57893b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerializer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerializer.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.segment.processing.genericrow;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -149,6 +150,7 @@ public class GenericRowSerializer {
_nullFieldIndexes[numNullFields++] = nullFieldIndex;
}
}
+ Arrays.sort(_nullFieldIndexes, 0, numNullFields);
numBytes += Integer.BYTES * (1 + numNullFields);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
similarity index 86%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index c16479c..b9430d7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -16,27 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.framework;
+package org.apache.pinot.core.segment.processing.mapper;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.stream.Collectors;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.segment.processing.filter.RecordFilter;
import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
import org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
-import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer;
import org.apache.pinot.segment.local.utils.IngestionUtils;
@@ -66,6 +67,7 @@ public class SegmentMapper {
private final List<FieldSpec> _fieldSpecs;
private final boolean _includeNullFields;
+ private final int _numSortFields;
// TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case
// _recordTransformer changes the data type.
@@ -76,26 +78,25 @@ public class SegmentMapper {
private final Partitioner[] _partitioners;
private final String[] _partitionsBuffer;
- private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>();
+ // NOTE: Use TreeMap so that the order is deterministic
+ private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>();
- public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+ public SegmentMapper(List<RecordReader> recordReaders, SegmentProcessorConfig processorConfig, File mapperOutputDir) {
_recordReaders = recordReaders;
_mapperOutputDir = mapperOutputDir;
- TableConfig tableConfig = mapperConfig.getTableConfig();
- Schema schema = mapperConfig.getSchema();
- List<String> sortOrder = tableConfig.getIndexingConfig().getSortedColumn();
- if (CollectionUtils.isNotEmpty(sortOrder)) {
- _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
- } else {
- _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
- }
+ TableConfig tableConfig = processorConfig.getTableConfig();
+ Schema schema = processorConfig.getSchema();
+ Pair<List<FieldSpec>, Integer> pair = SegmentProcessorUtils
+ .getFieldSpecs(schema, processorConfig.getMergeType(), tableConfig.getIndexingConfig().getSortedColumn());
+ _fieldSpecs = pair.getLeft();
+ _numSortFields = pair.getRight();
_includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
_defaultRecordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
- _recordFilter = RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
- _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+ _recordFilter = RecordFilterFactory.getRecordFilter(processorConfig.getRecordFilterConfig());
+ _recordTransformer = RecordTransformerFactory.getRecordTransformer(processorConfig.getRecordTransformerConfig());
_dataTypeTransformer = new DataTypeTransformer(schema);
- List<PartitionerConfig> partitionerConfigs = mapperConfig.getPartitionerConfigs();
+ List<PartitionerConfig> partitionerConfigs = processorConfig.getPartitionerConfigs();
int numPartitioners = partitionerConfigs.size();
_partitioners = new Partitioner[numPartitioners];
_partitionsBuffer = new String[numPartitioners];
@@ -166,7 +167,7 @@ public class SegmentMapper {
if (fileManager == null) {
File partitionOutputDir = new File(_mapperOutputDir, partition);
FileUtils.forceMkdir(partitionOutputDir);
- fileManager = new GenericRowFileManager(partitionOutputDir, _fieldSpecs, _includeNullFields);
+ fileManager = new GenericRowFileManager(partitionOutputDir, _fieldSpecs, _includeNullFields, _numSortFields);
_partitionToFileManagerMap.put(partition, fileManager);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ConcatReducer.java
similarity index 63%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ConcatReducer.java
index 80fc99b..41ec90b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ConcatReducer.java
@@ -16,16 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.reducer;
+
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+
/**
- * Interface for value aggregator
+ * ConcatReducer is a no-op reducer.
*/
-public interface ValueAggregator {
+public class ConcatReducer implements Reducer {
+ private final GenericRowFileManager _fileManager;
+
+ public ConcatReducer(GenericRowFileManager fileManager) {
+ _fileManager = fileManager;
+ }
- /**
- * Given two values, return the aggregated value
- * @return aggregated value given two column values
- */
- Object aggregate(Object value1, Object value2);
+ @Override
+ public GenericRowFileManager reduce()
+ throws Exception {
+ return _fileManager;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java
new file mode 100644
index 0000000..33bdd8b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.reducer;
+
+import java.io.File;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * DedupReducer deduplicates the GenericRows with the same values.
+ */
+public class DedupReducer implements Reducer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DedupReducer.class);
+
+ private final String _partitionId;
+ private final GenericRowFileManager _fileManager;
+ private final File _reducerOutputDir;
+
+ public DedupReducer(String partitionId, GenericRowFileManager fileManager, File reducerOutputDir) {
+ _partitionId = partitionId;
+ _fileManager = fileManager;
+ _reducerOutputDir = reducerOutputDir;
+ }
+
+ @Override
+ public GenericRowFileManager reduce()
+ throws Exception {
+ LOGGER.info("Start reducing on partition: {}", _partitionId);
+ long reduceStartTimeMs = System.currentTimeMillis();
+
+ GenericRowFileReader fileReader = _fileManager.getFileReader();
+ int numRows = fileReader.getNumRows();
+ int numSortFields = fileReader.getNumSortFields();
+ LOGGER.info("Start sorting on numRows: {}, numSortFields: {}", numRows, numSortFields);
+ long sortStartTimeMs = System.currentTimeMillis();
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ LOGGER.info("Finish sorting in {}ms", System.currentTimeMillis() - sortStartTimeMs);
+
+ File partitionOutputDir = new File(_reducerOutputDir, _partitionId);
+ FileUtils.forceMkdir(partitionOutputDir);
+ LOGGER.info("Start creating dedup file under dir: {}", partitionOutputDir);
+ long dedupFileCreationStartTimeMs = System.currentTimeMillis();
+ GenericRowFileManager dedupFileManager =
+ new GenericRowFileManager(partitionOutputDir, _fileManager.getFieldSpecs(), _fileManager.isIncludeNullFields(),
+ 0);
+ GenericRowFileWriter dedupFileWriter = dedupFileManager.getFileWriter();
+ GenericRow previousRow = new GenericRow();
+ recordReader.read(0, previousRow);
+ int previousRowId = 0;
+ dedupFileWriter.write(previousRow);
+ for (int i = 1; i < numRows; i++) {
+ if (recordReader.compare(previousRowId, i) != 0) {
+ previousRow.clear();
+ recordReader.read(i, previousRow);
+ previousRowId = i;
+ dedupFileWriter.write(previousRow);
+ }
+ }
+ dedupFileManager.closeFileWriter();
+ LOGGER.info("Finish creating dedup file in {}ms", System.currentTimeMillis() - dedupFileCreationStartTimeMs);
+
+ _fileManager.cleanUp();
+ LOGGER.info("Finish reducing in {}ms", System.currentTimeMillis() - reduceStartTimeMs);
+ return dedupFileManager;
+ }
+}
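
Illustrative sketch (not part of this commit): DedupReducer relies on the record reader already returning rows in sorted order, so duplicates are adjacent and only the first row of each equal run is written out. The same idea on a plain sorted int array:

  import java.util.ArrayList;
  import java.util.List;

  public class DedupSketch {
    public static void main(String[] args) {
      int[] sortedRows = {1, 1, 2, 3, 3, 3, 5};  // hypothetical sorted "rows"
      List<Integer> deduped = new ArrayList<>();
      deduped.add(sortedRows[0]);                // always keep the first row
      for (int i = 1; i < sortedRows.length; i++) {
        if (sortedRows[i] != sortedRows[i - 1]) {  // a new run starts; keep its first row
          deduped.add(sortedRows[i]);
        }
      }
      System.out.println(deduped);  // [1, 2, 3, 5]
    }
  }
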
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/Reducer.java
similarity index 67%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/Reducer.java
index 80fc99b..bd94846 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/Reducer.java
@@ -16,16 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.processing.collector;
+package org.apache.pinot.core.segment.processing.reducer;
+
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+
/**
- * Interface for value aggregator
+ * Reducer reduces the GenericRows based on the strategy.
*/
-public interface ValueAggregator {
+public interface Reducer {
/**
- * Given two values, return the aggregated value
- * @return aggregated value given two column values
+ * Reduces the GenericRows based on the strategy and returns the file manager that contains the reduced rows.
*/
- Object aggregate(Object value1, Object value2);
+ GenericRowFileManager reduce()
+ throws Exception;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
new file mode 100644
index 0000000..a205500
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.reducer;
+
+import java.io.File;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
+import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+
+
+/**
+ * Factory for Reducer.
+ */
+public class ReducerFactory {
+ private ReducerFactory() {
+ }
+
+ public static Reducer getReducer(String partitionId, GenericRowFileManager fileManager,
+ SegmentProcessorConfig processorConfig, File reducerOutputDir) {
+ MergeType mergeType = processorConfig.getMergeType();
+ switch (mergeType) {
+ case CONCAT:
+ return new ConcatReducer(fileManager);
+ case ROLLUP:
+ return new RollupReducer(partitionId, fileManager, processorConfig.getAggregationTypes(), reducerOutputDir);
+ case DEDUP:
+ return new DedupReducer(partitionId, fileManager, reducerOutputDir);
+ default:
+ throw new IllegalStateException("Unsupported merge type: " + mergeType);
+ }
+ }
+}
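
Illustrative sketch (not part of this commit): the per-partition reduce step performed by the framework, selecting the reducer via the factory and replacing the partition's file manager with the reduced one. The helper name and variables are hypothetical; the factory and reduce() signatures are the ones added in this change.

  import java.io.File;
  import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
  import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
  import org.apache.pinot.core.segment.processing.reducer.Reducer;
  import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;

  public class ReducePartitionSketch {
    // Reduces one partition: CONCAT returns the input file manager unchanged,
    // while ROLLUP/DEDUP return a new file manager pointing at the reduced file.
    static GenericRowFileManager reducePartition(String partitionId, GenericRowFileManager fileManager,
        SegmentProcessorConfig processorConfig, File reducerOutputDir)
        throws Exception {
      Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager, processorConfig, reducerOutputDir);
      return reducer.reduce();
    }
  }
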
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
new file mode 100644
index 0000000..5b11c46
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.reducer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.processing.aggregator.ValueAggregator;
+import org.apache.pinot.core.segment.processing.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * RollupReducer aggregates the metric values for GenericRows with the same dimension + time values.
+ */
+public class RollupReducer implements Reducer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RollupReducer.class);
+ private static final AggregationFunctionType DEFAULT_AGGREGATOR_TYPE = AggregationFunctionType.SUM;
+
+ private final String _partitionId;
+ private final GenericRowFileManager _fileManager;
+ private final Map<String, AggregationFunctionType> _aggregationTypes;
+ private final File _reducerOutputDir;
+
+ public RollupReducer(String partitionId, GenericRowFileManager fileManager,
+ Map<String, AggregationFunctionType> aggregationTypes, File reducerOutputDir) {
+ _partitionId = partitionId;
+ _fileManager = fileManager;
+ _aggregationTypes = aggregationTypes;
+ _reducerOutputDir = reducerOutputDir;
+ }
+
+ @Override
+ public GenericRowFileManager reduce()
+ throws Exception {
+ LOGGER.info("Start reducing on partition: {}", _partitionId);
+ long reduceStartTimeMs = System.currentTimeMillis();
+
+ GenericRowFileReader fileReader = _fileManager.getFileReader();
+ int numRows = fileReader.getNumRows();
+ int numSortFields = fileReader.getNumSortFields();
+ LOGGER.info("Start sorting on numRows: {}, numSortFields: {}", numRows, numSortFields);
+ long sortStartTimeMs = System.currentTimeMillis();
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ LOGGER.info("Finish sorting in {}ms", System.currentTimeMillis() - sortStartTimeMs);
+
+ List<FieldSpec> fieldSpecs = _fileManager.getFieldSpecs();
+ boolean includeNullFields = _fileManager.isIncludeNullFields();
+ List<AggregatorContext> aggregatorContextList = new ArrayList<>();
+ for (FieldSpec fieldSpec : fieldSpecs) {
+ if (fieldSpec.getFieldType() == FieldType.METRIC) {
+ aggregatorContextList.add(new AggregatorContext(fieldSpec,
+ _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE)));
+ }
+ }
+
+ File partitionOutputDir = new File(_reducerOutputDir, _partitionId);
+ FileUtils.forceMkdir(partitionOutputDir);
+ LOGGER.info("Start creating rollup file under dir: {}", partitionOutputDir);
+ long rollupFileCreationStartTimeMs = System.currentTimeMillis();
+ GenericRowFileManager rollupFileManager =
+ new GenericRowFileManager(partitionOutputDir, fieldSpecs, includeNullFields, 0);
+ GenericRowFileWriter rollupFileWriter = rollupFileManager.getFileWriter();
+ GenericRow previousRow = new GenericRow();
+ recordReader.read(0, previousRow);
+ int previousRowId = 0;
+ GenericRow buffer = new GenericRow();
+ if (includeNullFields) {
+ for (int i = 1; i < numRows; i++) {
+ buffer.clear();
+ recordReader.read(i, buffer);
+ if (recordReader.compare(previousRowId, i) == 0) {
+ aggregateWithNullFields(previousRow, buffer, aggregatorContextList);
+ } else {
+ rollupFileWriter.write(previousRow);
+ previousRowId = i;
+ GenericRow temp = previousRow;
+ previousRow = buffer;
+ buffer = temp;
+ }
+ }
+ } else {
+ for (int i = 1; i < numRows; i++) {
+ buffer.clear();
+ recordReader.read(i, buffer);
+ if (recordReader.compare(previousRowId, i) == 0) {
+ aggregateWithoutNullFields(previousRow, buffer, aggregatorContextList);
+ } else {
+ rollupFileWriter.write(previousRow);
+ previousRowId = i;
+ GenericRow temp = previousRow;
+ previousRow = buffer;
+ buffer = temp;
+ }
+ }
+ }
+ rollupFileWriter.write(previousRow);
+ rollupFileManager.closeFileWriter();
+ LOGGER.info("Finish creating rollup file in {}ms", System.currentTimeMillis() - rollupFileCreationStartTimeMs);
+
+ _fileManager.cleanUp();
+ LOGGER.info("Finish reducing in {}ms", System.currentTimeMillis() - reduceStartTimeMs);
+ return rollupFileManager;
+ }
+
+ private static void aggregateWithNullFields(GenericRow aggregatedRow, GenericRow rowToAggregate,
+ List<AggregatorContext> aggregatorContextList) {
+ for (AggregatorContext aggregatorContext : aggregatorContextList) {
+ String column = aggregatorContext._column;
+
+ // Skip aggregating on null fields
+ if (rowToAggregate.isNullValue(column)) {
+ continue;
+ }
+
+ if (aggregatedRow.removeNullValueField(column)) {
+ // Null field, directly put new value
+ aggregatedRow.putValue(column, rowToAggregate.getValue(column));
+ } else {
+ // Non-null field, aggregate the value
+ aggregatedRow.putValue(column,
+ aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column)));
+ }
+ }
+ }
+
+ private static void aggregateWithoutNullFields(GenericRow aggregatedRow, GenericRow rowToAggregate,
+ List<AggregatorContext> aggregatorContextList) {
+ for (AggregatorContext aggregatorContext : aggregatorContextList) {
+ String column = aggregatorContext._column;
+ aggregatedRow.putValue(column,
+ aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column)));
+ }
+ }
+
+ private static class AggregatorContext {
+ final String _column;
+ final ValueAggregator _aggregator;
+
+ AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType aggregationType) {
+ _column = fieldSpec.getName();
+ _aggregator = ValueAggregatorFactory.getValueAggregator(aggregationType, fieldSpec.getDataType());
+ }
+ }
+}
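
Illustrative sketch (not part of this commit): RollupReducer looks up a ValueAggregator per metric column (SUM when no override is configured) and folds adjacent equal rows into one. A minimal example of a single aggregation step, assuming the SUM aggregator for LONG adds its two inputs:

  import org.apache.pinot.core.segment.processing.aggregator.ValueAggregator;
  import org.apache.pinot.core.segment.processing.aggregator.ValueAggregatorFactory;
  import org.apache.pinot.segment.spi.AggregationFunctionType;
  import org.apache.pinot.spi.data.FieldSpec.DataType;

  public class RollupAggregationSketch {
    public static void main(String[] args) {
      // Default aggregation type is SUM; the data type comes from the metric's FieldSpec.
      ValueAggregator aggregator =
          ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, DataType.LONG);
      Object aggregated = aggregator.aggregate(3L, 4L);
      System.out.println(aggregated);  // expected 7, assuming SUM on LONG adds the values
    }
  }
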
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
deleted file mode 100644
index 698a105..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.utils;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-
-
-public final class SegmentProcessingUtils {
- private SegmentProcessingUtils() {
- }
-
- /**
- * Returns the field specs (physical only) with the names sorted in alphabetical order.
- */
- public static List<FieldSpec> getFieldSpecs(Schema schema) {
- List<FieldSpec> fieldSpecs = new ArrayList<>();
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- fieldSpecs.add(fieldSpec);
- }
- }
- fieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
- return fieldSpecs;
- }
-
- /**
- * Returns the field specs (physical only) with sorted column in the front, followed by other columns sorted in
- * alphabetical order.
- */
- public static List<FieldSpec> getFieldSpecs(Schema schema, List<String> sortOrder) {
- List<FieldSpec> fieldSpecs = new ArrayList<>();
- for (String sortColumn : sortOrder) {
- FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
- Preconditions.checkArgument(fieldSpec != null, "Failed to find sort column: %s", sortColumn);
- Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot sort on MV column: %s", sortColumn);
- Preconditions.checkArgument(!fieldSpec.isVirtualColumn(), "Cannot sort on virtual column: %s", sortColumn);
- fieldSpecs.add(fieldSpec);
- }
-
- List<FieldSpec> nonSortFieldSpecs = new ArrayList<>();
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn() && !sortOrder.contains(fieldSpec.getName())) {
- nonSortFieldSpecs.add(fieldSpec);
- }
- }
- nonSortFieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
-
- fieldSpecs.addAll(nonSortFieldSpecs);
- return fieldSpecs;
- }
-
- /**
- * Returns the value comparator based on the sort order.
- */
- public static SortOrderComparator getSortOrderComparator(List<FieldSpec> fieldSpecs, int numSortColumns) {
- DataType[] sortColumnStoredTypes = new DataType[numSortColumns];
- for (int i = 0; i < numSortColumns; i++) {
- sortColumnStoredTypes[i] = fieldSpecs.get(i).getDataType().getStoredType();
- }
- return new SortOrderComparator(numSortColumns, sortColumnStoredTypes);
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
new file mode 100644
index 0000000..02c0c97
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+
+
+public final class SegmentProcessorUtils {
+ private SegmentProcessorUtils() {
+ }
+
+ /**
+ * Returns the field specs (physical only) and number of sort fields based on the merge type and sort order.
+ * <p>The field specs returned should have sorted columns in the front, followed by dimension/time columns sorted in
+ * alphabetical order, followed by metric columns sorted in alphabetical order.
+ * <p>For CONCAT, only include sort columns as sort fields;
+ * <p>For ROLLUP, include sort columns and dimension/time columns as sort fields;
+ * <p>For DEDUP, include all columns as sort fields.
+ */
+ public static Pair<List<FieldSpec>, Integer> getFieldSpecs(Schema schema, MergeType mergeType,
+ @Nullable List<String> sortOrder) {
+ if (sortOrder == null) {
+ sortOrder = Collections.emptyList();
+ }
+
+ List<FieldSpec> fieldSpecs = new ArrayList<>();
+ for (String sortColumn : sortOrder) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
+ Preconditions.checkArgument(fieldSpec != null, "Failed to find sort column: %s", sortColumn);
+ Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot sort on MV column: %s", sortColumn);
+ Preconditions.checkArgument(!fieldSpec.isVirtualColumn(), "Cannot sort on virtual column: %s", sortColumn);
+ Preconditions
+ .checkArgument(fieldSpec.getFieldType() != FieldSpec.FieldType.METRIC || mergeType != MergeType.ROLLUP,
+ "For ROLLUP, cannot sort on metric column: %s", sortColumn);
+ fieldSpecs.add(fieldSpec);
+ }
+
+ List<FieldSpec> metricFieldSpecs = new ArrayList<>();
+ List<FieldSpec> nonMetricFieldSpecs = new ArrayList<>();
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ if (!fieldSpec.isVirtualColumn() && !sortOrder.contains(fieldSpec.getName())) {
+ if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
+ metricFieldSpecs.add(fieldSpec);
+ } else {
+ nonMetricFieldSpecs.add(fieldSpec);
+ }
+ }
+ }
+
+ nonMetricFieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
+ fieldSpecs.addAll(nonMetricFieldSpecs);
+ metricFieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
+ fieldSpecs.addAll(metricFieldSpecs);
+
+ int numSortFields;
+ switch (mergeType) {
+ case CONCAT:
+ numSortFields = sortOrder.size();
+ break;
+ case ROLLUP:
+ numSortFields = sortOrder.size() + nonMetricFieldSpecs.size();
+ break;
+ case DEDUP:
+ numSortFields = fieldSpecs.size();
+ break;
+ default:
+ throw new IllegalStateException("Unsupported merge type: " + mergeType);
+ }
+
+ return new ImmutablePair<>(fieldSpecs, numSortFields);
+ }
+}
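
As a quick illustration of the contract documented in the getFieldSpecs() javadoc above, here is a minimal sketch (not part of the commit; the schema columns d1, d2, m and the class name are hypothetical) showing the column ordering and sort-field counts one would expect for each merge type:

import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.core.segment.processing.framework.MergeType;
import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;

public class FieldSpecOrderingSketch {
  public static void main(String[] args) {
    Schema schema = new Schema.SchemaBuilder().setSchemaName("example")
        .addSingleValueDimension("d2", DataType.INT)
        .addSingleValueDimension("d1", DataType.INT)
        .addMetric("m", DataType.LONG).build();

    // CONCAT with no sort order: specs ordered [d1, d2, m], 0 sort fields.
    Pair<List<FieldSpec>, Integer> concat = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.CONCAT, null);
    // ROLLUP: same ordering, dimension/time columns become sort fields -> 2 sort fields.
    Pair<List<FieldSpec>, Integer> rollup = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.ROLLUP, null);
    // DEDUP: same ordering, every column is a sort field -> 3 sort fields.
    Pair<List<FieldSpec>, Integer> dedup = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.DEDUP, null);

    System.out.println(concat.getRight() + " " + rollup.getRight() + " " + dedup.getRight()); // 0 2 3
  }
}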
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java
deleted file mode 100644
index 5652957..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SortOrderComparator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.utils;
-
-import java.util.Comparator;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.utils.ByteArray;
-
-
-/**
- * Comparator for values of the sort columns.
- */
-public class SortOrderComparator implements Comparator<Object[]> {
- private final int _numSortColumns;
- private final DataType[] _sortColumnStoredTypes;
-
- public SortOrderComparator(int numSortColumns, DataType[] sortColumnStoredTypes) {
- _numSortColumns = numSortColumns;
- _sortColumnStoredTypes = sortColumnStoredTypes;
- }
-
- @Override
- public int compare(Object[] o1, Object[] o2) {
- for (int i = 0; i < _numSortColumns; i++) {
- Object value1 = o1[i];
- Object value2 = o2[i];
- int result;
- switch (_sortColumnStoredTypes[i]) {
- case INT:
- result = Integer.compare((int) value1, (int) value2);
- break;
- case LONG:
- result = Long.compare((long) value1, (long) value2);
- break;
- case FLOAT:
- result = Float.compare((float) value1, (float) value2);
- break;
- case DOUBLE:
- result = Double.compare((double) value1, (double) value2);
- break;
- case STRING:
- result = ((String) value1).compareTo((String) value2);
- break;
- case BYTES:
- result = ByteArray.compare((byte[]) value1, (byte[]) value2);
- break;
- default:
- throw new IllegalStateException("Unsupported sort column stored type: " + _sortColumnStoredTypes[i]);
- }
- if (result != 0) {
- return result;
- }
- }
- return 0;
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/CollectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/CollectorTest.java
deleted file mode 100644
index ce8aecb..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/CollectorTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.framework;
-
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.pinot.core.segment.processing.collector.Collector;
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.collector.ConcatCollector;
-import org.apache.pinot.core.segment.processing.collector.RollupCollector;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * Tests for {@link Collector}
- */
-public class CollectorTest {
-
- private final List<String> uniqueD = new ArrayList<>();
- private static final Random RANDOM = new Random(10);
-
- @BeforeClass
- public void before() {
- for (int i = 0; i < 20; i++) {
- uniqueD.add(RandomStringUtils.random(5));
- }
- }
-
- @Test
- public void testConcatCollector()
- throws IOException {
- Schema schema =
- new Schema.SchemaBuilder().setSchemaName("testSchema").addSingleValueDimension("d", FieldSpec.DataType.STRING)
- .build();
- CollectorConfig collectorConfig = new CollectorConfig.Builder().build();
- Collector collector = CollectorFactory.getCollector(collectorConfig, schema);
- assertEquals(collector.getClass(), ConcatCollector.class);
-
- for (int i = 0; i < 100; i++) {
- GenericRow row = new GenericRow();
- row.putValue("d", uniqueD.get(RandomUtils.nextInt(uniqueD.size())));
- collector.collect(row);
- }
- assertEquals(collector.size(), 100);
- Iterator<GenericRow> iterator = collector.iterator();
- while (iterator.hasNext()) {
- GenericRow next = iterator.next();
- assertTrue(uniqueD.contains(String.valueOf(next.getValue("d"))));
- }
- collector.reset();
- assertEquals(collector.size(), 0);
- collector.close();
- }
-
- @Test
- public void testRollupCollectorWithNoMetrics()
- throws IOException {
- Schema schema =
- new Schema.SchemaBuilder().setSchemaName("testSchema").addSingleValueDimension("d", FieldSpec.DataType.STRING)
- .build();
- CollectorConfig collectorConfig =
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP).build();
- Collector collector = CollectorFactory.getCollector(collectorConfig, schema);
- assertEquals(collector.getClass(), RollupCollector.class);
-
- Set<String> usedValues = new HashSet<>();
- for (int i = 0; i < 100; i++) {
- GenericRow row = new GenericRow();
- String value = uniqueD.get(RANDOM.nextInt(uniqueD.size()));
- row.putValue("d", value);
- collector.collect(row);
- usedValues.add(value);
- }
- assertEquals(collector.size(), usedValues.size());
- Iterator<GenericRow> iterator = collector.iterator();
- while (iterator.hasNext()) {
- GenericRow next = iterator.next();
- assertTrue(uniqueD.contains(String.valueOf(next.getValue("d"))));
- }
- collector.reset();
- assertEquals(collector.size(), 0);
- collector.close();
- }
-
- @Test
- public void testRollupCollectorWithDefaultAggregations()
- throws IOException {
- Schema schema =
- new Schema.SchemaBuilder().setSchemaName("testSchema").addSingleValueDimension("d", FieldSpec.DataType.STRING)
- .addMetric("m1", FieldSpec.DataType.INT).addMetric("m2", FieldSpec.DataType.LONG).build();
- CollectorConfig collectorConfig =
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP).build();
- Collector collector = CollectorFactory.getCollector(collectorConfig, schema);
-
- Map<String, Integer> m1Map = new HashMap<>();
- Map<String, Long> m2Map = new HashMap<>();
- for (int i = 0; i < 100; i++) {
- GenericRow row = new GenericRow();
- String value = uniqueD.get(RANDOM.nextInt(uniqueD.size()));
- row.putValue("d", value);
- int m1 = RandomUtils.nextInt(10);
- row.putValue("m1", m1);
- long m2 = RANDOM.nextLong();
- row.putValue("m2", m2);
-
- if (m1Map.containsKey(value)) {
- m1Map.put(value, m1Map.get(value) + m1);
- m2Map.put(value, m2Map.get(value) + m2);
- } else {
- m1Map.put(value, m1);
- m2Map.put(value, m2);
- }
- collector.collect(row);
- }
- assertEquals(collector.size(), m1Map.size());
- Iterator<GenericRow> iterator = collector.iterator();
- while (iterator.hasNext()) {
- GenericRow next = iterator.next();
- String d = String.valueOf(next.getValue("d"));
- assertTrue(uniqueD.contains(d));
- assertEquals(next.getValue("m1"), m1Map.get(d));
- assertEquals(next.getValue("m2"), m2Map.get(d));
- }
- collector.reset();
- assertEquals(collector.size(), 0);
- collector.close();
- }
-
- @Test
- public void testRollupCollectorWithMVDimensions()
- throws IOException {
- Schema schema =
- new Schema.SchemaBuilder().setSchemaName("testSchema").addMultiValueDimension("dMv", FieldSpec.DataType.STRING)
- .addMetric("m1", FieldSpec.DataType.INT).build();
- CollectorConfig collectorConfig =
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP).build();
- Collector collector = CollectorFactory.getCollector(collectorConfig, schema);
-
- GenericRow r1 = new GenericRow();
- r1.putValue("dMv", new Object[]{"a", "b"});
- r1.putValue("m1", 100);
- GenericRow r2 = new GenericRow();
- r2.putValue("dMv", new Object[]{"b", "a"});
- r2.putValue("m1", 100);
- GenericRow r3 = new GenericRow();
- r3.putValue("dMv", new Object[]{"a", "b"});
- r3.putValue("m1", 100);
- GenericRow r4 = new GenericRow();
- r4.putValue("dMv", new Object[]{"a"});
- r4.putValue("m1", 100);
- collector.collect(r1);
- collector.collect(r2);
- collector.collect(r3);
- collector.collect(r4);
- assertEquals(collector.size(), 3);
- collector.close();
- }
-
- @Test
- public void testRollupCollectorWithMinMaxAggregations()
- throws IOException {
- Schema schema =
- new Schema.SchemaBuilder().setSchemaName("testSchema").addSingleValueDimension("d", FieldSpec.DataType.STRING)
- .addMetric("m1", FieldSpec.DataType.INT).addMetric("m2", FieldSpec.DataType.LONG).build();
- Map<String, ValueAggregatorFactory.ValueAggregatorType> valueAggregatorMap = new HashMap<>();
- valueAggregatorMap.put("m1", ValueAggregatorFactory.ValueAggregatorType.MAX);
- valueAggregatorMap.put("m2", ValueAggregatorFactory.ValueAggregatorType.MIN);
- CollectorConfig collectorConfig =
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP)
- .setAggregatorTypeMap(valueAggregatorMap).build();
- Collector collector = CollectorFactory.getCollector(collectorConfig, schema);
-
- Map<String, Integer> m1Map = new HashMap<>();
- Map<String, Long> m2Map = new HashMap<>();
- for (int i = 0; i < 100; i++) {
- GenericRow row = new GenericRow();
- String value = uniqueD.get(RANDOM.nextInt(uniqueD.size()));
- row.putValue("d", value);
- int m1 = RandomUtils.nextInt(10);
- row.putValue("m1", m1);
- long m2 = RANDOM.nextLong();
- row.putValue("m2", m2);
-
- if (m1Map.containsKey(value)) {
- m1Map.put(value, Math.max(m1Map.get(value), m1));
- m2Map.put(value, Math.min(m2Map.get(value), m2));
- } else {
- m1Map.put(value, m1);
- m2Map.put(value, m2);
- }
- collector.collect(row);
- }
- assertEquals(collector.size(), m1Map.size());
- Iterator<GenericRow> iterator = collector.iterator();
- while (iterator.hasNext()) {
- GenericRow next = iterator.next();
- String d = String.valueOf(next.getValue("d"));
- assertTrue(uniqueD.contains(d));
- assertEquals(next.getValue("m1"), m1Map.get(d));
- assertEquals(next.getValue("m2"), m2Map.get(d));
- }
- collector.reset();
- assertEquals(collector.size(), 0);
- collector.close();
- }
-
- @Test
- public void testConcatCollectorWithSort()
- throws IOException {
- Schema schema =
- new Schema.SchemaBuilder().setSchemaName("testSchema").addSingleValueDimension("d", FieldSpec.DataType.STRING)
- .build();
- CollectorConfig collectorConfig = new CollectorConfig.Builder().setSortOrder(Lists.newArrayList("d")).build();
- Collector collector = CollectorFactory.getCollector(collectorConfig, schema);
- assertEquals(collector.getClass(), ConcatCollector.class);
-
- List<String> dValues = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- GenericRow row = new GenericRow();
- String dValue = uniqueD.get(RandomUtils.nextInt(uniqueD.size()));
- row.putValue("d", dValue);
- dValues.add(dValue);
- collector.collect(row);
- }
- assertEquals(collector.size(), 100);
- Collections.sort(dValues);
- Iterator<GenericRow> iterator = collector.iterator();
- int idx = 0;
- while (iterator.hasNext()) {
- GenericRow next = iterator.next();
- assertEquals(dValues.get(idx++), String.valueOf(next.getValue("d")));
- }
- collector.reset();
- assertEquals(collector.size(), 0);
- collector.close();
- }
-
- @Test
- public void testRollupCollectorWithSort()
- throws IOException {
- Schema schema =
- new Schema.SchemaBuilder().setSchemaName("testSchema").addSingleValueDimension("d", FieldSpec.DataType.STRING)
- .addMetric("m1", FieldSpec.DataType.INT).build();
- CollectorConfig collectorConfig =
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP)
- .setSortOrder(Lists.newArrayList("d")).build();
- Collector collector = CollectorFactory.getCollector(collectorConfig, schema);
-
- Set<String> dValues = new HashSet<>();
- for (int i = 0; i < 100; i++) {
- GenericRow row = new GenericRow();
- String value = uniqueD.get(RANDOM.nextInt(uniqueD.size()));
- dValues.add(value);
- row.putValue("d", value);
- row.putValue("m1", RandomUtils.nextInt(10));
- collector.collect(row);
- }
- List<String> uniqueDValues = new ArrayList<>(dValues);
- Collections.sort(uniqueDValues);
- Iterator<GenericRow> iterator = collector.iterator();
- int idx = 0;
- while (iterator.hasNext()) {
- GenericRow next = iterator.next();
- assertEquals(uniqueDValues.get(idx++), String.valueOf(next.getValue("d")));
- }
- collector.reset();
- assertEquals(collector.size(), 0);
- collector.close();
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/GenericRowSorterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/GenericRowSorterTest.java
deleted file mode 100644
index 2268906..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/GenericRowSorterTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.framework;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.pinot.core.segment.processing.collector.GenericRowSorter;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-/**
- * Tests for {@link GenericRowSorter}
- */
-public class GenericRowSorterTest {
-
- private static final Random RANDOM = new Random(10);
-
- @Test
- public void testSort() {
- Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("stringCol", FieldSpec.DataType.STRING)
- .addSingleValueDimension("intCol", FieldSpec.DataType.INT).addMetric("longCol", FieldSpec.DataType.LONG)
- .addMetric("doubleCol", FieldSpec.DataType.DOUBLE).build();
-
- List<GenericRow> rows = new ArrayList<>(1000);
- List<Object[]> ogRows = new ArrayList<>(1000);
- for (int i = 0; i < 1000; i++) {
- Object[] ogRow = new Object[]{RandomStringUtils.randomAlphabetic(5), RANDOM.nextInt(
- 10), RANDOM.nextLong(), RANDOM.nextDouble()};
- GenericRow row = new GenericRow();
- row.putValue("stringCol", ogRow[0]);
- row.putValue("intCol", ogRow[1]);
- row.putValue("longCol", ogRow[2]);
- row.putValue("doubleCol", ogRow[3]);
- rows.add(row);
- ogRows.add(ogRow);
- }
-
- GenericRowSorter sorter = new GenericRowSorter(Lists.newArrayList("intCol", "stringCol", "doubleCol"), schema);
- sorter.sort(rows);
-
- ogRows.sort(Comparator.comparingInt((Object[] o) -> (int) o[1]).thenComparing(o -> (String) o[0])
- .thenComparingDouble(o -> (double) o[3]));
-
- for (int i = 0; i < 1000; i++) {
- GenericRow r = rows.get(i);
- Object[] ogRow = ogRows.get(i);
- Assert.assertEquals(ogRow[0], r.getValue("stringCol"));
- Assert.assertEquals(ogRow[1], r.getValue("intCol"));
- Assert.assertEquals(ogRow[2], r.getValue("longCol"));
- Assert.assertEquals(ogRow[3], r.getValue("doubleCol"));
- }
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/ReducerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/ReducerTest.java
new file mode 100644
index 0000000..7f421cd
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/ReducerTest.java
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.core.segment.processing.reducer.Reducer;
+import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests for {@link Reducer}
+ */
+public class ReducerTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "ReducerTest");
+ private static final File FILE_MANAGER_OUTPUT_DIR = new File(TEMP_DIR, "fileManagerOutput");
+ private static final File REDUCER_OUTPUT_DIR = new File(TEMP_DIR, "reducerOutput");
+ private static final Random RANDOM = new Random();
+
+ @BeforeMethod
+ public void setUp()
+ throws IOException {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ FileUtils.forceMkdir(FILE_MANAGER_OUTPUT_DIR);
+ FileUtils.forceMkdir(REDUCER_OUTPUT_DIR);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ FileUtils.deleteQuietly(TEMP_DIR);
+ }
+
+ @Test
+ public void testConcat()
+ throws Exception {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("d", DataType.INT).build();
+ Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.CONCAT, null);
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ int[] expectedValues = new int[numRecords];
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int value = RANDOM.nextInt();
+ row.putValue("d", value);
+ fileWriter.write(row);
+ expectedValues[i] = value;
+ }
+ fileManager.closeFileWriter();
+
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.CONCAT).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ recordReader.read(i, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 1);
+ assertEquals(fieldToValueMap.get("d"), expectedValues[i]);
+ }
+ reducedFileManager.cleanUp();
+ }
+
+ @Test
+ public void testConcatWithNull()
+ throws Exception {
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setNullHandlingEnabled(true).build();
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("d", DataType.INT).build();
+ Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.CONCAT, null);
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), true, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ int[] expectedValues = new int[numRecords];
+ boolean[] expectedIsNulls = new boolean[numRecords];
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int value;
+ boolean isNull = RANDOM.nextBoolean();
+ if (isNull) {
+ value = Integer.MIN_VALUE;
+ row.putDefaultNullValue("d", value);
+ } else {
+ value = RANDOM.nextInt();
+ row.putValue("d", value);
+ }
+ expectedValues[i] = value;
+ expectedIsNulls[i] = isNull;
+ fileWriter.write(row);
+ }
+ fileManager.closeFileWriter();
+
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.CONCAT).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ recordReader.read(i, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 1);
+ assertEquals(fieldToValueMap.get("d"), expectedValues[i]);
+ assertEquals(row.isNullValue("d"), expectedIsNulls[i]);
+ }
+ reducedFileManager.cleanUp();
+ }
+
+ @Test
+ public void testConcatWithSort()
+ throws Exception {
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setSortedColumn("d").build();
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("d", DataType.INT).build();
+ Pair<List<FieldSpec>, Integer> result =
+ SegmentProcessorUtils.getFieldSpecs(schema, MergeType.CONCAT, Collections.singletonList("d"));
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ int[] expectedValues = new int[numRecords];
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int value = RANDOM.nextInt();
+ row.putValue("d", value);
+ fileWriter.write(row);
+ expectedValues[i] = value;
+ }
+ fileManager.closeFileWriter();
+
+ Arrays.sort(expectedValues);
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.CONCAT).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ recordReader.read(i, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 1);
+ assertEquals(fieldToValueMap.get("d"), expectedValues[i]);
+ }
+ reducedFileManager.cleanUp();
+ }
+
+ @Test
+ public void testRollup()
+ throws Exception {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("d", DataType.INT)
+ .addMetric("m1", DataType.INT).addMetric("m2", DataType.LONG).addMetric("m3", DataType.FLOAT).build();
+ Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.ROLLUP, null);
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ // NOTE: Use TreeMap so that the entries are sorted
+ Map<Integer, Object[]> expectedValues = new TreeMap<>();
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int d = RANDOM.nextInt(10);
+ int m1 = RANDOM.nextInt();
+ long m2 = RANDOM.nextLong();
+ float m3 = RANDOM.nextFloat();
+ row.putValue("d", d);
+ row.putValue("m1", m1);
+ row.putValue("m2", m2);
+ row.putValue("m3", m3);
+ fileWriter.write(row);
+ Object[] metrics = expectedValues.get(d);
+ if (metrics == null) {
+ expectedValues.put(d, new Object[]{m1, m2, m3});
+ } else {
+ metrics[0] = (int) metrics[0] + m1;
+ metrics[1] = Math.min((long) metrics[1], m2);
+ metrics[2] = Math.max((float) metrics[2], m3);
+ }
+ }
+ fileManager.closeFileWriter();
+
+ Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
+ aggregationTypes.put("m1", AggregationFunctionType.SUM);
+ aggregationTypes.put("m2", AggregationFunctionType.MIN);
+ aggregationTypes.put("m3", AggregationFunctionType.MAX);
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.ROLLUP).setAggregationTypes(aggregationTypes).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ assertEquals(fileReader.getNumRows(), expectedValues.size());
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ int rowId = 0;
+ for (Map.Entry<Integer, Object[]> entry : expectedValues.entrySet()) {
+ row.clear();
+ recordReader.read(rowId++, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 4);
+ assertEquals(fieldToValueMap.get("d"), entry.getKey());
+ Object[] expectedMetrics = entry.getValue();
+ assertEquals(fieldToValueMap.get("m1"), expectedMetrics[0]);
+ assertEquals(fieldToValueMap.get("m2"), expectedMetrics[1]);
+ assertEquals(fieldToValueMap.get("m3"), expectedMetrics[2]);
+ }
+ }
+
+ @Test
+ public void testRollupWithNull()
+ throws Exception {
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setNullHandlingEnabled(true).build();
+ // NOTE: Intentionally put non-zero default value for metric to test that null values are skipped
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("d", DataType.INT)
+ .addMetric("m", DataType.INT, 1).build();
+ Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.ROLLUP, null);
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), true, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ // NOTE: Use TreeMap so that the entries are sorted
+ Map<Integer, Integer> expectedValues = new TreeMap<>();
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int d = RANDOM.nextInt(10);
+ row.putValue("d", d);
+ if (RANDOM.nextBoolean()) {
+ row.putDefaultNullValue("m", 1);
+ // Put 0 as the expected value because null value should be skipped
+ expectedValues.putIfAbsent(d, 0);
+ } else {
+ int m = RANDOM.nextInt();
+ row.putValue("m", m);
+ expectedValues.merge(d, m, Integer::sum);
+ }
+ fileWriter.write(row);
+ }
+ fileManager.closeFileWriter();
+
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.ROLLUP).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ assertEquals(fileReader.getNumRows(), expectedValues.size());
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ int rowId = 0;
+ for (Map.Entry<Integer, Integer> entry : expectedValues.entrySet()) {
+ row.clear();
+ recordReader.read(rowId++, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 2);
+ assertEquals(fieldToValueMap.get("d"), entry.getKey());
+ int m = (int) fieldToValueMap.get("m");
+ if (row.isNullValue("m")) {
+ assertEquals(m, 1);
+ } else {
+ assertEquals(m, (int) entry.getValue());
+ }
+ }
+ }
+
+ @Test
+ public void testRollupWithMV()
+ throws Exception {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addMultiValueDimension("d", DataType.INT)
+ .addMetric("m", DataType.INT).build();
+ Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.ROLLUP, null);
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ // NOTE: dValues are sorted
+ Object[][] dValues =
+ new Object[][]{new Object[]{0}, new Object[]{1}, new Object[]{2}, new Object[]{0, 1}, new Object[]{0, 2}, new Object[]{1, 0}, new Object[]{1, 2}, new Object[]{2, 0}, new Object[]{2, 1}, new Object[]{0, 1, 2}};
+ // NOTE: Use TreeMap so that the entries are sorted
+ Map<Integer, Integer> expectedValues = new TreeMap<>();
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int dIndex = RANDOM.nextInt(10);
+ int m = RANDOM.nextInt();
+ row.putValue("d", dValues[dIndex]);
+ row.putValue("m", m);
+ fileWriter.write(row);
+ expectedValues.merge(dIndex, m, Integer::sum);
+ }
+ fileManager.closeFileWriter();
+
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.ROLLUP).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ assertEquals(fileReader.getNumRows(), expectedValues.size());
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ int rowId = 0;
+ for (Map.Entry<Integer, Integer> entry : expectedValues.entrySet()) {
+ row.clear();
+ recordReader.read(rowId++, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 2);
+ assertEquals(fieldToValueMap.get("d"), dValues[entry.getKey()]);
+ assertEquals(fieldToValueMap.get("m"), entry.getValue());
+ }
+ }
+
+ @Test
+ public void testRollupWithSort()
+ throws Exception {
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setSortedColumn("d2").build();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("d1", DataType.INT)
+ .addSingleValueDimension("d2", DataType.INT).addMetric("m", DataType.INT).build();
+ Pair<List<FieldSpec>, Integer> result =
+ SegmentProcessorUtils.getFieldSpecs(schema, MergeType.ROLLUP, Collections.singletonList("d2"));
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ // NOTE: dValues are sorted by the second value (d2), then by the first value (d1)
+ int[][] dValues =
+ new int[][]{new int[]{1, 0}, new int[]{5, 0}, new int[]{10, 0}, new int[]{3, 2}, new int[]{0, 5}, new int[]{5, 5}, new int[]{8, 5}, new int[]{2, 6}, new int[]{4, 6}, new int[]{1, 10}};
+ // NOTE: Use TreeMap so that the entries are sorted
+ Map<Integer, Integer> expectedValues = new TreeMap<>();
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int dIndex = RANDOM.nextInt(10);
+ int m = RANDOM.nextInt();
+ row.putValue("d1", dValues[dIndex][0]);
+ row.putValue("d2", dValues[dIndex][1]);
+ row.putValue("m", m);
+ fileWriter.write(row);
+ expectedValues.merge(dIndex, m, Integer::sum);
+ }
+ fileManager.closeFileWriter();
+
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.ROLLUP).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ assertEquals(fileReader.getNumRows(), expectedValues.size());
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ int rowId = 0;
+ for (Map.Entry<Integer, Integer> entry : expectedValues.entrySet()) {
+ row.clear();
+ recordReader.read(rowId++, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 3);
+ int expectedDIndex = entry.getKey();
+ assertEquals(fieldToValueMap.get("d1"), dValues[expectedDIndex][0]);
+ assertEquals(fieldToValueMap.get("d2"), dValues[expectedDIndex][1]);
+ assertEquals(fieldToValueMap.get("m"), entry.getValue());
+ }
+ }
+
+ @Test
+ public void testDedup()
+ throws Exception {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("d", DataType.INT)
+ .addMetric("m", DataType.INT).build();
+ Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.DEDUP, null);
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ // NOTE: Use TreeSet so that the entries are sorted
+ Set<Integer> expectedValues = new TreeSet<>();
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int d = RANDOM.nextInt(5);
+ int m = RANDOM.nextInt(5);
+ row.putValue("d", d);
+ row.putValue("m", m);
+ fileWriter.write(row);
+ expectedValues.add(d * 5 + m);
+ }
+ fileManager.closeFileWriter();
+
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).setMergeType(MergeType.DEDUP)
+ .build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ assertEquals(fileReader.getNumRows(), expectedValues.size());
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ int rowId = 0;
+ for (int expectedValue : expectedValues) {
+ row.clear();
+ recordReader.read(rowId++, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 2);
+ assertEquals(fieldToValueMap.get("d"), expectedValue / 5);
+ assertEquals(fieldToValueMap.get("m"), expectedValue % 5);
+ }
+ }
+
+ @Test
+ public void testDedupWithMV()
+ throws Exception {
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName("testTable").addMultiValueDimension("d", DataType.INT).build();
+ Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.DEDUP, null);
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ // NOTE: dValues are sorted
+ Object[][] dValues =
+ new Object[][]{new Object[]{0}, new Object[]{1}, new Object[]{2}, new Object[]{0, 1}, new Object[]{0, 2}, new Object[]{1, 0}, new Object[]{1, 2}, new Object[]{2, 0}, new Object[]{2, 1}, new Object[]{0, 1, 2}};
+ // NOTE: Use TreeSet so that the entries are sorted
+ Set<Integer> expectedValues = new TreeSet<>();
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int dIndex = RANDOM.nextInt(10);
+ row.putValue("d", dValues[dIndex]);
+ fileWriter.write(row);
+ expectedValues.add(dIndex);
+ }
+ fileManager.closeFileWriter();
+
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.DEDUP).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ assertEquals(fileReader.getNumRows(), expectedValues.size());
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ int rowId = 0;
+ for (int expectedValue : expectedValues) {
+ row.clear();
+ recordReader.read(rowId++, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 1);
+ assertEquals(fieldToValueMap.get("d"), dValues[expectedValue]);
+ }
+ }
+
+ @Test
+ public void testDedupWithSort()
+ throws Exception {
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setSortedColumn("d2").build();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addSingleValueDimension("d1", DataType.INT)
+ .addSingleValueDimension("d2", DataType.INT).build();
+ Pair<List<FieldSpec>, Integer> result =
+ SegmentProcessorUtils.getFieldSpecs(schema, MergeType.DEDUP, Collections.singletonList("d2"));
+ GenericRowFileManager fileManager =
+ new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+ GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+ int numRecords = 100;
+ // NOTE: dValues are sorted by the second value (d2), then by the first value (d1)
+ int[][] dValues =
+ new int[][]{new int[]{1, 0}, new int[]{5, 0}, new int[]{10, 0}, new int[]{3, 2}, new int[]{0, 5}, new int[]{5, 5}, new int[]{8, 5}, new int[]{2, 6}, new int[]{4, 6}, new int[]{1, 10}};
+ // NOTE: Use TreeSet so that the entries are sorted
+ Set<Integer> expectedValues = new TreeSet<>();
+ GenericRow row = new GenericRow();
+ for (int i = 0; i < numRecords; i++) {
+ row.clear();
+ int dIndex = RANDOM.nextInt(10);
+ int m = RANDOM.nextInt();
+ row.putValue("d1", dValues[dIndex][0]);
+ row.putValue("d2", dValues[dIndex][1]);
+ row.putValue("m", m);
+ fileWriter.write(row);
+ expectedValues.add(dIndex);
+ }
+ fileManager.closeFileWriter();
+
+ SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.DEDUP).build();
+ Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, REDUCER_OUTPUT_DIR);
+ GenericRowFileManager reducedFileManager = reducer.reduce();
+ GenericRowFileReader fileReader = reducedFileManager.getFileReader();
+ assertEquals(fileReader.getNumRows(), expectedValues.size());
+ GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ int rowId = 0;
+ for (int expectedDIndex : expectedValues) {
+ row.clear();
+ recordReader.read(rowId++, row);
+ Map<String, Object> fieldToValueMap = row.getFieldToValueMap();
+ assertEquals(fieldToValueMap.size(), 2);
+ assertEquals(fieldToValueMap.get("d1"), dValues[expectedDIndex][0]);
+ assertEquals(fieldToValueMap.get("d2"), dValues[expectedDIndex][1]);
+ }
+ }
+}
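
For reference, the tests above all repeat the same three-step flow: write rows through the GenericRowFileManager, reduce, then read the result back. Below is a condensed, self-contained sketch of that flow (not part of the commit; the table, schema, and directory names are hypothetical, and it assumes ROLLUP's default SUM aggregation as exercised in testRollupWithNull above):

import java.io.File;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.core.segment.processing.framework.MergeType;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
import org.apache.pinot.core.segment.processing.reducer.Reducer;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class ReducerUsageSketch {
  public static void main(String[] args) throws Exception {
    File inputDir = new File(FileUtils.getTempDirectory(), "reducerSketchInput");
    File outputDir = new File(FileUtils.getTempDirectory(), "reducerSketchOutput");
    FileUtils.deleteQuietly(inputDir);
    FileUtils.deleteQuietly(outputDir);
    FileUtils.forceMkdir(inputDir);
    FileUtils.forceMkdir(outputDir);

    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("exampleTable").build();
    Schema schema = new Schema.SchemaBuilder().setSchemaName("exampleTable")
        .addSingleValueDimension("d", DataType.INT).addMetric("m", DataType.LONG).build();

    // Field specs and sort-field count are derived from the merge type (for ROLLUP, "d" is a sort field).
    Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.ROLLUP, null);
    GenericRowFileManager fileManager =
        new GenericRowFileManager(inputDir, result.getLeft(), false, result.getRight());

    // 1. Write two rows sharing the same dimension value.
    GenericRowFileWriter fileWriter = fileManager.getFileWriter();
    GenericRow row = new GenericRow();
    row.putValue("d", 1);
    row.putValue("m", 10L);
    fileWriter.write(row);
    row.clear();
    row.putValue("d", 1);
    row.putValue("m", 5L);
    fileWriter.write(row);
    fileManager.closeFileWriter();

    // 2. Reduce: with ROLLUP and no explicit aggregation types, metrics are summed.
    SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig)
        .setSchema(schema).setMergeType(MergeType.ROLLUP).build();
    Reducer reducer = ReducerFactory.getReducer("0", fileManager, config, outputDir);
    GenericRowFileManager reducedFileManager = reducer.reduce();

    // 3. Read the rolled-up rows back.
    GenericRowFileReader fileReader = reducedFileManager.getFileReader();
    GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
    for (int i = 0; i < fileReader.getNumRows(); i++) {
      row.clear();
      recordReader.read(i, row);
      System.out.println(row.getFieldToValueMap()); // expect a single row: {d=1, m=15}
    }
    reducedFileManager.cleanUp();
    FileUtils.deleteQuietly(inputDir);
    FileUtils.deleteQuietly(outputDir);
  }
}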
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 682cd83..a0892bd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.segment.processing.framework;
-import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,6 +32,7 @@ import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
+import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
@@ -117,7 +117,7 @@ public class SegmentMapperTest {
}
@Test(dataProvider = "segmentMapperConfigProvider")
- public void segmentMapperTest(SegmentMapperConfig segmentMapperConfig, Map<String, List<Object[]>> partitionToRecords)
+ public void segmentMapperTest(SegmentProcessorConfig processorConfig, Map<String, List<Object[]>> partitionToRecords)
throws Exception {
File mapperOutputDir = new File(TEMP_DIR, "mapper_output");
FileUtils.deleteQuietly(mapperOutputDir);
@@ -126,7 +126,7 @@ public class SegmentMapperTest {
PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader();
segmentRecordReader.init(_indexDir, null, null, true);
SegmentMapper segmentMapper =
- new SegmentMapper(Collections.singletonList(segmentRecordReader), segmentMapperConfig, mapperOutputDir);
+ new SegmentMapper(Collections.singletonList(segmentRecordReader), processorConfig, mapperOutputDir);
Map<String, GenericRowFileManager> partitionToFileManagerMap = segmentMapper.map();
segmentRecordReader.close();
@@ -150,19 +150,20 @@ public class SegmentMapperTest {
int numRows = fileReader.getNumRows();
List<Object[]> expectedRecords = partitionToRecords.get(partition);
assertEquals(numRows, expectedRecords.size());
- GenericRow reuse = new GenericRow();
+ GenericRow buffer = new GenericRow();
for (int i = 0; i < numRows; i++) {
- reuse = fileReader.read(i, reuse);
+ fileReader.read(i, buffer);
Object[] expectedValues = expectedRecords.get(i);
- assertEquals(reuse.getValue("campaign"), expectedValues[0]);
- assertEquals(reuse.getValue("clicks"), expectedValues[1]);
- assertEquals(reuse.getValue("timeValue"), expectedValues[2]);
+ assertEquals(buffer.getValue("campaign"), expectedValues[0]);
+ assertEquals(buffer.getValue("clicks"), expectedValues[1]);
+ assertEquals(buffer.getValue("timeValue"), expectedValues[2]);
// Default null value
if (expectedValues[0].equals("xyz")) {
- assertEquals(reuse.getNullValueFields(), Collections.singleton("campaign"));
+ assertEquals(buffer.getNullValueFields(), Collections.singleton("campaign"));
} else {
- assertEquals(reuse.getNullValueFields(), Collections.emptySet());
+ assertEquals(buffer.getNullValueFields(), Collections.emptySet());
}
+ buffer.clear();
}
fileManager.cleanUp();
}
@@ -183,108 +184,108 @@ public class SegmentMapperTest {
List<Object[]> inputs = new ArrayList<>();
// default configs
- SegmentMapperConfig config1 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(new PartitionerConfig.Builder().build()));
- Map<String, List<Object[]>> expectedRecords1 = new HashMap<>();
- expectedRecords1.put("0", outputData);
- inputs.add(new Object[]{config1, expectedRecords1});
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ Map<String, List<Object[]>> expectedRecords = new HashMap<>();
+ expectedRecords.put("0", outputData);
+ inputs.add(new Object[]{config, expectedRecords});
// round robin partitioner
- SegmentMapperConfig config12 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.ROUND_ROBIN)
- .setNumPartitions(3).build()));
- Map<String, List<Object[]>> expectedRecords12 = new HashMap<>();
- IntStream.range(0, 3).forEach(i -> expectedRecords12.put(String.valueOf(i), new ArrayList<>()));
+ SegmentProcessorConfig config1 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
+ Collections.singletonList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.ROUND_ROBIN)
+ .setNumPartitions(3).build())).build();
+ Map<String, List<Object[]>> expectedRecords1 = new HashMap<>();
+ IntStream.range(0, 3).forEach(i -> expectedRecords1.put(String.valueOf(i), new ArrayList<>()));
for (int i = 0; i < outputData.size(); i++) {
- expectedRecords12.get(String.valueOf(i % 3)).add(outputData.get(i));
+ expectedRecords1.get(String.valueOf(i % 3)).add(outputData.get(i));
}
- inputs.add(new Object[]{config12, expectedRecords12});
+ inputs.add(new Object[]{config1, expectedRecords1});
// partition by timeValue
- SegmentMapperConfig config2 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("timeValue").build()));
+ SegmentProcessorConfig config2 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
+ Collections.singletonList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("timeValue").build())).build();
Map<String, List<Object[]>> expectedRecords2 =
outputData.stream().collect(Collectors.groupingBy(r -> String.valueOf(r[2]), Collectors.toList()));
inputs.add(new Object[]{config2, expectedRecords2});
// partition by campaign
- SegmentMapperConfig config3 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("campaign").build()));
+ SegmentProcessorConfig config3 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
+ Collections.singletonList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("campaign").build())).build();
Map<String, List<Object[]>> expectedRecords3 =
outputData.stream().collect(Collectors.groupingBy(r -> String.valueOf(r[0]), Collectors.toList()));
inputs.add(new Object[]{config3, expectedRecords3});
// transform function partition
- SegmentMapperConfig config4 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
- .setTransformFunction("toEpochDays(timeValue)").build()));
+ SegmentProcessorConfig config4 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
+ Collections.singletonList(new PartitionerConfig.Builder()
+ .setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
+ .setTransformFunction("toEpochDays(timeValue)").build())).build();
Map<String, List<Object[]>> expectedRecords4 = outputData.stream()
.collect(Collectors.groupingBy(r -> String.valueOf(((long) r[2]) / 86400000), Collectors.toList()));
inputs.add(new Object[]{config4, expectedRecords4});
// partition by column and then table column partition config
- SegmentMapperConfig config41 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("campaign").build(), new PartitionerConfig.Builder()
- .setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG).setColumnName("clicks")
- .setColumnPartitionConfig(new ColumnPartitionConfig("Modulo", 3)).build()));
- Map<String, List<Object[]>> expectedRecords41 = new HashMap<>();
+ SegmentProcessorConfig config5 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
+ Arrays.asList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("campaign").build(), new PartitionerConfig.Builder()
+ .setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
+ .setColumnName("clicks").setColumnPartitionConfig(new ColumnPartitionConfig("Modulo", 3)).build()))
+ .build();
+ Map<String, List<Object[]>> expectedRecords5 = new HashMap<>();
for (Object[] record : outputData) {
String partition = record[0] + "_" + (int) record[1] % 3;
- List<Object[]> objects = expectedRecords41.computeIfAbsent(partition, k -> new ArrayList<>());
+ List<Object[]> objects = expectedRecords5.computeIfAbsent(partition, k -> new ArrayList<>());
objects.add(record);
}
- inputs.add(new Object[]{config41, expectedRecords41});
+ inputs.add(new Object[]{config5, expectedRecords5});
// filter function which filters out nothing
- SegmentMapperConfig config5 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
+ SegmentProcessorConfig config6 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setRecordFilterConfig(
new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({campaign == \"foo\"}, campaign)").build(),
- Lists.newArrayList(new PartitionerConfig.Builder().build()));
- Map<String, List<Object[]>> expectedRecords5 = new HashMap<>();
- expectedRecords5.put("0", outputData);
- inputs.add(new Object[]{config5, expectedRecords5});
+ .setFilterFunction("Groovy({campaign == \"foo\"}, campaign)").build()).build();
+ Map<String, List<Object[]>> expectedRecords6 = new HashMap<>();
+ expectedRecords6.put("0", outputData);
+ inputs.add(new Object[]{config6, expectedRecords6});
// filter function which filters out everything
- SegmentMapperConfig config6 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
+ SegmentProcessorConfig config7 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setRecordFilterConfig(
new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({timeValue > 0}, timeValue)").build(),
- Lists.newArrayList(new PartitionerConfig.Builder().build()));
- Map<String, List<Object[]>> expectedRecords6 = new HashMap<>();
- inputs.add(new Object[]{config6, expectedRecords6});
+ .setFilterFunction("Groovy({timeValue > 0}, timeValue)").build()).build();
+ Map<String, List<Object[]>> expectedRecords7 = new HashMap<>();
+ inputs.add(new Object[]{config7, expectedRecords7});
// filter function which filters out certain times
- SegmentMapperConfig config7 =
- new SegmentMapperConfig(_tableConfig, _schema, new RecordTransformerConfig.Builder().build(),
+ SegmentProcessorConfig config8 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setRecordFilterConfig(
new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
.setFilterFunction("Groovy({timeValue < 1597795200000L || timeValue >= 1597881600000L}, timeValue)")
- .build(), Lists.newArrayList(new PartitionerConfig.Builder().build()));
- Map<String, List<Object[]>> expectedRecords7 =
+ .build()).build();
+ Map<String, List<Object[]>> expectedRecords8 =
outputData.stream().filter(r -> ((long) r[2]) >= 1597795200000L && ((long) r[2]) < 1597881600000L)
.collect(Collectors.groupingBy(r -> "0", Collectors.toList()));
- inputs.add(new Object[]{config7, expectedRecords7});
+ inputs.add(new Object[]{config8, expectedRecords8});
// record transformation - round timeValue to nearest day
Map<String, String> transformFunctionMap = new HashMap<>();
transformFunctionMap.put("timeValue", "round(timeValue, 86400000)");
- SegmentMapperConfig config9 = new SegmentMapperConfig(_tableConfig, _schema,
- new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(new PartitionerConfig.Builder().build()));
+ RecordTransformerConfig transformerConfig =
+ new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build();
+ SegmentProcessorConfig config9 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+ .setRecordTransformerConfig(transformerConfig).build();
List<Object[]> transformedData = new ArrayList<>();
outputData.forEach(r -> transformedData.add(new Object[]{r[0], r[1], (((long) r[2]) / 86400000) * 86400000}));
Map<String, List<Object[]>> expectedRecords9 = new HashMap<>();
@@ -292,23 +293,24 @@ public class SegmentMapperTest {
inputs.add(new Object[]{config9, expectedRecords9});
// record transformation - round timeValue to nearest day, partition on timeValue
- SegmentMapperConfig config10 = new SegmentMapperConfig(_tableConfig, _schema,
- new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
- new RecordFilterConfig.Builder().build(), Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("timeValue").build()));
+ SegmentProcessorConfig config10 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+ .setRecordTransformerConfig(transformerConfig).setPartitionerConfigs(Collections.singletonList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("timeValue").build())).build();
Map<String, List<Object[]>> expectedRecords10 =
transformedData.stream().collect(Collectors.groupingBy(r -> String.valueOf(r[2]), Collectors.toList()));
inputs.add(new Object[]{config10, expectedRecords10});
// record transformation - round timeValue to nearest day, partition on timeValue, filter out timeValues
- SegmentMapperConfig config11 = new SegmentMapperConfig(_tableConfig, _schema,
- new RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
- new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue >= 1597881600000}, timeValue)").build(),
- Lists.newArrayList(
+ SegmentProcessorConfig config11 =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+ .setRecordTransformerConfig(transformerConfig).setRecordFilterConfig(
+ new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+ .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue >= 1597881600000}, timeValue)")
+ .build()).setPartitionerConfigs(Collections.singletonList(
new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("timeValue").build()));
+ .setColumnName("timeValue").build())).build();
Map<String, List<Object[]>> expectedRecords11 =
transformedData.stream().filter(r -> ((long) r[2]) == 1597795200000L)
.collect(Collectors.groupingBy(r -> "1597795200000", Collectors.toList()));
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessingFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessingFrameworkTest.java
deleted file mode 100644
index fc7daef..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessingFrameworkTest.java
+++ /dev/null
@@ -1,551 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.framework;
-
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.IntStream;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
-import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
-import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
-import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
-import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-
-/**
- * End-to-end tests for SegmentProcessorFramework
- */
-public class SegmentProcessingFrameworkTest {
-
- private File _baseDir;
- private File _emptyInputDir;
- private File _singleDaySingleSegment;
- private File _multipleDaysSingleSegment;
- private File _singleDayMultipleSegments;
- private File _multipleDaysMultipleSegments;
- private File _multiValueSegments;
- private File _tarredSegments;
- private Schema _pinotSchema;
- private Schema _pinotSchemaMV;
- private TableConfig _tableConfig;
- private final List<Object[]> _rawDataMultipleDays = Lists
- .newArrayList(new Object[]{"abc", 1000, 1597719600000L}, new Object[]{null, 2000, 1597773600000L},
- new Object[]{"abc", 1000, 1597777200000L}, new Object[]{"abc", 4000, 1597795200000L},
- new Object[]{"abc", 3000, 1597802400000L}, new Object[]{null, 1000, 1597838400000L},
- new Object[]{"xyz", 4000, 1597856400000L}, new Object[]{null, 1000, 1597878000000L},
- new Object[]{"abc", 7000, 1597881600000L}, new Object[]{"xyz", 6000, 1597892400000L});
-
- private final List<Object[]> _rawDataSingleDay = Lists
- .newArrayList(new Object[]{"abc", 1000, 1597795200000L}, new Object[]{null, 2000, 1597795200000L},
- new Object[]{"abc", 1000, 1597795200000L}, new Object[]{"abc", 4000, 1597795200000L},
- new Object[]{"abc", 3000, 1597795200000L}, new Object[]{null, 1000, 1597795200000L},
- new Object[]{"xyz", 4000, 1597795200000L}, new Object[]{null, 1000, 1597795200000L},
- new Object[]{"abc", 7000, 1597795200000L}, new Object[]{"xyz", 6000, 1597795200000L});
-
- private final List<Object[]> _multiValue = Lists
- .newArrayList(new Object[]{new String[]{"a", "b"}, 1000, 1597795200000L},
- new Object[]{null, 1000, 1597795200000L}, new Object[]{null, 1000, 1597795200000L},
- new Object[]{new String[]{"a", "b"}, 1000, 1597795200000L});
-
- @BeforeClass
- public void setup()
- throws Exception {
- _tableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("timeValue").build();
- _pinotSchema = new Schema.SchemaBuilder().setSchemaName("mySchema")
- .addSingleValueDimension("campaign", FieldSpec.DataType.STRING, "").addMetric("clicks", FieldSpec.DataType.INT)
- .addDateTime("timeValue", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
- _pinotSchemaMV = new Schema.SchemaBuilder().setSchemaName("mySchema")
- .addMultiValueDimension("campaign", FieldSpec.DataType.STRING, "").addMetric("clicks", FieldSpec.DataType.INT)
- .addDateTime("timeValue", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
-
- _baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_framework_test_" + System.currentTimeMillis());
- FileUtils.deleteQuietly(_baseDir);
- assertTrue(_baseDir.mkdirs());
-
- // create segments in many folders
- _emptyInputDir = new File(_baseDir, "empty_input");
- assertTrue(_emptyInputDir.mkdirs());
- // 1. Single segment, single day
- _singleDaySingleSegment = new File(_baseDir, "input_segments_single_day_single_segment");
- createInputSegments(_singleDaySingleSegment, _rawDataSingleDay, 1, _pinotSchema);
- // 2. Single segment, multiple days
- _multipleDaysSingleSegment = new File(_baseDir, "input_segments_multiple_day_single_segment");
- createInputSegments(_multipleDaysSingleSegment, _rawDataMultipleDays, 1, _pinotSchema);
- // 3. Multiple segments single day
- _singleDayMultipleSegments = new File(_baseDir, "input_segments_single_day_multiple_segment");
- createInputSegments(_singleDayMultipleSegments, _rawDataSingleDay, 3, _pinotSchema);
- // 4. Multiple segments, multiple days
- _multipleDaysMultipleSegments = new File(_baseDir, "input_segments_multiple_day_multiple_segment");
- createInputSegments(_multipleDaysMultipleSegments, _rawDataMultipleDays, 3, _pinotSchema);
- // 5. Multi value
- _multiValueSegments = new File(_baseDir, "multi_value_segment");
- createInputSegments(_multiValueSegments, _multiValue, 1, _pinotSchemaMV);
- // 6. tarred segments
- _tarredSegments = new File(_baseDir, "tarred_segment");
- createInputSegments(_tarredSegments, _rawDataSingleDay, 3, _pinotSchema);
- File[] segmentDirs = _tarredSegments.listFiles();
- for (File segmentDir : segmentDirs) {
- TarGzCompressionUtils.createTarGzFile(segmentDir, new File(_tarredSegments, segmentDir.getName() + ".tar.gz"));
- FileUtils.deleteQuietly(segmentDir);
- }
- }
-
- private void createInputSegments(File inputDir, List<Object[]> rawData, int numSegments, Schema pinotSchema)
- throws Exception {
- assertTrue(inputDir.mkdirs());
-
- List<List<GenericRow>> dataLists = new ArrayList<>();
- if (numSegments > 1) {
- List<GenericRow> dataList1 = new ArrayList<>(4);
- IntStream.range(0, 4).forEach(i -> dataList1.add(getGenericRow(rawData.get(i))));
- dataLists.add(dataList1);
- List<GenericRow> dataList2 = new ArrayList<>(4);
- IntStream.range(4, 7).forEach(i -> dataList2.add(getGenericRow(rawData.get(i))));
- dataLists.add(dataList2);
- List<GenericRow> dataList3 = new ArrayList<>(4);
- IntStream.range(7, 10).forEach(i -> dataList3.add(getGenericRow(rawData.get(i))));
- dataLists.add(dataList3);
- } else {
- List<GenericRow> dataList = new ArrayList<>();
- rawData.forEach(r -> dataList.add(getGenericRow(r)));
- dataLists.add(dataList);
- }
-
- int idx = 0;
- for (List<GenericRow> inputRows : dataLists) {
- RecordReader recordReader = new GenericRowRecordReader(inputRows);
- SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, pinotSchema);
- segmentGeneratorConfig.setTableName(_tableConfig.getTableName());
- segmentGeneratorConfig.setOutDir(inputDir.getAbsolutePath());
- segmentGeneratorConfig.setSequenceId(idx++);
- SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, recordReader);
- driver.build();
- }
-
- assertEquals(inputDir.listFiles().length, numSegments);
- }
-
- private GenericRow getGenericRow(Object[] rawRow) {
- GenericRow row = new GenericRow();
- row.putValue("campaign", rawRow[0]);
- row.putValue("clicks", rawRow[1]);
- row.putValue("timeValue", rawRow[2]);
- return row;
- }
-
- @Test
- public void testBadInputFolders()
- throws Exception {
- SegmentProcessorConfig config;
-
- try {
- new SegmentProcessorConfig.Builder().setSchema(_pinotSchema).build();
- fail("Should fail for missing tableConfig");
- } catch (IllegalStateException e) {
- // expected
- }
-
- try {
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).build();
- fail("Should fail for missing schema");
- } catch (IllegalStateException e) {
- // expected
- }
-
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema).build();
-
- File outputSegmentDir = new File(_baseDir, "output_directory_bad_input_folders");
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
-
- // non-existent input dir
- File nonExistent = new File(_baseDir, "non_existent");
- try {
- new SegmentProcessorFramework(nonExistent, config, outputSegmentDir);
- fail("Should fail for non existent input dir");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // file used as input dir
- File fileInput = new File(_baseDir, "file.txt");
- assertTrue(fileInput.createNewFile());
- try {
- new SegmentProcessorFramework(fileInput, config, outputSegmentDir);
- fail("Should fail for file used as input dir");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // non existent output dir
- try {
- new SegmentProcessorFramework(_singleDaySingleSegment, config, nonExistent);
- fail("Should fail for non existent output dir");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // file used as output dir
- try {
- new SegmentProcessorFramework(_singleDaySingleSegment, config, fileInput);
- fail("Should fail for file used as output dir");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // output dir not empty
- try {
- new SegmentProcessorFramework(fileInput, config, _singleDaySingleSegment);
- fail("Should fail for output dir not empty");
- } catch (IllegalStateException e) {
- // expected
- }
-
- // empty input dir
- SegmentProcessorFramework framework = new SegmentProcessorFramework(_emptyInputDir, config, outputSegmentDir);
- try {
- framework.processSegments();
- fail("Should fail for empty input");
- } catch (Exception e) {
- framework.cleanup();
- }
- }
-
- @Test
- public void testSingleDaySingleSegment()
- throws Exception {
-
- SegmentProcessorConfig config =
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema).build();
- // Single day, Single segment
- File outputSegmentDir = new File(_baseDir, "output_directory_single_day_single_segment");
-
- // default configs
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- SegmentProcessorFramework framework =
- new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 10);
-
- // partitioning
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setPartitionerConfigs(Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.ROUND_ROBIN)
- .setNumPartitions(3).build())).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 3);
- int totalDocs = 0;
- for (File segment : outputSegmentDir.listFiles()) {
- segmentMetadata = new SegmentMetadataImpl(segment);
- totalDocs += segmentMetadata.getTotalDocs();
- assertTrue(segmentMetadata.getTotalDocs() == 3 || segmentMetadata.getTotalDocs() == 4);
- }
- assertEquals(totalDocs, 10);
-
- // record filtering
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setRecordFilterConfig(
- new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({campaign == \"abc\"}, campaign)").build()).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 5);
-
- // filtered everything
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setRecordFilterConfig(
- new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({clicks > 0}, clicks)").build()).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
- try {
- framework.processSegments();
- fail("Should fail for empty map output");
- } catch (IllegalStateException e) {
- framework.cleanup();
- }
-
- // record transformation
- Map<String, String> recordTransformationMap = new HashMap<>();
- recordTransformationMap.put("clicks", "times(clicks, 0)");
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setRecordTransformerConfig(
- new RecordTransformerConfig.Builder().setTransformFunctionsMap(recordTransformationMap).build()).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 10);
- assertEquals(segmentMetadata.getColumnMetadataFor("clicks").getCardinality(), 1);
-
- // collection
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setCollectorConfig(
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP).build()).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 3);
- assertEquals(segmentMetadata.getColumnMetadataFor("campaign").getCardinality(), 3);
-
- // segment config
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setSegmentConfig(new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).build()).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 3);
- totalDocs = 0;
- for (File segment : outputSegmentDir.listFiles()) {
- segmentMetadata = new SegmentMetadataImpl(segment);
- totalDocs += segmentMetadata.getTotalDocs();
- assertTrue(segmentMetadata.getTotalDocs() == 4 || segmentMetadata.getTotalDocs() == 2);
- }
- assertEquals(totalDocs, 10);
- }
-
- @Test
- public void testMultipleDaysSingleSegment()
- throws Exception {
- SegmentProcessorConfig config =
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema).build();
-
- // Multiple day, Single segment
- File outputSegmentDir = new File(_baseDir, "output_directory_multiple_days_single_segment");
-
- // default configs
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- SegmentProcessorFramework framework =
- new SegmentProcessorFramework(_multipleDaysSingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 10);
-
- // date partition
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setPartitionerConfigs(Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("timeValue").build())).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_multipleDaysSingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 10);
- for (File segment : outputSegmentDir.listFiles()) {
- segmentMetadata = new SegmentMetadataImpl(segment);
- assertEquals(segmentMetadata.getTotalDocs(), 1);
- }
- }
-
- @Test
- public void testSingleDayMultipleSegments()
- throws Exception {
- SegmentProcessorConfig config =
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema).build();
-
- // Single day, multiple segments
- File outputSegmentDir = new File(_baseDir, "output_directory_single_day_multiple_segments");
-
- // default configs
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- SegmentProcessorFramework framework =
- new SegmentProcessorFramework(_singleDayMultipleSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 10);
-
- // collection
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setCollectorConfig(
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP).build()).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_singleDayMultipleSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 3);
- assertEquals(segmentMetadata.getColumnMetadataFor("campaign").getCardinality(), 3);
- }
-
- @Test
- public void testMultipleDaysMultipleSegments()
- throws Exception {
- SegmentProcessorConfig config =
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema).build();
-
- // Multiple day, multiple segments
- File outputSegmentDir = new File(_baseDir, "output_directory_multiple_days_multiple_segments");
-
- // default configs
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- SegmentProcessorFramework framework =
- new SegmentProcessorFramework(_multipleDaysMultipleSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 10);
-
- // date partition
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setPartitionerConfigs(Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
- .setTransformFunction("round(timeValue, 86400000)").build())).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_multipleDaysSingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 3);
- int totalDocs = 0;
- for (File segment : outputSegmentDir.listFiles()) {
- segmentMetadata = new SegmentMetadataImpl(segment);
- totalDocs += segmentMetadata.getTotalDocs();
- }
- assertEquals(totalDocs, 10);
-
- // round date, partition, collect
- HashMap<String, String> recordTransformationMap = new HashMap<>();
- recordTransformationMap.put("timeValue", "round(timeValue, 86400000)");
- config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema)
- .setRecordTransformerConfig(
- new RecordTransformerConfig.Builder().setTransformFunctionsMap(recordTransformationMap).build())
- .setPartitionerConfigs(Lists.newArrayList(
- new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("timeValue").build())).setCollectorConfig(
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP).build()).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- framework = new SegmentProcessorFramework(_multipleDaysSingleSegment, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 3);
- totalDocs = 0;
- for (File segment : outputSegmentDir.listFiles()) {
- segmentMetadata = new SegmentMetadataImpl(segment);
- totalDocs += segmentMetadata.getTotalDocs();
- assertEquals(segmentMetadata.getColumnMetadataFor("timeValue").getCardinality(), 1);
- }
- assertTrue(totalDocs < 10);
- }
-
- @Test
- public void testMultiValue()
- throws Exception {
- // Multi-value
- File outputSegmentDir = new File(_baseDir, "output_directory_multivalue");
-
- // collection
- SegmentProcessorConfig config =
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchemaMV).setCollectorConfig(
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP).build()).build();
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- SegmentProcessorFramework framework = new SegmentProcessorFramework(_multiValueSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 2);
- }
-
- @Test
- public void testTarredSegments()
- throws Exception {
- SegmentProcessorConfig config =
- new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_pinotSchema).build();
- File outputSegmentDir = new File(_baseDir, "output_directory_tarred_seg");
-
- // default configs
- FileUtils.deleteQuietly(outputSegmentDir);
- assertTrue(outputSegmentDir.mkdirs());
- SegmentProcessorFramework framework = new SegmentProcessorFramework(_tarredSegments, config, outputSegmentDir);
- framework.processSegments();
- framework.cleanup();
- assertEquals(outputSegmentDir.listFiles().length, 1);
- SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(outputSegmentDir.listFiles()[0]);
- assertEquals(segmentMetadata.getTotalDocs(), 10);
- }
-
- @AfterClass
- public void cleanup() {
- FileUtils.deleteQuietly(_baseDir);
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
new file mode 100644
index 0000000..7c13be5
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -0,0 +1,885 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadata;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * End-to-end tests for SegmentProcessorFramework
+ */
+public class SegmentProcessorFrameworkTest {
+
+ private File _baseDir;
+ private File _emptyInputDir;
+ private File _singleDaySingleSegment;
+ private File _multipleDaysSingleSegment;
+ private File _singleDayMultipleSegments;
+ private File _multipleDaysMultipleSegments;
+ private File _multiValueSegments;
+ private File _tarredSegments;
+
+ private TableConfig _tableConfig;
+ private TableConfig _tableConfigNullValueEnabled;
+ private Schema _schema;
+ private Schema _schemaMV;
+
+ private final List<Object[]> _rawDataSingleDay = Lists
+ .newArrayList(new Object[]{"abc", 1000, 1597795200000L}, new Object[]{null, 2000, 1597795200000L},
+ new Object[]{"abc", null, 1597795200000L}, new Object[]{"abc", 4000, 1597795200000L},
+ new Object[]{"abc", 3000, 1597795200000L}, new Object[]{null, null, 1597795200000L},
+ new Object[]{"xyz", 4000, 1597795200000L}, new Object[]{null, 1000, 1597795200000L},
+ new Object[]{"abc", 7000, 1597795200000L}, new Object[]{"xyz", 6000, 1597795200000L});
+
+ private final List<Object[]> _rawDataMultipleDays = Lists
+ .newArrayList(new Object[]{"abc", 1000, 1597719600000L}, new Object[]{null, 2000, 1597773600000L},
+ new Object[]{"abc", null, 1597777200000L}, new Object[]{"abc", 4000, 1597795200000L},
+ new Object[]{"abc", 3000, 1597802400000L}, new Object[]{null, null, 1597838400000L},
+ new Object[]{"xyz", 4000, 1597856400000L}, new Object[]{null, 1000, 1597878000000L},
+ new Object[]{"abc", 7000, 1597881600000L}, new Object[]{"xyz", 6000, 1597892400000L});
+
+ private final List<Object[]> _rawDataMultiValue = Lists
+ .newArrayList(new Object[]{new String[]{"a", "b"}, 1000, 1597795200000L},
+ new Object[]{null, null, 1597795200000L}, new Object[]{null, 1000, 1597795200000L},
+ new Object[]{new String[]{"a", "b"}, null, 1597795200000L});
+
+ @BeforeClass
+ public void setup()
+ throws Exception {
+ _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
+ _tableConfigNullValueEnabled =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time")
+ .setNullHandlingEnabled(true).build();
+ _schema =
+ new Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("campaign", DataType.STRING, "")
+ // NOTE: Intentionally put 1000 as default value to test skipping null values during rollup
+ .addMetric("clicks", DataType.INT, 1000)
+ .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+ _schemaMV =
+ new Schema.SchemaBuilder().setSchemaName("mySchema").addMultiValueDimension("campaign", DataType.STRING, "")
+ // NOTE: Intentionally put 1000 as default value to test skipping null values during rollup
+ .addMetric("clicks", DataType.INT, 1000)
+ .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+ _baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_framework_test_" + System.currentTimeMillis());
+ FileUtils.deleteQuietly(_baseDir);
+ assertTrue(_baseDir.mkdirs());
+
+ // create segments in many folders
+ _emptyInputDir = new File(_baseDir, "empty_input");
+ assertTrue(_emptyInputDir.mkdirs());
+ // 1. Single segment, single day
+ _singleDaySingleSegment = new File(_baseDir, "input_segments_single_day_single_segment");
+ createInputSegments(_singleDaySingleSegment, _rawDataSingleDay, 1, _schema);
+ // 2. Single segment, multiple days
+ _multipleDaysSingleSegment = new File(_baseDir, "input_segments_multiple_day_single_segment");
+ createInputSegments(_multipleDaysSingleSegment, _rawDataMultipleDays, 1, _schema);
+ // 3. Multiple segments, single day
+ _singleDayMultipleSegments = new File(_baseDir, "input_segments_single_day_multiple_segment");
+ createInputSegments(_singleDayMultipleSegments, _rawDataSingleDay, 3, _schema);
+ // 4. Multiple segments, multiple days
+ _multipleDaysMultipleSegments = new File(_baseDir, "input_segments_multiple_day_multiple_segment");
+ createInputSegments(_multipleDaysMultipleSegments, _rawDataMultipleDays, 3, _schema);
+ // 5. Multi value
+ _multiValueSegments = new File(_baseDir, "multi_value_segment");
+ createInputSegments(_multiValueSegments, _rawDataMultiValue, 1, _schemaMV);
+ // 6. tarred segments
+ _tarredSegments = new File(_baseDir, "tarred_segment");
+ createInputSegments(_tarredSegments, _rawDataSingleDay, 3, _schema);
+ File[] segmentDirs = _tarredSegments.listFiles();
+ assertNotNull(segmentDirs);
+ for (File segmentDir : segmentDirs) {
+ TarGzCompressionUtils.createTarGzFile(segmentDir,
+ new File(_tarredSegments, segmentDir.getName() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
+ FileUtils.deleteQuietly(segmentDir);
+ }
+ }
+
+ private void createInputSegments(File inputDir, List<Object[]> rawData, int numSegments, Schema schema)
+ throws Exception {
+ assertTrue(inputDir.mkdirs());
+
+ List<List<GenericRow>> dataLists = new ArrayList<>();
+ if (numSegments > 1) {
+ List<GenericRow> dataList1 = new ArrayList<>(4);
+ IntStream.range(0, 4).forEach(i -> dataList1.add(getGenericRow(rawData.get(i))));
+ dataLists.add(dataList1);
+ List<GenericRow> dataList2 = new ArrayList<>(4);
+ IntStream.range(4, 7).forEach(i -> dataList2.add(getGenericRow(rawData.get(i))));
+ dataLists.add(dataList2);
+ List<GenericRow> dataList3 = new ArrayList<>(4);
+ IntStream.range(7, 10).forEach(i -> dataList3.add(getGenericRow(rawData.get(i))));
+ dataLists.add(dataList3);
+ } else {
+ List<GenericRow> dataList = new ArrayList<>();
+ rawData.forEach(r -> dataList.add(getGenericRow(r)));
+ dataLists.add(dataList);
+ }
+
+ int idx = 0;
+ for (List<GenericRow> inputRows : dataLists) {
+ RecordReader recordReader = new GenericRowRecordReader(inputRows);
+ // NOTE: Generate segments with null value enabled
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfigNullValueEnabled, schema);
+ segmentGeneratorConfig.setOutDir(inputDir.getPath());
+ segmentGeneratorConfig.setSequenceId(idx++);
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig, recordReader);
+ driver.build();
+ }
+
+ File[] files = inputDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, numSegments);
+ }
+
+ private GenericRow getGenericRow(Object[] rawRow) {
+ GenericRow row = new GenericRow();
+ row.putValue("campaign", rawRow[0]);
+ row.putValue("clicks", rawRow[1]);
+ row.putValue("time", rawRow[2]);
+ return row;
+ }
+
+ @Test
+ public void testBadInputFolders()
+ throws Exception {
+ SegmentProcessorConfig config;
+
+ try {
+ new SegmentProcessorConfig.Builder().setSchema(_schema).build();
+ fail("Should fail for missing tableConfig");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ try {
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).build();
+ fail("Should fail for missing schema");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+
+ File outputSegmentDir = new File(_baseDir, "output_directory_bad_input_folders");
+ FileUtils.deleteQuietly(outputSegmentDir);
+ assertTrue(outputSegmentDir.mkdirs());
+
+ // non-existent input dir
+ File nonExistent = new File(_baseDir, "non_existent");
+ try {
+ new SegmentProcessorFramework(nonExistent, config, outputSegmentDir);
+ fail("Should fail for non existent input dir");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // file used as input dir
+ File fileInput = new File(_baseDir, "file.txt");
+ assertTrue(fileInput.createNewFile());
+ try {
+ new SegmentProcessorFramework(fileInput, config, outputSegmentDir);
+ fail("Should fail for file used as input dir");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // non existent output dir
+ try {
+ new SegmentProcessorFramework(_singleDaySingleSegment, config, nonExistent);
+ fail("Should fail for non existent output dir");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // file used as output dir
+ try {
+ new SegmentProcessorFramework(_singleDaySingleSegment, config, fileInput);
+ fail("Should fail for file used as output dir");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // output dir not empty
+ try {
+ new SegmentProcessorFramework(fileInput, config, _singleDaySingleSegment);
+ fail("Should fail for output dir not empty");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // empty input dir
+ SegmentProcessorFramework framework = new SegmentProcessorFramework(_emptyInputDir, config, outputSegmentDir);
+ try {
+ framework.processSegments();
+ fail("Should fail for empty input");
+ } catch (Exception e) {
+ framework.cleanup();
+ }
+ }
+
+ @Test
+ public void testSingleDaySingleSegment()
+ throws Exception {
+ File outputSegmentDir = new File(_baseDir, "output_directory_single_day_single_segment");
+ FileUtils.forceMkdir(outputSegmentDir);
+
+ // default configs
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ SegmentProcessorFramework framework =
+ new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ File[] files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ ImmutableSegment segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 10);
+ ColumnMetadata campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 3);
+ assertEquals(campaignMetadata.getMinValue(), "");
+ assertEquals(campaignMetadata.getMaxValue(), "xyz");
+ ColumnMetadata clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 6);
+ assertEquals(clicksMetadata.getMinValue(), 1000);
+ assertEquals(clicksMetadata.getMaxValue(), 7000);
+ ColumnMetadata timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597795200000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
+ DataSource campaignDataSource = segment.getDataSource("campaign");
+ assertNull(campaignDataSource.getNullValueVector());
+ DataSource clicksDataSource = segment.getDataSource("clicks");
+ assertNull(clicksDataSource.getNullValueVector());
+ DataSource timeDataSource = segment.getDataSource("time");
+ assertNull(timeDataSource.getNullValueVector());
+ segment.destroy();
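+    // default segment name format: <tableName>_<minTime>_<maxTime>_<sequenceId>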
+ assertEquals(files[0].getName(), "myTable_1597795200000_1597795200000_0");
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // default configs - null value enabled
+ config =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 10);
+ campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 3);
+ assertEquals(campaignMetadata.getMinValue(), "");
+ assertEquals(campaignMetadata.getMaxValue(), "xyz");
+ clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 6);
+ assertEquals(clicksMetadata.getMinValue(), 1000);
+ assertEquals(clicksMetadata.getMaxValue(), 7000);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597795200000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
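+    // null value vectors should flag the raw rows with null campaign (1, 5, 7) and null clicks (2, 5)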
+ campaignDataSource = segment.getDataSource("campaign");
+ NullValueVectorReader campaignNullValueVector = campaignDataSource.getNullValueVector();
+ assertNotNull(campaignNullValueVector);
+ assertEquals(campaignNullValueVector.getNullBitmap().toArray(), new int[]{1, 5, 7});
+ clicksDataSource = segment.getDataSource("clicks");
+ NullValueVectorReader clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertEquals(clicksNullValueVector.getNullBitmap().toArray(), new int[]{2, 5});
+ timeDataSource = segment.getDataSource("time");
+ NullValueVectorReader timeNullValueVector = timeDataSource.getNullValueVector();
+ assertNotNull(timeNullValueVector);
+ assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
+ segment.destroy();
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // partition - null value enabled
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema)
+ .setPartitionerConfigs(Lists.newArrayList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.ROUND_ROBIN)
+ .setNumPartitions(2).build())).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 2);
+ Arrays.sort(files);
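+    // 2 round-robin partitions: even-indexed raw rows land in the first segment, odd-indexed rows in the second (5 docs each)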
+ // segment 0
+ segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ assertEquals(segment.getSegmentMetadata().getTotalDocs(), 5);
+ campaignDataSource = segment.getDataSource("campaign");
+ campaignNullValueVector = campaignDataSource.getNullValueVector();
+ assertNotNull(campaignNullValueVector);
+ assertTrue(campaignNullValueVector.getNullBitmap().isEmpty());
+ clicksDataSource = segment.getDataSource("clicks");
+ clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertEquals(clicksNullValueVector.getNullBitmap().toArray(), new int[]{1});
+ segment.destroy();
+ // segment 1
+ segment = ImmutableSegmentLoader.load(files[1], ReadMode.mmap);
+ assertEquals(segment.getSegmentMetadata().getTotalDocs(), 5);
+ campaignDataSource = segment.getDataSource("campaign");
+ campaignNullValueVector = campaignDataSource.getNullValueVector();
+ assertNotNull(campaignNullValueVector);
+ assertEquals(campaignNullValueVector.getNullBitmap().toArray(), new int[]{0, 2, 3});
+ clicksDataSource = segment.getDataSource("clicks");
+ clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertEquals(clicksNullValueVector.getNullBitmap().toArray(), new int[]{2});
+ segment.destroy();
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // record filtering
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setRecordFilterConfig(
+ new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+ .setFilterFunction("Groovy({campaign == \"abc\"}, campaign)").build()).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 5);
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // filtered everything
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setRecordFilterConfig(
+ new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
+ .setFilterFunction("Groovy({clicks > 0}, clicks)").build()).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 0);
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // record transformation - null value enabled
+ Map<String, String> recordTransformationMap = new HashMap<>();
+ recordTransformationMap.put("clicks", "times(clicks, 0)");
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema)
+ .setRecordTransformerConfig(
+ new RecordTransformerConfig.Builder().setTransformFunctionsMap(recordTransformationMap).build()).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 10);
+ clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 1);
+ assertEquals(clicksMetadata.getMinValue(), 0);
+ assertEquals(clicksMetadata.getMaxValue(), 0);
+ clicksDataSource = segment.getDataSource("clicks");
+ clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertEquals(clicksNullValueVector.getNullBitmap().toArray(), new int[]{2, 5});
+ segment.destroy();
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // rollup
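+    // ROLLUP merges rows with identical (campaign, time) values and sums clicks, collapsing the 10 raw rows into 3 groups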
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+ .setMergeType(MergeType.ROLLUP).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 3);
+ campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 3);
+ assertEquals(campaignMetadata.getMinValue(), "");
+ assertEquals(campaignMetadata.getMaxValue(), "xyz");
+ clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 3);
+ assertEquals(clicksMetadata.getMinValue(), 4000);
+ assertEquals(clicksMetadata.getMaxValue(), 16000);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597795200000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
+ segment.destroy();
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // rollup - null value enabled
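+    // with null handling, null clicks are skipped during aggregation instead of contributing the default value 1000, so the rolled-up sums are lower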
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema)
+ .setMergeType(MergeType.ROLLUP).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 3);
+ campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 3);
+ assertEquals(campaignMetadata.getMinValue(), "");
+ assertEquals(campaignMetadata.getMaxValue(), "xyz");
+ clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 3);
+ assertEquals(clicksMetadata.getMinValue(), 3000);
+ assertEquals(clicksMetadata.getMaxValue(), 15000);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597795200000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
+ campaignDataSource = segment.getDataSource("campaign");
+ campaignNullValueVector = campaignDataSource.getNullValueVector();
+ assertNotNull(campaignNullValueVector);
+ assertEquals(campaignNullValueVector.getNullBitmap().toArray(), new int[]{0});
+ clicksDataSource = segment.getDataSource("clicks");
+ clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertTrue(clicksNullValueVector.getNullBitmap().isEmpty());
+ timeDataSource = segment.getDataSource("time");
+ timeNullValueVector = timeDataSource.getNullValueVector();
+ assertNotNull(timeNullValueVector);
+ assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
+ segment.destroy();
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // dedup
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+ .setMergeType(MergeType.DEDUP).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 8);
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // segment config - custom max records per segment and segment name prefix
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setSegmentConfig(
+ new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).setSegmentNamePrefix("myPrefix").build()).build();
+ framework = new SegmentProcessorFramework(_singleDaySingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 3);
+ Arrays.sort(files);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 4);
+ assertEquals(new SegmentMetadataImpl(files[1]).getTotalDocs(), 4);
+ assertEquals(new SegmentMetadataImpl(files[2]).getTotalDocs(), 2);
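+ // segment names follow the pattern <prefix>_<minTimeMillis>_<maxTimeMillis>_<sequenceId>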
+ assertEquals(files[0].getName(), "myPrefix_1597795200000_1597795200000_0");
+ assertEquals(files[1].getName(), "myPrefix_1597795200000_1597795200000_1");
+ assertEquals(files[2].getName(), "myPrefix_1597795200000_1597795200000_2");
+ FileUtils.cleanDirectory(outputSegmentDir);
+ }
+
+ @Test
+ public void testMultipleDaysSingleSegment()
+ throws Exception {
+ File outputSegmentDir = new File(_baseDir, "output_directory_multiple_days_single_segment");
+ FileUtils.forceMkdir(outputSegmentDir);
+
+ // default configs
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ SegmentProcessorFramework framework =
+ new SegmentProcessorFramework(_multipleDaysSingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ File[] files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 10);
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // date partition
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
+ Lists.newArrayList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("time").build())).build();
+ framework = new SegmentProcessorFramework(_multipleDaysSingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 10);
+ for (File file : files) {
+ assertEquals(new SegmentMetadataImpl(file).getTotalDocs(), 1);
+ }
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // date partition - round
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
+ Lists.newArrayList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
+ .setTransformFunction("round(\"time\", 86400000)").build())).build();
+ framework = new SegmentProcessorFramework(_multipleDaysSingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 3);
+ Arrays.sort(files);
+ // segment 0
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(files[0]);
+ assertEquals(segmentMetadata.getTotalDocs(), 3);
+ ColumnMetadata timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 3);
+ assertEquals(timeMetadata.getMinValue(), 1597719600000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597777200000L);
+ // segment 1
+ segmentMetadata = new SegmentMetadataImpl(files[1]);
+ assertEquals(segmentMetadata.getTotalDocs(), 5);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 5);
+ assertEquals(timeMetadata.getMinValue(), 1597795200000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597878000000L);
+ // segment 2
+ segmentMetadata = new SegmentMetadataImpl(files[2]);
+ assertEquals(segmentMetadata.getTotalDocs(), 2);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 2);
+ assertEquals(timeMetadata.getMinValue(), 1597881600000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597892400000L);
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // round, date partition, rollup - null value enabled
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema)
+ .setMergeType(MergeType.ROLLUP).setRecordTransformerConfig(new RecordTransformerConfig.Builder()
+ .setTransformFunctionsMap(Collections.singletonMap("time", "round(\"time\", 86400000)")).build())
+ .setPartitionerConfigs(Lists.newArrayList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("time").build())).build();
+ framework = new SegmentProcessorFramework(_multipleDaysSingleSegment, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 3);
+ Arrays.sort(files);
+ // segment 0
+ ImmutableSegment segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 2);
+ ColumnMetadata campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 2);
+ assertEquals(campaignMetadata.getMinValue(), "");
+ assertEquals(campaignMetadata.getMaxValue(), "abc");
+ ColumnMetadata clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 2);
+ assertEquals(clicksMetadata.getMinValue(), 1000);
+ assertEquals(clicksMetadata.getMaxValue(), 2000);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597708800000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597708800000L);
+ DataSource campaignDataSource = segment.getDataSource("campaign");
+ NullValueVectorReader campaignNullValueVector = campaignDataSource.getNullValueVector();
+ assertNotNull(campaignNullValueVector);
+ assertEquals(campaignNullValueVector.getNullBitmap().toArray(), new int[]{0});
+ DataSource clicksDataSource = segment.getDataSource("clicks");
+ NullValueVectorReader clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertTrue(clicksNullValueVector.getNullBitmap().isEmpty());
+ DataSource timeDataSource = segment.getDataSource("time");
+ NullValueVectorReader timeNullValueVector = timeDataSource.getNullValueVector();
+ assertNotNull(timeNullValueVector);
+ assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
+ segment.destroy();
+ // segment 1
+ segment = ImmutableSegmentLoader.load(files[1], ReadMode.mmap);
+ segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 3);
+ campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 3);
+ assertEquals(campaignMetadata.getMinValue(), "");
+ assertEquals(campaignMetadata.getMaxValue(), "xyz");
+ clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 3);
+ assertEquals(clicksMetadata.getMinValue(), 1000);
+ assertEquals(clicksMetadata.getMaxValue(), 7000);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597795200000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
+ campaignDataSource = segment.getDataSource("campaign");
+ campaignNullValueVector = campaignDataSource.getNullValueVector();
+ assertNotNull(campaignNullValueVector);
+ assertEquals(campaignNullValueVector.getNullBitmap().toArray(), new int[]{0});
+ clicksDataSource = segment.getDataSource("clicks");
+ clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertTrue(clicksNullValueVector.getNullBitmap().isEmpty());
+ timeDataSource = segment.getDataSource("time");
+ timeNullValueVector = timeDataSource.getNullValueVector();
+ assertNotNull(timeNullValueVector);
+ assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
+ segment.destroy();
+ // segment 2
+ segment = ImmutableSegmentLoader.load(files[2], ReadMode.mmap);
+ segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 2);
+ campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 2);
+ assertEquals(campaignMetadata.getMinValue(), "abc");
+ assertEquals(campaignMetadata.getMaxValue(), "xyz");
+ clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 2);
+ assertEquals(clicksMetadata.getMinValue(), 6000);
+ assertEquals(clicksMetadata.getMaxValue(), 7000);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597881600000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597881600000L);
+ campaignDataSource = segment.getDataSource("campaign");
+ campaignNullValueVector = campaignDataSource.getNullValueVector();
+ assertNotNull(campaignNullValueVector);
+ assertTrue(campaignNullValueVector.getNullBitmap().isEmpty());
+ clicksDataSource = segment.getDataSource("clicks");
+ clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertTrue(clicksNullValueVector.getNullBitmap().isEmpty());
+ timeDataSource = segment.getDataSource("time");
+ timeNullValueVector = timeDataSource.getNullValueVector();
+ assertNotNull(timeNullValueVector);
+ assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
+ segment.destroy();
+ FileUtils.cleanDirectory(outputSegmentDir);
+ }
+
+ @Test
+ public void testSingleDayMultipleSegments()
+ throws Exception {
+ File outputSegmentDir = new File(_baseDir, "output_directory_single_day_multiple_segments");
+ FileUtils.forceMkdir(outputSegmentDir);
+
+ // default configs
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ SegmentProcessorFramework framework =
+ new SegmentProcessorFramework(_singleDayMultipleSegments, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ File[] files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 10);
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // rollup
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+ .setMergeType(MergeType.ROLLUP).build();
+ FileUtils.deleteQuietly(outputSegmentDir);
+ assertTrue(outputSegmentDir.mkdirs());
+ framework = new SegmentProcessorFramework(_singleDayMultipleSegments, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 3);
+ FileUtils.cleanDirectory(outputSegmentDir);
+ }
+
+ @Test
+ public void testMultipleDaysMultipleSegments()
+ throws Exception {
+ File outputSegmentDir = new File(_baseDir, "output_directory_multiple_days_multiple_segments");
+ FileUtils.forceMkdir(outputSegmentDir);
+
+ // default configs
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ SegmentProcessorFramework framework =
+ new SegmentProcessorFramework(_multipleDaysMultipleSegments, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ File[] files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 10);
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // round, date partition, rollup
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
+ .setMergeType(MergeType.ROLLUP).setRecordTransformerConfig(new RecordTransformerConfig.Builder()
+ .setTransformFunctionsMap(Collections.singletonMap("time", "round(\"time\", 86400000)")).build())
+ .setPartitionerConfigs(Lists.newArrayList(
+ new PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
+ .setColumnName("time").build())).build();
+ framework = new SegmentProcessorFramework(_multipleDaysMultipleSegments, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 3);
+ Arrays.sort(files);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 2);
+ assertEquals(new SegmentMetadataImpl(files[1]).getTotalDocs(), 3);
+ assertEquals(new SegmentMetadataImpl(files[2]).getTotalDocs(), 2);
+ FileUtils.cleanDirectory(outputSegmentDir);
+ }
+
+ @Test
+ public void testMultiValue()
+ throws Exception {
+ File outputSegmentDir = new File(_baseDir, "output_directory_multi_value");
+ FileUtils.forceMkdir(outputSegmentDir);
+
+ // rollup - null value enabled
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schemaMV)
+ .setMergeType(MergeType.ROLLUP).build();
+ SegmentProcessorFramework framework = new SegmentProcessorFramework(_multiValueSegments, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ File[] files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ ImmutableSegment segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 2);
+ ColumnMetadata campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 3);
+ assertEquals(campaignMetadata.getMinValue(), "");
+ assertEquals(campaignMetadata.getMaxValue(), "b");
+ ColumnMetadata clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 1);
+ assertEquals(clicksMetadata.getMinValue(), 1000);
+ assertEquals(clicksMetadata.getMaxValue(), 1000);
+ ColumnMetadata timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597795200000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
+ DataSource campaignDataSource = segment.getDataSource("campaign");
+ NullValueVectorReader campaignNullValueVector = campaignDataSource.getNullValueVector();
+ assertNotNull(campaignNullValueVector);
+ assertEquals(campaignNullValueVector.getNullBitmap().toArray(), new int[]{0});
+ DataSource clicksDataSource = segment.getDataSource("clicks");
+ NullValueVectorReader clicksNullValueVector = clicksDataSource.getNullValueVector();
+ assertNotNull(clicksNullValueVector);
+ assertTrue(clicksNullValueVector.getNullBitmap().isEmpty());
+ DataSource timeDataSource = segment.getDataSource("time");
+ NullValueVectorReader timeNullValueVector = timeDataSource.getNullValueVector();
+ assertNotNull(timeNullValueVector);
+ assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
+ segment.destroy();
+ FileUtils.cleanDirectory(outputSegmentDir);
+
+ // dedup
+ config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schemaMV)
+ .setMergeType(MergeType.DEDUP).build();
+ framework = new SegmentProcessorFramework(_multiValueSegments, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+ segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 2);
+ campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
+ assertEquals(campaignMetadata.getCardinality(), 3);
+ assertEquals(campaignMetadata.getMinValue(), "");
+ assertEquals(campaignMetadata.getMaxValue(), "b");
+ clicksMetadata = segmentMetadata.getColumnMetadataFor("clicks");
+ assertEquals(clicksMetadata.getCardinality(), 1);
+ assertEquals(clicksMetadata.getMinValue(), 1000);
+ assertEquals(clicksMetadata.getMaxValue(), 1000);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 1);
+ assertEquals(timeMetadata.getMinValue(), 1597795200000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
+ segment.destroy();
+ FileUtils.cleanDirectory(outputSegmentDir);
+ }
+
+ @Test
+ public void testTarredSegments()
+ throws Exception {
+ File outputSegmentDir = new File(_baseDir, "output_directory_tarred");
+ FileUtils.forceMkdir(outputSegmentDir);
+
+ // default configs
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+ SegmentProcessorFramework framework = new SegmentProcessorFramework(_tarredSegments, config, outputSegmentDir);
+ framework.processSegments();
+ framework.cleanup();
+ File[] files = outputSegmentDir.listFiles();
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ assertEquals(new SegmentMetadataImpl(files[0]).getTotalDocs(), 10);
+ FileUtils.cleanDirectory(outputSegmentDir);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ FileUtils.deleteQuietly(_baseDir);
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
deleted file mode 100644
index f393f5e..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentReducerTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.processing.framework;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
-import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
-import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
-import org.apache.pinot.core.segment.processing.utils.SegmentProcessingUtils;
-import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * Tests for {@link SegmentReducer}
- */
-public class SegmentReducerTest {
- private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentReducerTest");
-
- private final File _mapperOutputDir = new File(TEMP_DIR, "mapper_output/1597795200000");
- private final Schema _pinotSchema = new Schema.SchemaBuilder().setSchemaName("mySchema")
- .addSingleValueDimension("campaign", FieldSpec.DataType.STRING).addMetric("clicks", FieldSpec.DataType.INT)
- .addDateTime("timeValue", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
- private final List<Object[]> _rawData1597795200000L = Arrays
- .asList(new Object[]{"abc", 4000, 1597795200000L}, new Object[]{"abc", 3000, 1597795200000L},
- new Object[]{"pqr", 1000, 1597795200000L}, new Object[]{"xyz", 4000, 1597795200000L},
- new Object[]{"pqr", 1000, 1597795200000L});
-
- private GenericRowFileManager _fileManager;
-
- @BeforeClass
- public void setUp()
- throws IOException {
- FileUtils.deleteQuietly(TEMP_DIR);
- assertTrue(_mapperOutputDir.mkdirs());
-
- List<FieldSpec> fieldSpecs = SegmentProcessingUtils.getFieldSpecs(_pinotSchema);
- _fileManager = new GenericRowFileManager(_mapperOutputDir, fieldSpecs, false);
- GenericRowFileWriter fileWriter = _fileManager.getFileWriter();
- GenericRow reuse = new GenericRow();
- for (int i = 0; i < 5; i++) {
- reuse.putValue("campaign", _rawData1597795200000L.get(i)[0]);
- reuse.putValue("clicks", _rawData1597795200000L.get(i)[1]);
- reuse.putValue("timeValue", _rawData1597795200000L.get(i)[2]);
- fileWriter.write(reuse);
- reuse.clear();
- }
- _fileManager.closeFileWriter();
- }
-
- @Test(dataProvider = "segmentReducerDataProvider")
- public void segmentReducerTest(String reducerId, SegmentReducerConfig reducerConfig, Set<String> expectedFileNames,
- List<Object[]> expectedRecords, Comparator comparator)
- throws Exception {
- File reducerOutputDir = new File(TEMP_DIR, "reducer_output");
- FileUtils.deleteQuietly(reducerOutputDir);
- assertTrue(reducerOutputDir.mkdirs());
- SegmentReducer segmentReducer = new SegmentReducer(reducerId, _fileManager, reducerConfig, reducerOutputDir);
- segmentReducer.reduce();
- segmentReducer.cleanup();
-
- // check num expected files
- File[] avroFileNames = reducerOutputDir.listFiles();
- assertEquals(avroFileNames.length, expectedFileNames.size());
-
- GenericRow next = new GenericRow();
- int numRecords = 0;
- List<Object[]> actualRecords = new ArrayList<>();
- for (File avroFile : reducerOutputDir.listFiles()) {
- String fileName = avroFile.getName();
- assertTrue(expectedFileNames.contains(fileName));
- RecordReader avroRecordReader = new AvroRecordReader();
- avroRecordReader.init(avroFile, _pinotSchema.getColumnNames(), null);
-
- while (avroRecordReader.hasNext()) {
- avroRecordReader.next(next);
- actualRecords.add(new Object[]{next.getValue("campaign"), next.getValue("clicks"), next.getValue("timeValue")});
- numRecords++;
- }
- }
- assertEquals(numRecords, expectedRecords.size());
- if (comparator != null) {
- // for runs with no sort order, apply same comparator across expected and actual to help with comparison
- actualRecords.sort(comparator);
- }
- for (int i = 0; i < numRecords; i++) {
- assertEquals(actualRecords.get(i)[0], expectedRecords.get(i)[0]);
- assertEquals(actualRecords.get(i)[1], expectedRecords.get(i)[1]);
- assertEquals(actualRecords.get(i)[2], expectedRecords.get(i)[2]);
- }
-
- FileUtils.deleteQuietly(reducerOutputDir);
- }
-
- @DataProvider(name = "segmentReducerDataProvider")
- public Object[][] segmentReducerDataProvider() {
- String reducerId = "aReducerId";
- List<Object[]> outputData = new ArrayList<>();
- _rawData1597795200000L.forEach(r -> outputData.add(new Object[]{r[0], r[1], r[2]}));
-
- Comparator<Object[]> comparator =
- Comparator.comparing((Object[] o) -> (String) o[0]).thenComparingInt(o -> (int) o[1]);
- outputData.sort(comparator);
-
- List<Object[]> inputs = new ArrayList<>();
-
- // default - CONCAT
- SegmentReducerConfig config1 = new SegmentReducerConfig(_pinotSchema, new CollectorConfig.Builder().build(), 100);
- HashSet<String> expectedFileNames1 = Sets.newHashSet(SegmentReducer.createReducerOutputFileName(reducerId, 0));
- inputs.add(new Object[]{reducerId, config1, expectedFileNames1, outputData, comparator});
-
- // CONCAT, numRecordsPerPart = 2
- SegmentReducerConfig config2 = new SegmentReducerConfig(_pinotSchema, new CollectorConfig.Builder().build(), 2);
- HashSet<String> expectedFileNames2 = Sets.newHashSet(SegmentReducer.createReducerOutputFileName(reducerId, 0),
- SegmentReducer.createReducerOutputFileName(reducerId, 1),
- SegmentReducer.createReducerOutputFileName(reducerId, 2));
- inputs.add(new Object[]{reducerId, config2, expectedFileNames2, outputData, comparator});
-
- // ROLLUP - default aggregation
- SegmentReducerConfig config3 = new SegmentReducerConfig(_pinotSchema,
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP).build(), 100);
- HashSet<String> expectedFileNames3 = Sets.newHashSet(SegmentReducer.createReducerOutputFileName(reducerId, 0));
- List<Object> rollupRows3 = Lists
- .newArrayList(new Object[]{"abc", 7000, 1597795200000L}, new Object[]{"pqr", 2000, 1597795200000L},
- new Object[]{"xyz", 4000, 1597795200000L});
- inputs.add(new Object[]{reducerId, config3, expectedFileNames3, rollupRows3, comparator});
-
- // ROLLUP MAX
- Map<String, ValueAggregatorFactory.ValueAggregatorType> valueAggregators = new HashMap<>();
- valueAggregators.put("clicks", ValueAggregatorFactory.ValueAggregatorType.MAX);
- SegmentReducerConfig config4 = new SegmentReducerConfig(_pinotSchema,
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP)
- .setAggregatorTypeMap(valueAggregators).build(), 100);
- HashSet<String> expectedFileNames4 = Sets.newHashSet(SegmentReducer.createReducerOutputFileName(reducerId, 0));
- List<Object> rollupRows4 = Lists
- .newArrayList(new Object[]{"abc", 4000, 1597795200000L}, new Object[]{"pqr", 1000, 1597795200000L},
- new Object[]{"xyz", 4000, 1597795200000L});
- inputs.add(new Object[]{reducerId, config4, expectedFileNames4, rollupRows4, comparator});
-
- // CONCAT and sort
- SegmentReducerConfig config6 = new SegmentReducerConfig(_pinotSchema,
- new CollectorConfig.Builder().setSortOrder(Lists.newArrayList("campaign", "clicks")).build(), 100);
- HashSet<String> expectedFileNames6 = Sets.newHashSet(SegmentReducer.createReducerOutputFileName(reducerId, 0));
- inputs.add(new Object[]{reducerId, config6, expectedFileNames6, outputData, null});
-
- // ROLLUP and sort
- SegmentReducerConfig config7 = new SegmentReducerConfig(_pinotSchema,
- new CollectorConfig.Builder().setCollectorType(CollectorFactory.CollectorType.ROLLUP)
- .setSortOrder(Lists.newArrayList("campaign", "clicks")).build(), 100);
- HashSet<String> expectedFileNames7 = Sets.newHashSet(SegmentReducer.createReducerOutputFileName(reducerId, 0));
- List<Object> rollupRows7 = Lists
- .newArrayList(new Object[]{"abc", 7000, 1597795200000L}, new Object[]{"pqr", 2000, 1597795200000L},
- new Object[]{"xyz", 4000, 1597795200000L});
- inputs.add(new Object[]{reducerId, config7, expectedFileNames7, rollupRows7, null});
-
- return inputs.toArray(new Object[0][]);
- }
-
- @AfterClass
- public void tearDown()
- throws IOException {
- _fileManager.cleanUp();
- FileUtils.deleteQuietly(TEMP_DIR);
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerDeTest.java
index cc85316..b17c7c7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowSerDeTest.java
@@ -75,16 +75,16 @@ public class GenericRowSerDeTest {
PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
dataBuffer.readFrom(0L, bytes);
GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, false);
- GenericRow reuse = new GenericRow();
- deserializer.deserialize(0L, reuse);
- Map<String, Object> actualValueMap = reuse.getFieldToValueMap();
+ GenericRow buffer = new GenericRow();
+ deserializer.deserialize(0L, buffer);
+ Map<String, Object> actualValueMap = buffer.getFieldToValueMap();
Map<String, Object> expectedValueMap = _row.getFieldToValueMap();
// NOTE: Cannot directly assert equals on maps because they contain arrays
assertEquals(actualValueMap.size(), expectedValueMap.size());
for (Map.Entry<String, Object> entry : expectedValueMap.entrySet()) {
assertEquals(actualValueMap.get(entry.getKey()), entry.getValue());
}
- assertTrue(reuse.getNullValueFields().isEmpty());
+ assertTrue(buffer.getNullValueFields().isEmpty());
}
@Test
@@ -94,9 +94,9 @@ public class GenericRowSerDeTest {
PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
dataBuffer.readFrom(0L, bytes);
GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, true);
- GenericRow reuse = new GenericRow();
- deserializer.deserialize(0L, reuse);
- assertEquals(reuse, _row);
+ GenericRow buffer = new GenericRow();
+ deserializer.deserialize(0L, buffer);
+ assertEquals(buffer, _row);
}
@Test
@@ -108,30 +108,28 @@ public class GenericRowSerDeTest {
PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
dataBuffer.readFrom(0L, bytes);
GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, fieldSpecs, true);
- GenericRow reuse = new GenericRow();
- deserializer.deserialize(0L, reuse);
- Map<String, Object> fieldToValueMap = reuse.getFieldToValueMap();
+ GenericRow buffer = new GenericRow();
+ deserializer.deserialize(0L, buffer);
+ Map<String, Object> fieldToValueMap = buffer.getFieldToValueMap();
assertEquals(fieldToValueMap.size(), 2);
assertEquals(fieldToValueMap.get("intSV"), _row.getValue("intSV"));
assertEquals(fieldToValueMap.get("nullSV"), _row.getValue("nullSV"));
- Set<String> nullValueFields = reuse.getNullValueFields();
+ Set<String> nullValueFields = buffer.getNullValueFields();
assertEquals(nullValueFields, Collections.singleton("nullSV"));
}
@Test
- public void testPartialDeserialize() {
+ public void testCompare() {
GenericRowSerializer serializer = new GenericRowSerializer(_fieldSpecs, true);
byte[] bytes = serializer.serialize(_row);
- PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(bytes.length, PinotDataBuffer.NATIVE_ORDER, null);
+ long numBytes = bytes.length;
+ PinotDataBuffer dataBuffer = PinotDataBuffer.allocateDirect(numBytes * 2, PinotDataBuffer.NATIVE_ORDER, null);
dataBuffer.readFrom(0L, bytes);
+ dataBuffer.readFrom(numBytes, bytes);
GenericRowDeserializer deserializer = new GenericRowDeserializer(dataBuffer, _fieldSpecs, true);
- Object[] values = deserializer.partialDeserialize(0L, 7);
- assertEquals(values[0], _row.getValue("intSV"));
- assertEquals(values[1], _row.getValue("longSV"));
- assertEquals(values[2], _row.getValue("floatSV"));
- assertEquals(values[3], _row.getValue("doubleSV"));
- assertEquals(values[4], _row.getValue("stringSV"));
- assertEquals(values[5], _row.getValue("bytesSV"));
- assertEquals(values[6], _row.getValue("nullSV"));
+ int numFields = _fieldSpecs.size();
+ for (int i = 0; i < numFields; i++) {
+ assertEquals(deserializer.compare(0L, numBytes, i), 0);
+ }
}
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
index 22971e5..f607990 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
@@ -24,14 +24,17 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.minion.MergeRollupConverter;
import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
+import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
+import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
+import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
@@ -51,7 +54,6 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
private static final String INPUT_SEGMENTS_DIR = "input_segments";
private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
- private static final int DEFAULT_NUM_RECORDS_PER_SEGMENT = 1_000_000;
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
@@ -62,33 +64,52 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
long startMillis = System.currentTimeMillis();
- String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
- Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- Preconditions.checkState(mergeTypeString.equalsIgnoreCase(CollectorFactory.CollectorType.CONCAT.name()),
+ Preconditions.checkState(
+ MergeType.CONCAT.name().equalsIgnoreCase(configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY)),
"Only 'CONCAT' mode is currently supported.");
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
TableConfig tableConfig = getTableConfig(tableNameWithType);
Schema schema = getSchema(tableNameWithType);
- Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
- MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ Map<String, AggregationFunctionType> aggregationTypes = MergeRollupTaskUtils.getRollupAggregationTypes(configs);
String numRecordsPerSegmentString = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
- int numRecordsPerSegment =
- numRecordsPerSegmentString != null ? Integer.parseInt(numRecordsPerSegmentString) : DEFAULT_NUM_RECORDS_PER_SEGMENT;
- MergeRollupConverter processorFramework =
- new MergeRollupConverter.Builder().setTableConfig(tableConfig).setSchema(schema)
- .setMergeType(mergeTypeString).setAggregatorConfigs(aggregatorConfigs)
- .setNumRecordsPerSegment(numRecordsPerSegment).setOriginalIndexDirs(originalIndexDirs)
- .setWorkingDir(workingDir).setInputSegmentDir(INPUT_SEGMENTS_DIR).setOutputSegmentDir(OUTPUT_SEGMENTS_DIR)
- .build();
- File[] outputFiles = processorFramework.convert();
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(MergeType.CONCAT);
+ if (!aggregationTypes.isEmpty()) {
+ segmentProcessorConfigBuilder.setAggregationTypes(aggregationTypes);
+ }
+ if (numRecordsPerSegmentString != null) {
+ segmentProcessorConfigBuilder.setSegmentConfig(
+ new SegmentConfig.Builder().setMaxNumRecordsPerSegment(Integer.parseInt(numRecordsPerSegmentString)).build());
+ }
+
+ SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
+
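+ // Stage the input segments under the working directory, then run the SegmentProcessorFramework over them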
+ File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
+ Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
+ inputSegmentsDir.getAbsolutePath(), taskType);
+ for (File indexDir : originalIndexDirs) {
+ FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+ }
+ File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
+ Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
+ outputSegmentsDir.getAbsolutePath(), taskType);
+
+ SegmentProcessorFramework segmentProcessorFramework =
+ new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+ try {
+ segmentProcessorFramework.processSegments();
+ } finally {
+ segmentProcessorFramework.cleanup();
+ }
long endMillis = System.currentTimeMillis();
LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
List<SegmentConversionResult> results = new ArrayList<>();
- for (File file : outputFiles) {
+ for (File file : outputSegmentsDir.listFiles()) {
String outputSegmentName = file.getName();
results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
@@ -97,8 +118,8 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
}
@Override
- protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(
- PinotTaskConfig pinotTaskConfig, SegmentConversionResult segmentConversionResult) {
+ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
+ SegmentConversionResult segmentConversionResult) {
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE + MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY).toUpperCase()));
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
index 6a7c424..85b72c3 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtils.java
@@ -24,14 +24,14 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.minion.Granularity;
import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
import org.apache.pinot.pql.parsers.utils.Pair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.utils.TimeUtils;
public class MergeRollupTaskUtils {
-
+ //@formatter:off
private static final String[] validMergeProperties = {
MinionConstants.MergeRollupTask.MERGE_TYPE_KEY,
MinionConstants.MergeRollupTask.BUFFER_TIME,
@@ -40,20 +40,20 @@ public class MergeRollupTaskUtils {
};
private static final String[] validMergeType = {
- CollectorFactory.CollectorType.CONCAT.name(),
- CollectorFactory.CollectorType.ROLLUP.name()
+ MergeType.CONCAT.name(),
+ MergeType.ROLLUP.name()
};
+ //@formatter:on
- public static Map<String, ValueAggregatorFactory.ValueAggregatorType> getRollupAggregationTypeMap(
- Map<String, String> mergeRollupConfig) {
- Map<String, ValueAggregatorFactory.ValueAggregatorType> rollupAggregationTypeMap = new HashMap<>();
+ public static Map<String, AggregationFunctionType> getRollupAggregationTypes(Map<String, String> mergeRollupConfig) {
+ Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
for (Map.Entry<String, String> entry : mergeRollupConfig.entrySet()) {
if (entry.getKey().startsWith(MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX)) {
- rollupAggregationTypeMap.put(getAggregateColumn(entry.getKey()),
- ValueAggregatorFactory.ValueAggregatorType.valueOf(entry.getValue().toUpperCase()));
+ aggregationTypes.put(getAggregateColumn(entry.getKey()),
+ AggregationFunctionType.getAggregationFunctionType(entry.getValue()));
}
}
- return rollupAggregationTypeMap;
+ return aggregationTypes;
}
public static Map<Granularity, MergeProperties> getAllMergeProperties(Map<String, String> mergeRollupConfig) {
@@ -71,23 +71,23 @@ public class MergeRollupTaskUtils {
Map<Granularity, MergeProperties> allMergeProperties = new HashMap<>();
for (Map.Entry<Granularity, Map<String, String>> entry : mergePropertiesMap.entrySet()) {
Map<String, String> properties = entry.getValue();
- MergeProperties mergeProperties = new MergeProperties(
- properties.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY).toUpperCase(),
- TimeUtils.convertPeriodToMillis(properties.get(MinionConstants.MergeRollupTask.BUFFER_TIME)),
- Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT)),
- Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_TASK)));
+ MergeProperties mergeProperties =
+ new MergeProperties(properties.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY).toUpperCase(),
+ TimeUtils.convertPeriodToMillis(properties.get(MinionConstants.MergeRollupTask.BUFFER_TIME)),
+ Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT)),
+ Long.parseLong(properties.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_TASK)));
allMergeProperties.put(entry.getKey(), mergeProperties);
}
return allMergeProperties;
}
private static String getAggregateColumn(String rollupAggregateConfigKey) {
- return rollupAggregateConfigKey.split(
- MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX + MinionConstants.DOT_SEPARATOR)[1];
+ return rollupAggregateConfigKey
+ .split(MinionConstants.MergeRollupTask.AGGREGATE_KEY_PREFIX + MinionConstants.DOT_SEPARATOR)[1];
}
- private static Pair<Granularity, String> getGranularityAndPropertyPair(
- String mergePropertyConfigKey, String mergePropertyConfigValue) {
+ private static Pair<Granularity, String> getGranularityAndPropertyPair(String mergePropertyConfigKey,
+ String mergePropertyConfigValue) {
String[] components = StringUtils.split(mergePropertyConfigKey, MinionConstants.DOT_SEPARATOR);
Preconditions.checkState(components.length == 3);
Preconditions.checkState(isValidMergeProperties(components[2]));
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
index 0bf5e7d..cfbae13 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -34,11 +34,9 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifi
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
@@ -48,6 +46,7 @@ import org.apache.pinot.core.segment.processing.transformer.RecordTransformerCon
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -146,13 +145,14 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
String timeColumnTransformFunction =
configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
- String collectorTypeStr = configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
- Map<String, String> aggregatorConfigs = new HashMap<>();
+ // TODO: Rename the config key
+ String mergeTypeStr = configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
+ Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
for (Map.Entry<String, String> entry : configs.entrySet()) {
String key = entry.getKey();
if (key.endsWith(MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)) {
String column = key.split(MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)[0];
- aggregatorConfigs.put(column, entry.getValue());
+ aggregationTypes.put(column, AggregationFunctionType.getAggregationFunctionType(entry.getValue()));
}
}
String numRecordsPerSegment =
@@ -160,6 +160,9 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+ if (mergeTypeStr != null) {
+ segmentProcessorConfigBuilder.setMergeType(MergeType.valueOf(mergeTypeStr.toUpperCase()));
+ }
// Time rollup using configured time transformation function
if (timeColumnTransformFunction != null) {
@@ -181,11 +184,10 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
}
- // Aggregations using configured Collector
- List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
- CollectorConfig collectorConfig =
- getCollectorConfig(collectorTypeStr, aggregatorConfigs, schemaColumns, sortedColumns);
- segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
+ // Reducer config
+ if (!aggregationTypes.isEmpty()) {
+ segmentProcessorConfigBuilder.setAggregationTypes(aggregationTypes);
+ }
// Segment config
if (numRecordsPerSegment != null) {
@@ -238,10 +240,10 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
}
@Override
- protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(
- PinotTaskConfig pinotTaskConfig, SegmentConversionResult segmentConversionResult) {
- return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections
- .emptyMap());
+ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
+ SegmentConversionResult segmentConversionResult) {
+ return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
+ Collections.emptyMap());
}
/**
@@ -301,34 +303,6 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
}
/**
- * Construct a {@link CollectorConfig} using configured collector configs and sorted columns from table config
- */
- private CollectorConfig getCollectorConfig(String collectorTypeStr, Map<String, String> aggregateConfigs,
- Set<String> schemaColumns, List<String> sortedColumns) {
- CollectorFactory.CollectorType collectorType = collectorTypeStr == null ? CollectorFactory.CollectorType.CONCAT
- : CollectorFactory.CollectorType.valueOf(collectorTypeStr.toUpperCase());
-
- Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap = new HashMap<>();
- for (Map.Entry<String, String> entry : aggregateConfigs.entrySet()) {
- String column = entry.getKey();
- Preconditions
- .checkState(schemaColumns.contains(column), "Aggregate column: %s is not a physical column in the schema",
- column);
- aggregatorTypeMap.put(column, ValueAggregatorFactory.ValueAggregatorType.valueOf(entry.getValue().toUpperCase()));
- }
-
- if (sortedColumns != null) {
- for (String column : sortedColumns) {
- Preconditions
- .checkState(schemaColumns.contains(column), "Sorted column: %s is not a physical column in the schema",
- column);
- }
- }
- return new CollectorConfig.Builder().setCollectorType(collectorType).setAggregatorTypeMap(aggregatorTypeMap)
- .setSortOrder(sortedColumns).build();
- }
-
- /**
* Construct a {@link SegmentConfig} using config values
*/
private SegmentConfig getSegmentConfig(String numRecordsPerSegment) {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
index 6350415..6058bc0 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskUtilsTest.java
@@ -21,8 +21,8 @@ package org.apache.pinot.plugin.minion.tasks.merge_rollup;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.minion.Granularity;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -51,13 +51,13 @@ public class MergeRollupTaskUtilsTest {
@Test
public void testGetRollupAggregationTypeMap() {
- Map<String, ValueAggregatorFactory.ValueAggregatorType> rollupAggregationTypeMap =
- MergeRollupTaskUtils.getRollupAggregationTypeMap(_mergeRollupTaskConfig);
+ Map<String, AggregationFunctionType> rollupAggregationTypeMap =
+ MergeRollupTaskUtils.getRollupAggregationTypes(_mergeRollupTaskConfig);
Assert.assertEquals(rollupAggregationTypeMap.size(), 2);
Assert.assertTrue(rollupAggregationTypeMap.containsKey(METRIC_COLUMN_A));
Assert.assertTrue(rollupAggregationTypeMap.containsKey(METRIC_COLUMN_B));
- Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_A), ValueAggregatorFactory.ValueAggregatorType.SUM);
- Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_B), ValueAggregatorFactory.ValueAggregatorType.MAX);
+ Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_A), AggregationFunctionType.SUM);
+ Assert.assertEquals(rollupAggregationTypeMap.get(METRIC_COLUMN_B), AggregationFunctionType.MAX);
}
@Test
@@ -69,13 +69,13 @@ public class MergeRollupTaskUtilsTest {
Assert.assertTrue(allMergeProperties.containsKey(Granularity.MONTHLY));
MergeProperties dailyProperty = allMergeProperties.get(Granularity.DAILY);
- Assert.assertEquals(dailyProperty.getMergeType(), CollectorFactory.CollectorType.CONCAT.name());
+ Assert.assertEquals(dailyProperty.getMergeType(), MergeType.CONCAT.name());
Assert.assertEquals(dailyProperty.getBufferTimeMs(), 172800000L);
Assert.assertEquals(dailyProperty.getMaxNumRecordsPerSegment(), 1000000L);
Assert.assertEquals(dailyProperty.getMaxNumRecordsPerTask(), 5000000L);
MergeProperties monthlyProperty = allMergeProperties.get(Granularity.MONTHLY);
- Assert.assertEquals(monthlyProperty.getMergeType(), CollectorFactory.CollectorType.ROLLUP.name());
+ Assert.assertEquals(monthlyProperty.getMergeType(), MergeType.ROLLUP.name());
Assert.assertEquals(monthlyProperty.getBufferTimeMs(), 2592000000L);
Assert.assertEquals(monthlyProperty.getMaxNumRecordsPerSegment(), 2000000L);
Assert.assertEquals(monthlyProperty.getMaxNumRecordsPerTask(), 5000000L);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index 5c34634..7a10497 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -84,6 +84,7 @@ public class SegmentGeneratorConfig implements Serializable {
private String _outDir = null;
private String _rawTableName = null;
private String _segmentName = null;
+ private String _segmentNamePrefix = null;
private String _segmentNamePostfix = null;
private String _segmentTimeColumnName = null;
private TimeUnit _segmentTimeUnit = null;
@@ -152,9 +153,8 @@ public class SegmentGeneratorConfig implements Serializable {
this.setRawIndexCreationColumns(noDictionaryColumns);
if (noDictionaryColumnMap != null) {
- Map<String, ChunkCompressionType> serializedNoDictionaryColumnMap =
- noDictionaryColumnMap.entrySet().stream().collect(Collectors
- .toMap(Map.Entry::getKey, e -> ChunkCompressionType.valueOf(e.getValue())));
+ Map<String, ChunkCompressionType> serializedNoDictionaryColumnMap = noDictionaryColumnMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> ChunkCompressionType.valueOf(e.getValue())));
this.setRawIndexCompressionType(serializedNoDictionaryColumnMap);
}
}
@@ -267,10 +267,11 @@ public class SegmentGeneratorConfig implements Serializable {
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
if (fieldConfigList != null) {
for (FieldConfig fieldConfig : fieldConfigList) {
- if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW && fieldConfig.getCompressionCodec() != null) {
+ if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW
+ && fieldConfig.getCompressionCodec() != null) {
_rawIndexCreationColumns.add(fieldConfig.getName());
- _rawIndexCompressionType.put(fieldConfig.getName(),
- ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name()));
+ _rawIndexCompressionType
+ .put(fieldConfig.getName(), ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name()));
}
}
}
@@ -476,6 +477,14 @@ public class SegmentGeneratorConfig implements Serializable {
_creatorVersion = creatorVersion;
}
+ public String getSegmentNamePrefix() {
+ return _segmentNamePrefix;
+ }
+
+ public void setSegmentNamePrefix(String segmentNamePrefix) {
+ _segmentNamePrefix = segmentNamePrefix;
+ }
+
public String getSegmentNamePostfix() {
return _segmentNamePostfix;
}
@@ -604,7 +613,11 @@ public class SegmentGeneratorConfig implements Serializable {
if (_segmentName != null) {
return new FixedSegmentNameGenerator(_segmentName);
}
- return new SimpleSegmentNameGenerator(_rawTableName, _segmentNamePostfix);
+ if (_segmentNamePrefix != null) {
+ return new SimpleSegmentNameGenerator(_segmentNamePrefix, _segmentNamePostfix);
+ } else {
+ return new SimpleSegmentNameGenerator(_rawTableName, _segmentNamePostfix);
+ }
}
public void setSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
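
The new prefix overrides only the leading token of generated segment names; when it is unset, name generation falls back to the raw table name as before. A minimal sketch, assuming tableConfig and schema are loaded elsewhere, the prefix value is illustrative, and the generator accessor mirrors the setter shown above:

    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
    generatorConfig.setSegmentNamePrefix("myTable_merged");  // new setter added in this commit
    generatorConfig.setSegmentNamePostfix("1");
    // With a prefix set, the default name generator uses it in place of the raw table name.
    SegmentNameGenerator nameGenerator = generatorConfig.getSegmentNameGenerator();
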
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SimpleSegmentNameGenerator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SimpleSegmentNameGenerator.java
index 82c0f71..67f1670 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SimpleSegmentNameGenerator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SimpleSegmentNameGenerator.java
@@ -34,23 +34,23 @@ import javax.annotation.Nullable;
* </ul>
*/
public class SimpleSegmentNameGenerator implements SegmentNameGenerator {
- private final String _tableName;
+ private final String _segmentNamePrefix;
private final String _segmentNamePostfix;
- public SimpleSegmentNameGenerator(String tableName, String segmentNamePostfix) {
- _tableName = tableName;
+ public SimpleSegmentNameGenerator(String segmentNamePrefix, @Nullable String segmentNamePostfix) {
+ _segmentNamePrefix = segmentNamePrefix;
_segmentNamePostfix = segmentNamePostfix;
}
@Override
public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
return JOINER
- .join(_tableName, minTimeValue, maxTimeValue, _segmentNamePostfix, sequenceId >= 0 ? sequenceId : null);
+ .join(_segmentNamePrefix, minTimeValue, maxTimeValue, _segmentNamePostfix, sequenceId >= 0 ? sequenceId : null);
}
@Override
public String toString() {
- StringBuilder stringBuilder = new StringBuilder("SimpleSegmentNameGenerator: tableName=").append(_tableName);
+ StringBuilder stringBuilder = new StringBuilder("SimpleSegmentNameGenerator: segmentNamePrefix=").append(_segmentNamePrefix);
if (_segmentNamePostfix != null) {
stringBuilder.append(", segmentNamePostfix=").append(_segmentNamePostfix);
}
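
With the constructor argument generalized to a prefix, the generator itself is unchanged apart from which string leads the name. A small sketch (the prefix, postfix and timestamps are illustrative; the exact separator comes from the JOINER constant, which is not part of this diff):

    // Prefix replaces the table name as the first token; the postfix may be null.
    SimpleSegmentNameGenerator nameGenerator = new SimpleSegmentNameGenerator("myTable_OFFLINE_merged", "1");
    // minTimeValue/maxTimeValue are joined as-is; a negative sequenceId is passed to the joiner as null.
    String segmentName = nameGenerator.generateSegmentName(0, 1622505600000L, 1622592000000L);
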
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index a496cf2..e6c8fed 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -204,10 +204,10 @@ public class GenericRow implements Serializable {
}
/**
- * Marks a field as {@code non-null}.
+ * Marks a field as {@code non-null} and returns whether the field was marked as {@code null}.
*/
- public void removeNullValueField(String fieldName) {
- _nullValueFields.remove(fieldName);
+ public boolean removeNullValueField(String fieldName) {
+ return _nullValueFields.remove(fieldName);
}
/**
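
The boolean return value lets callers tell whether the field they are clearing had actually been flagged as null. A minimal sketch; the marking call (putDefaultNullValue) and the field name are assumptions, only removeNullValueField comes from this diff:

    GenericRow row = new GenericRow();
    row.putDefaultNullValue("metric", 0L);  // assumed helper that flags "metric" as null with a default value
    // removeNullValueField now reports whether the field had been marked as null
    boolean wasNull = row.removeNullValueField("metric");
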
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
index b3387e5..daa6e6c 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
@@ -92,11 +92,12 @@ public class SegmentProcessorFrameworkCommand extends AbstractBaseAdminCommand i
TableConfig tableConfig =
JsonUtils.fileToObject(new File(segmentProcessorFrameworkSpec.getTableConfigFile()), TableConfig.class);
SegmentProcessorConfig segmentProcessorConfig =
- new SegmentProcessorConfig.Builder().setSchema(schema).setTableConfig(tableConfig)
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+ .setMergeType(segmentProcessorFrameworkSpec.getMergeType())
.setRecordTransformerConfig(segmentProcessorFrameworkSpec.getRecordTransformerConfig())
.setRecordFilterConfig(segmentProcessorFrameworkSpec.getRecordFilterConfig())
.setPartitionerConfigs(segmentProcessorFrameworkSpec.getPartitionerConfigs())
- .setCollectorConfig(segmentProcessorFrameworkSpec.getCollectorConfig())
+ .setAggregationTypes(segmentProcessorFrameworkSpec.getAggregationTypes())
.setSegmentConfig(segmentProcessorFrameworkSpec.getSegmentConfig()).build();
SegmentProcessorFramework framework =
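
The same builder calls can be made programmatically outside the admin command; a standalone sketch, with tableConfig, schema and segmentConfig assumed to be constructed elsewhere and the column name illustrative:

    SegmentProcessorConfig processorConfig = new SegmentProcessorConfig.Builder()
        .setTableConfig(tableConfig)
        .setSchema(schema)
        .setMergeType(MergeType.ROLLUP)
        // per-column value aggregators applied during rollup
        .setAggregationTypes(Collections.singletonMap("metricColA", AggregationFunctionType.SUM))
        .setSegmentConfig(segmentConfig)
        .build();
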
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/processor/SegmentProcessorFrameworkSpec.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/processor/SegmentProcessorFrameworkSpec.java
index 7ec6cfd..c38499a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/processor/SegmentProcessorFrameworkSpec.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/processor/SegmentProcessorFrameworkSpec.java
@@ -19,11 +19,13 @@
package org.apache.pinot.tools.segment.processor;
import java.util.List;
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import java.util.Map;
import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.framework.MergeType;
import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
/**
@@ -36,10 +38,11 @@ public class SegmentProcessorFrameworkSpec {
private String _tableConfigFile;
private String _schemaFile;
+ private MergeType _mergeType;
private RecordTransformerConfig _recordTransformerConfig;
private RecordFilterConfig _recordFilterConfig;
private List<PartitionerConfig> _partitionerConfigs;
- private CollectorConfig _collectorConfig;
+ private Map<String, AggregationFunctionType> _aggregationTypes;
private SegmentConfig _segmentConfig;
public String getInputSegmentsDir() {
@@ -74,6 +77,14 @@ public class SegmentProcessorFrameworkSpec {
_schemaFile = schemaFile;
}
+ public MergeType getMergeType() {
+ return _mergeType;
+ }
+
+ public void setMergeType(MergeType mergeType) {
+ _mergeType = mergeType;
+ }
+
public RecordTransformerConfig getRecordTransformerConfig() {
return _recordTransformerConfig;
}
@@ -98,12 +109,12 @@ public class SegmentProcessorFrameworkSpec {
_partitionerConfigs = partitionerConfigs;
}
- public CollectorConfig getCollectorConfig() {
- return _collectorConfig;
+ public Map<String, AggregationFunctionType> getAggregationTypes() {
+ return _aggregationTypes;
}
- public void setCollectorConfig(CollectorConfig collectorConfig) {
- _collectorConfig = collectorConfig;
+ public void setAggregationTypes(Map<String, AggregationFunctionType> aggregationTypes) {
+ _aggregationTypes = aggregationTypes;
}
public SegmentConfig getSegmentConfig() {
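
On the spec side, the new fields are plain bean properties, so they can be set directly (or, presumably, supplied via the JSON spec file under matching field names). A short sketch with an illustrative column name:

    SegmentProcessorFrameworkSpec spec = new SegmentProcessorFrameworkSpec();
    spec.setMergeType(MergeType.DEDUP);
    Map<String, AggregationFunctionType> aggregationTypes = new HashMap<>();
    aggregationTypes.put("metricColB", AggregationFunctionType.MAX);  // per-column aggregation types for rollup
    spec.setAggregationTypes(aggregationTypes);
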