Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/06/25 19:37:38 UTC

[GitHub] [incubator-pinot] Jackie-Jiang opened a new pull request #7092: SegmentProcessorFramework Enhancement

Jackie-Jiang opened a new pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092


   ## Description
   Enhance `SegmentProcessorFramework` with the following improvements:
   - Refactor the reducer to:
     - Support the CONCAT/ROLLUP/DEDUP merge types (using off-heap memory)
     - Support null values
     - Generate a generic row file as the intermediate file
     - Minimize the number of intermediate files generated (0 for CONCAT, 1 for ROLLUP/DEDUP)
   - Add `GenericRowFileRecordReader`, which reads a range of records directly from a generic row file, so no extra intermediate files are created when generating segments
   - Simplify the `MergeRollupTaskExecutor` to use the `SegmentProcessorFramework` directly
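ROLLUP, as described for `RollupReducer` ("aggregates the metric values for GenericRows with the same dimension + time values"), can be understood as sort-then-merge: sort rows by their dimension + time values, then fold each run of equal keys into one row. The sketch below is a simplified, in-memory illustration under stated assumptions: the dimension + time values are pre-joined into one string key, metrics are `long`, and metrics without an explicit aggregator fall back to SUM (mirroring `DEFAULT_AGGREGATOR_TYPE` in the `RollupReducer` diff). `Row`, `rollup` and the aggregator map are hypothetical and are not Pinot APIs; the real reducer works on off-heap generic row files.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongBinaryOperator;

/** Hypothetical, in-memory sketch of sort-then-merge rollup; not the Pinot API. */
public class RollupSketch {

  /** Hypothetical row: a sort key (dimension + time values joined) plus long-valued metrics. */
  static final class Row {
    final String key;
    final Map<String, Long> metrics;

    Row(String key, Map<String, Long> metrics) {
      this.key = key;
      this.metrics = new LinkedHashMap<>(metrics); // mutable copy, so aggregation never touches the caller's map
    }
  }

  // Assumed fallback aggregator, mirroring DEFAULT_AGGREGATOR_TYPE = SUM in RollupReducer.
  static final LongBinaryOperator SUM = Long::sum;

  /** Sorts rows by key, then folds each run of equal keys into one row. */
  static List<Row> rollup(List<Row> input, Map<String, LongBinaryOperator> aggregators) {
    List<Row> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparing((Row r) -> r.key));

    List<Row> output = new ArrayList<>();
    Row previous = null;
    for (Row current : sorted) {
      if (previous != null && previous.key.equals(current.key)) {
        // Same dimension + time values: aggregate each metric into the open group.
        for (Map.Entry<String, Long> e : current.metrics.entrySet()) {
          LongBinaryOperator op = aggregators.getOrDefault(e.getKey(), SUM);
          previous.metrics.merge(e.getKey(), e.getValue(), (a, b) -> op.applyAsLong(a, b));
        }
      } else {
        // Key changed: emit the finished group and open a new one from a copy of this row.
        if (previous != null) {
          output.add(previous);
        }
        previous = new Row(current.key, current.metrics);
      }
    }
    if (previous != null) {
      output.add(previous); // flush the last group
    }
    return output;
  }

  public static void main(String[] args) {
    List<Row> rows = List.of(
        new Row("US|2021-06-25", Map.of("clicks", 3L, "maxLatency", 40L)),
        new Row("CA|2021-06-25", Map.of("clicks", 1L, "maxLatency", 10L)),
        new Row("US|2021-06-25", Map.of("clicks", 2L, "maxLatency", 75L)));
    // Only maxLatency gets an explicit aggregator; clicks falls back to SUM.
    Map<String, LongBinaryOperator> aggregators = Map.<String, LongBinaryOperator>of("maxLatency", Math::max);
    for (Row row : rollup(rows, aggregators)) {
      System.out.println(row.key + " " + row.metrics);
    }
  }
}
```

Because rows are sorted first, only one "open" group is needed at a time, which is what lets a reducer of this shape stream its output into a single file.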


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#discussion_r660894852



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
##########
@@ -146,20 +145,24 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
 
     String timeColumnTransformFunction =
         configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
-    String collectorTypeStr = configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
-    Map<String, String> aggregatorConfigs = new HashMap<>();
+    // TODO: Rename the config key

Review comment:
       Yeah, planning to do it in a separate PR, as this PR focuses on the framework enhancement.






[GitHub] [incubator-pinot] jtao15 commented on a change in pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on a change in pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#discussion_r661024026



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/minion/MergeRollupConverter.java
##########
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.minion;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.common.MinionConstants;
-import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
-import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
-import org.apache.pinot.core.segment.processing.collector.ValueAggregatorFactory;
-import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
-import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
-import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
-
-
-/**
- * Merge rollup segment processor framework takes a list of segments and concatenates/rolls up segments based on
- * the configuration.
- *
- * TODO:
- *   1. Add the support for roll-up
- *   2. Add the support to make result segments time aligned
- *   3. Add merge/roll-up prefixes for result segments
- */
-public class MergeRollupConverter {

Review comment:
       This class could be reused in `SegmentMergeCommand`. Should we keep it?






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#discussion_r661065906



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/minion/MergeRollupConverter.java
##########
@@ -1,186 +0,0 @@
[... license header, imports and class Javadoc elided; identical to the hunk quoted in jtao15's comment above ...]
-public class MergeRollupConverter {

Review comment:
       I don't think it adds much value on top of the `SegmentProcessorFramework`, though. The `SegmentMergeCommand` can also use the `SegmentProcessorFramework` directly, similar to the `MergeRollupTaskExecutor`.






[GitHub] [incubator-pinot] jtao15 commented on pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
jtao15 commented on pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#issuecomment-871552464


   LGTM, thanks for working on the enhancement.




[GitHub] [incubator-pinot] npawar commented on a change in pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#discussion_r660878771



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
##########
@@ -146,20 +145,24 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
 
     String timeColumnTransformFunction =
         configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
-    String collectorTypeStr = configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
-    Map<String, String> aggregatorConfigs = new HashMap<>();
+    // TODO: Rename the config key

Review comment:
       Maybe introduce the new key, update the docs, and deprecate the old one?
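The suggestion above (introduce a new key, keep honoring the old one while it is deprecated) is commonly implemented as a lookup with fallback: read the new key first, and only if it is absent read the deprecated key and log a warning. A minimal sketch, assuming a plain `Map<String, String>` of task configs like the `configs` map in the executor; the helper name and the key strings `"mergeType"` / `"collectorType"` are hypothetical and not confirmed by this thread.

```java
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical helper: prefer a new config key, fall back to a deprecated one. */
public class ConfigKeyMigration {
  private static final Logger LOGGER = Logger.getLogger(ConfigKeyMigration.class.getName());

  static String getWithDeprecatedFallback(Map<String, String> configs, String newKey, String deprecatedKey) {
    String value = configs.get(newKey);
    if (value != null) {
      return value; // the new key wins whenever it is present
    }
    value = configs.get(deprecatedKey);
    if (value != null) {
      // Still honored, but nudge users toward the new name.
      LOGGER.warning("Config key '" + deprecatedKey + "' is deprecated, use '" + newKey + "' instead");
    }
    return value; // null when neither key is set
  }

  public static void main(String[] args) {
    // Hypothetical key names, for illustration only.
    Map<String, String> legacyConfigs = Map.of("collectorType", "rollup");
    System.out.println(getWithDeprecatedFallback(legacyConfigs, "mergeType", "collectorType"));
  }
}
```

Letting the new key win when both are set keeps behavior predictable during the migration window, and the warning gives a concrete signal for when the old key can be removed.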






[GitHub] [incubator-pinot] Jackie-Jiang merged pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092


   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#discussion_r660878218



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
##########
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.reducer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.processing.aggregator.ValueAggregator;
+import org.apache.pinot.core.segment.processing.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * RollupReducer aggregates the metric values for GenericRows with the same dimension + time values.
+ */
+public class RollupReducer implements Reducer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RollupReducer.class);
+  private static final AggregationFunctionType DEFAULT_AGGREGATOR_TYPE = AggregationFunctionType.SUM;
+
+  private final String _partitionId;
+  private final GenericRowFileManager _fileManager;
+  private final Map<String, AggregationFunctionType> _aggregationTypes;
+  private final File _reducerOutputDir;
+
+  public RollupReducer(String partitionId, GenericRowFileManager fileManager,
+      Map<String, AggregationFunctionType> aggregationTypes, File reducerOutputDir) {
+    _partitionId = partitionId;
+    _fileManager = fileManager;
+    _aggregationTypes = aggregationTypes;
+    _reducerOutputDir = reducerOutputDir;
+  }
+
+  @Override
+  public GenericRowFileManager reduce()
+      throws Exception {
+    LOGGER.info("Start reducing on partition: {}", _partitionId);
+    long reduceStartTimeMs = System.currentTimeMillis();
+
+    GenericRowFileReader fileReader = _fileManager.getFileReader();
+    int numRows = fileReader.getNumRows();
+    int numSortFields = fileReader.getNumSortFields();
+    LOGGER.info("Start sorting on numRows: {}, numSortFields: {}", numRows, numSortFields);
+    long sortStartTimeMs = System.currentTimeMillis();
+    GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+    LOGGER.info("Finish sorting in {}ms", System.currentTimeMillis() - sortStartTimeMs);
+
+    List<FieldSpec> fieldSpecs = _fileManager.getFieldSpecs();
+    boolean includeNullFields = _fileManager.isIncludeNullFields();
+    List<AggregatorContext> aggregatorContextList = new ArrayList<>();
+    for (FieldSpec fieldSpec : fieldSpecs) {
+      if (fieldSpec.getFieldType() == FieldType.METRIC) {
+        aggregatorContextList.add(new AggregatorContext(fieldSpec,
+            _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE)));
+      }
+    }
+
+    File partitionOutputDir = new File(_reducerOutputDir, _partitionId);
+    FileUtils.forceMkdir(partitionOutputDir);
+    LOGGER.info("Start creating rollup file under dir: {}", partitionOutputDir);
+    long rollupFileCreationStartTimeMs = System.currentTimeMillis();
+    GenericRowFileManager rollupFileManager =
+        new GenericRowFileManager(partitionOutputDir, fieldSpecs, includeNullFields, 0);
+    GenericRowFileWriter rollupFileWriter = rollupFileManager.getFileWriter();
+    GenericRow previousRow = new GenericRow();
+    recordReader.read(0, previousRow);
+    int previousRowId = 0;
+    GenericRow buffer = new GenericRow();
+    if (includeNullFields) {
+      for (int i = 1; i < numRows; i++) {
+        buffer.clear();
+        recordReader.read(i, buffer);
+        if (recordReader.compare(previousRowId, i) == 0) {
+          aggregateWithNullFields(previousRow, buffer, aggregatorContextList);
+        } else {
+          rollupFileWriter.write(previousRow);
+          previousRowId = i;
+          GenericRow temp = previousRow;
+          previousRow = buffer;
+          buffer = temp;
+        }
+      }
+    } else {
+      for (int i = 1; i < numRows; i++) {
+        buffer.clear();
+        recordReader.read(i, buffer);
+        if (recordReader.compare(previousRowId, i) == 0) {
+          aggregateWithoutNullFields(previousRow, buffer, aggregatorContextList);
+        } else {
+          rollupFileWriter.write(previousRow);
+          previousRowId = i;
+          GenericRow temp = previousRow;
+          previousRow = buffer;
+          buffer = temp;
+        }
+      }
+    }
+    rollupFileWriter.write(previousRow);
+    rollupFileManager.closeFileWriter();
+    LOGGER.info("Finish creating rollup file in {}ms", System.currentTimeMillis() - rollupFileCreationStartTimeMs);
+
+    _fileManager.cleanUp();

Review comment:
       How are we controlling the number of records per segment now?
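This thread does not record the answer to the question above. As context only: the `MergeRollupTaskExecutor` hunk sets `SegmentConfig.Builder().setMaxNumRecordsPerSegment(...)`, and the PR description says `GenericRowFileRecordReader` reads a range of records from a generic row file. One plausible way to combine the two, which is an assumption and not the PR's confirmed behavior, is to cut the reduced rows into contiguous `[start, end)` ranges of at most the configured size, one range per output segment. The helper below is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: cut rows [0, numRows) into contiguous ranges of at most maxNumRecordsPerSegment rows. */
public class SegmentRangeSplitter {

  static List<int[]> split(int numRows, int maxNumRecordsPerSegment) {
    if (maxNumRecordsPerSegment <= 0) {
      throw new IllegalArgumentException("maxNumRecordsPerSegment must be positive");
    }
    List<int[]> ranges = new ArrayList<>();
    // long arithmetic so start + max cannot overflow near Integer.MAX_VALUE
    for (long start = 0; start < numRows; start += maxNumRecordsPerSegment) {
      long end = Math.min(start + maxNumRecordsPerSegment, numRows);
      ranges.add(new int[] {(int) start, (int) end}); // [start, end) would feed one output segment
    }
    return ranges;
  }

  public static void main(String[] args) {
    // 10 reduced rows with a cap of 4 rows per segment; prints [0, 4), [4, 8), [8, 10) on separate lines
    for (int[] range : split(10, 4)) {
      System.out.println("[" + range[0] + ", " + range[1] + ")");
    }
  }
}
```

Reading ranges straight out of a single reduced file, rather than writing one intermediate file per segment, would be consistent with the description's goal of minimizing intermediate files.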






[GitHub] [incubator-pinot] npawar commented on a change in pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#discussion_r660881743



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -62,33 +64,52 @@
     LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
     long startMillis = System.currentTimeMillis();
 
-    String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
-    Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
-    Preconditions.checkState(mergeTypeString.equalsIgnoreCase(CollectorFactory.CollectorType.CONCAT.name()),
+    Preconditions.checkState(
+        MergeType.CONCAT.name().equalsIgnoreCase(configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY)),
         "Only 'CONCAT' mode is currently supported.");
 
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     TableConfig tableConfig = getTableConfig(tableNameWithType);
     Schema schema = getSchema(tableNameWithType);
 
-    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
-        MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+    Map<String, AggregationFunctionType> aggregationTypes = MergeRollupTaskUtils.getRollupAggregationTypes(configs);
     String numRecordsPerSegmentString = configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
-    int numRecordsPerSegment =
-        numRecordsPerSegmentString != null ? Integer.parseInt(numRecordsPerSegmentString) : DEFAULT_NUM_RECORDS_PER_SEGMENT;
 
-    MergeRollupConverter processorFramework =
-        new MergeRollupConverter.Builder().setTableConfig(tableConfig).setSchema(schema)
-            .setMergeType(mergeTypeString).setAggregatorConfigs(aggregatorConfigs)
-            .setNumRecordsPerSegment(numRecordsPerSegment).setOriginalIndexDirs(originalIndexDirs)
-            .setWorkingDir(workingDir).setInputSegmentDir(INPUT_SEGMENTS_DIR).setOutputSegmentDir(OUTPUT_SEGMENTS_DIR)
-            .build();
-    File[] outputFiles = processorFramework.convert();
+    SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
+            .setMergeType(MergeType.CONCAT);
+    if (!aggregationTypes.isEmpty()) {
+      segmentProcessorConfigBuilder.setAggregationTypes(aggregationTypes);
+    }
+    if (numRecordsPerSegmentString != null) {
+      segmentProcessorConfigBuilder.setSegmentConfig(
+          new SegmentConfig.Builder().setMaxNumRecordsPerSegment(Integer.parseInt(numRecordsPerSegmentString)).build());
+    }
+
+    SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
+
+    File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
+    Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
+        inputSegmentsDir.getAbsolutePath(), taskType);
+    for (File indexDir : originalIndexDirs) {

Review comment:
       I remember a user once filed a bug for RealtimeToOffline, where copying from originalIndexDir to the working dir was causing disk space utilization to shoot up. I think they submitted a fix to use the originalIndexDir directly. Should we do that here as well?
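Separately from the comment above, this hunk also changes how a missing merge type is handled. The old code called `mergeTypeString.equalsIgnoreCase(...)` on the config value, so it needed `Preconditions.checkNotNull` first. The new code calls `equalsIgnoreCase` on the `MergeType.CONCAT.name()` constant instead; since `String.equalsIgnoreCase(null)` returns `false`, a missing key simply fails the same `checkState`, now with the message "Only 'CONCAT' mode is currently supported." rather than "MergeType cannot be null". A minimal sketch of the pattern, using a stand-in `MergeType` enum and a hypothetical `isConcat` helper (neither is the Pinot class):

```java
import java.util.Map;

/** Illustrates the null-safe "constant.equalsIgnoreCase(value)" check from the hunk above. */
public class NullSafeMergeTypeCheck {
  // Stand-in for the MergeType enum referenced in the diff.
  enum MergeType { CONCAT, ROLLUP, DEDUP }

  static boolean isConcat(Map<String, String> configs, String key) {
    // The constant is the receiver, so a missing (null) value yields false instead of a NullPointerException.
    return MergeType.CONCAT.name().equalsIgnoreCase(configs.get(key));
  }

  public static void main(String[] args) {
    System.out.println(isConcat(Map.of("mergeType", "concat"), "mergeType")); // true: case-insensitive match
    System.out.println(isConcat(Map.of(), "mergeType"));                      // false: key missing, no NPE
  }
}
```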






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#discussion_r660895862



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -62,33 +64,52 @@
[... hunk body elided; identical to the hunk quoted in npawar's comment above ...]
+    for (File indexDir : originalIndexDirs) {

Review comment:
       Yes, we should avoid the copying within the executor. I'm planning to eliminate the copy for both realtime-to-offline and merge-rollup in a separate PR.






[GitHub] [incubator-pinot] codecov-commenter commented on pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#issuecomment-868856811


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/7092?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7092](https://codecov.io/gh/apache/incubator-pinot/pull/7092?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (87f6846) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/c856c6ca1938d52ec3e287e112695a1425032e51?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c856c6c) will **decrease** coverage by `32.01%`.
   > The diff coverage is `32.71%`.
   
   > :exclamation: Current head 87f6846 differs from pull request most recent head 1749030. Consider uploading reports for the commit 1749030 to get more accurate results
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7092       +/-   ##
   =============================================
   - Coverage     73.56%   41.55%   -32.02%     
   + Complexity       91        7       -84     
   =============================================
     Files          1491     1487        -4     
     Lines         73408    73361       -47     
     Branches      10573    10575        +2     
   =============================================
   - Hits          54002    30483    -23519     
   - Misses        15882    40306    +24424     
   + Partials       3524     2572      -952     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `41.55% <32.71%> (-0.11%)` | :arrow_down: |
   | unittests | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/7092?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...not/core/minion/rollup/RollupRecordAggregator.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9taW5pb24vcm9sbHVwL1JvbGx1cFJlY29yZEFnZ3JlZ2F0b3IuamF2YQ==) | `0.00% <0.00%> (-87.50%)` | :arrow_down: |
   | [...ment/processing/aggregator/MaxValueAggregator.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvYWdncmVnYXRvci9NYXhWYWx1ZUFnZ3JlZ2F0b3IuamF2YQ==) | `0.00% <ø> (ø)` | |
   | [...ment/processing/aggregator/MinValueAggregator.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvYWdncmVnYXRvci9NaW5WYWx1ZUFnZ3JlZ2F0b3IuamF2YQ==) | `0.00% <ø> (ø)` | |
   | [...ment/processing/aggregator/SumValueAggregator.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvYWdncmVnYXRvci9TdW1WYWx1ZUFnZ3JlZ2F0b3IuamF2YQ==) | `0.00% <ø> (ø)` | |
   | [.../processing/aggregator/ValueAggregatorFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvYWdncmVnYXRvci9WYWx1ZUFnZ3JlZ2F0b3JGYWN0b3J5LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...nt/processing/genericrow/GenericRowSerializer.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvZ2VuZXJpY3Jvdy9HZW5lcmljUm93U2VyaWFsaXplci5qYXZh) | `68.90% <0.00%> (-24.32%)` | :arrow_down: |
   | [.../core/segment/processing/reducer/DedupReducer.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvcmVkdWNlci9EZWR1cFJlZHVjZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...core/segment/processing/reducer/RollupReducer.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zZWdtZW50L3Byb2Nlc3NpbmcvcmVkdWNlci9Sb2xsdXBSZWR1Y2VyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...on/tasks/merge\_rollup/MergeRollupTaskExecutor.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza0V4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (-88.24%)` | :arrow_down: |
   | [...inion/tasks/merge\_rollup/MergeRollupTaskUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvbWVyZ2Vfcm9sbHVwL01lcmdlUm9sbHVwVGFza1V0aWxzLmphdmE=) | `0.00% <0.00%> (-87.24%)` | :arrow_down: |
   | ... and [963 more](https://codecov.io/gh/apache/incubator-pinot/pull/7092/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/7092?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/7092?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [c856c6c...1749030](https://codecov.io/gh/apache/incubator-pinot/pull/7092?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #7092: SegmentProcessorFramework Enhancement

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7092:
URL: https://github.com/apache/incubator-pinot/pull/7092#discussion_r660894070



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
##########
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.reducer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.processing.aggregator.ValueAggregator;
+import org.apache.pinot.core.segment.processing.aggregator.ValueAggregatorFactory;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
+import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * RollupReducer aggregates the metric values for GenericRows with the same dimension + time values.
+ */
+public class RollupReducer implements Reducer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RollupReducer.class);
+  private static final AggregationFunctionType DEFAULT_AGGREGATOR_TYPE = AggregationFunctionType.SUM;
+
+  private final String _partitionId;
+  private final GenericRowFileManager _fileManager;
+  private final Map<String, AggregationFunctionType> _aggregationTypes;
+  private final File _reducerOutputDir;
+
+  public RollupReducer(String partitionId, GenericRowFileManager fileManager,
+      Map<String, AggregationFunctionType> aggregationTypes, File reducerOutputDir) {
+    _partitionId = partitionId;
+    _fileManager = fileManager;
+    _aggregationTypes = aggregationTypes;
+    _reducerOutputDir = reducerOutputDir;
+  }
+
+  @Override
+  public GenericRowFileManager reduce()
+      throws Exception {
+    LOGGER.info("Start reducing on partition: {}", _partitionId);
+    long reduceStartTimeMs = System.currentTimeMillis();
+
+    GenericRowFileReader fileReader = _fileManager.getFileReader();
+    int numRows = fileReader.getNumRows();
+    int numSortFields = fileReader.getNumSortFields();
+    LOGGER.info("Start sorting on numRows: {}, numSortFields: {}", numRows, numSortFields);
+    long sortStartTimeMs = System.currentTimeMillis();
+    GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+    LOGGER.info("Finish sorting in {}ms", System.currentTimeMillis() - sortStartTimeMs);
+
+    List<FieldSpec> fieldSpecs = _fileManager.getFieldSpecs();
+    boolean includeNullFields = _fileManager.isIncludeNullFields();
+    List<AggregatorContext> aggregatorContextList = new ArrayList<>();
+    for (FieldSpec fieldSpec : fieldSpecs) {
+      if (fieldSpec.getFieldType() == FieldType.METRIC) {
+        aggregatorContextList.add(new AggregatorContext(fieldSpec,
+            _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE)));
+      }
+    }
+
+    File partitionOutputDir = new File(_reducerOutputDir, _partitionId);
+    FileUtils.forceMkdir(partitionOutputDir);
+    LOGGER.info("Start creating rollup file under dir: {}", partitionOutputDir);
+    long rollupFileCreationStartTimeMs = System.currentTimeMillis();
+    GenericRowFileManager rollupFileManager =
+        new GenericRowFileManager(partitionOutputDir, fieldSpecs, includeNullFields, 0);
+    GenericRowFileWriter rollupFileWriter = rollupFileManager.getFileWriter();
+    GenericRow previousRow = new GenericRow();
+    recordReader.read(0, previousRow);
+    int previousRowId = 0;
+    GenericRow buffer = new GenericRow();
+    if (includeNullFields) {
+      for (int i = 1; i < numRows; i++) {
+        buffer.clear();
+        recordReader.read(i, buffer);
+        if (recordReader.compare(previousRowId, i) == 0) {
+          aggregateWithNullFields(previousRow, buffer, aggregatorContextList);
+        } else {
+          rollupFileWriter.write(previousRow);
+          previousRowId = i;
+          GenericRow temp = previousRow;
+          previousRow = buffer;
+          buffer = temp;
+        }
+      }
+    } else {
+      for (int i = 1; i < numRows; i++) {
+        buffer.clear();
+        recordReader.read(i, buffer);
+        if (recordReader.compare(previousRowId, i) == 0) {
+          aggregateWithoutNullFields(previousRow, buffer, aggregatorContextList);
+        } else {
+          rollupFileWriter.write(previousRow);
+          previousRowId = i;
+          GenericRow temp = previousRow;
+          previousRow = buffer;
+          buffer = temp;
+        }
+      }
+    }
+    rollupFileWriter.write(previousRow);
+    rollupFileManager.closeFileWriter();
+    LOGGER.info("Finish creating rollup file in {}ms", System.currentTimeMillis() - rollupFileCreationStartTimeMs);
+
+    _fileManager.cleanUp();

Review comment:
   It is done during the segment creation phase, where each segment is created from a range of docs.
   The reducer always works on the whole partition. Since the sorting is done off-heap, it should not cause memory issues.
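The reply divides the work into two steps. First, the reducer sorts and rolls up the whole partition once, producing a single output file. Second, segment creation reads contiguous doc ranges of that file. Below is a minimal, self-contained sketch of that flow under simplifying assumptions:

- Rows are plain `long[]` arrays, with the key (dimension + time) columns first and the metric columns after them.
- Every metric uses SUM, matching the diff's `DEFAULT_AGGREGATOR_TYPE`.
- Sorting happens on-heap with `List.sort` rather than off-heap.

`RollupSketch`, `rollup`, `docRanges` and `maxRowsPerSegment` are hypothetical names used only for illustration. They are not Pinot APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical, simplified sketch of sort-then-merge rollup (not Pinot's API).
 * Each row is a long[]: the first numKeyColumns values are dimension/time columns,
 * the remaining values are metric columns that are SUM-aggregated.
 */
public final class RollupSketch {
  private RollupSketch() {
  }

  /** Lexicographic comparator over the key (dimension + time) columns only. */
  static Comparator<long[]> keyComparator(int numKeyColumns) {
    return (a, b) -> {
      for (int i = 0; i < numKeyColumns; i++) {
        int cmp = Long.compare(a[i], b[i]);
        if (cmp != 0) {
          return cmp;
        }
      }
      return 0;
    };
  }

  /** Sorts rows by key, then sums the metrics of adjacent rows that share a key. */
  static List<long[]> rollup(List<long[]> rows, int numKeyColumns) {
    List<long[]> output = new ArrayList<>();
    if (rows.isEmpty()) {
      return output; // guard: there is no row 0 to start the first group from
    }
    Comparator<long[]> comparator = keyComparator(numKeyColumns);
    List<long[]> sorted = new ArrayList<>(rows);
    sorted.sort(comparator);
    long[] previous = sorted.get(0).clone(); // clone so the input rows are never mutated
    for (int i = 1; i < sorted.size(); i++) {
      long[] current = sorted.get(i);
      if (comparator.compare(previous, current) == 0) {
        // Same key: fold the metric columns into the current group.
        for (int m = numKeyColumns; m < previous.length; m++) {
          previous[m] += current[m];
        }
      } else {
        // Key changed: emit the finished group and start a new one.
        output.add(previous);
        previous = current.clone();
      }
    }
    output.add(previous); // flush the last group
    return output;
  }

  /** Splits [0, numRows) into contiguous [start, end) ranges of at most maxRowsPerSegment rows. */
  static List<int[]> docRanges(int numRows, int maxRowsPerSegment) {
    if (maxRowsPerSegment <= 0) {
      throw new IllegalArgumentException("maxRowsPerSegment must be positive");
    }
    List<int[]> ranges = new ArrayList<>();
    for (int start = 0; start < numRows; start += maxRowsPerSegment) {
      ranges.add(new int[]{start, Math.min(start + maxRowsPerSegment, numRows)});
    }
    return ranges;
  }

  public static void main(String[] args) {
    List<long[]> rows = List.of(
        new long[]{2, 100, 5},
        new long[]{1, 100, 3},
        new long[]{2, 100, 7},
        new long[]{1, 200, 1});
    List<long[]> rolledUp = rollup(rows, 2);
    for (long[] row : rolledUp) {
      System.out.println(Arrays.toString(row));
    }
    // Each range would back one segment built from the single rolled-up output.
    for (int[] range : docRanges(rolledUp.size(), 2)) {
      System.out.println(Arrays.toString(range));
    }
  }
}
```

Running `main` prints the rolled-up rows `[1, 100, 3]`, `[1, 200, 1]` and `[2, 100, 12]`, followed by the ranges `[0, 2]` and `[2, 3]`.

Like the diff, the sketch emits a row only when the key changes, so one pass over the sorted data yields one output. Splitting into segments then only requires index ranges, not extra intermediate files.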



