Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/08/27 22:29:49 UTC

[GitHub] [incubator-pinot] npawar opened a new pull request #5934: Segment processing framework

npawar opened a new pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934


   ## Description
   A Segment Processing Framework to convert "m" segments into "n" segments.
   The phases of the Segment Processor are:
   1. **Map**
   - RecordTransformation (using transform functions)
   - Partitioning (Column value based, transform function based, table config's partition config based)
   - PartitionFiltering (using filter function)
   2. **Reduce**
   - Rollup/Concat records
   - Split into parts 
   3. **Segment generation**
   
   A SegmentProcessorFrameworkCommand is provided to run this on demand. Run it using:
   `bin/pinot-admin.sh SegmentProcessorFramework -segmentProcessorFrameworkSpec /<path>/spec.json`
   where spec.json is:
   ```
   {
     "inputSegmentsDir": "/<base_dir>/segmentsDir",
     "outputSegmentsDir": "/<base_dir>/outputDir/",
     "schemaFile": "/<base_dir>/schema.json",
     "tableConfigFile": "/<base_dir>/table.json",
     "recordTransformerConfig": {
       "transformFunctionsMap": {
         "epochMillis": "round(epochMillis, 86400000)" // round to nearest day
       }
     },
     "partitioningConfig": {
       "partitionerType": "COLUMN_VALUE", // partition on epochMillis
       "columnName": "epochMillis"
     },
     "collectorConfig": {
       "collectorType": "ROLLUP", // rollup clicks by summing
       "aggregatorTypeMap": {
         "clicks": "SUM"
       }
     },
     "segmentConfig": {
       "maxNumRecordsPerSegment": 200_000
     }
   }
   ```
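   For reference, the `round(epochMillis, 86400000)` transform in the example buckets an epoch-millis value to the start of its UTC day. A minimal illustration, assuming the floor-to-bucket semantics of `round(timeValue, bucket)`:
   ```java
   public class RoundToDayExample {
     public static void main(String[] args) {
       // round(epochMillis, 86400000): floor to the nearest lower multiple of one day
       long epochMillis = 1598567389000L;                        // 2020-08-27T22:29:49Z
       long rounded = (epochMillis / 86_400_000L) * 86_400_000L; // 1598486400000
       System.out.println(rounded);                              // = 2020-08-27T00:00:00Z
     }
   }
   ```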
   
   Note:
   1. Currently the framework performs no parallelism in the map/reduce/segment-generation stages: each input file is processed sequentially in the map stage, each partition sequentially in the reduce stage, and each segment is built one after another. This can be parallelized in the future if the need arises.
   2. The framework assumes there is enough memory to hold all records of a partition in memory during rollups in the reducer. As a safety measure, the reducer flushes once it has collected 5M records (see the sketch after this list). In the future we could consider off-heap processing if memory becomes a problem.
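   
   A minimal sketch of that safety valve (the `Collector.collect`/`Collector.size` calls are from this PR; the `flush` callback stands in for writing the collected records out to an avro part file):
   ```java
   import org.apache.pinot.core.segment.processing.collector.Collector;
   import org.apache.pinot.spi.data.readers.GenericRow;
   
   public final class FlushGuardSketch {
     // Hard cap on rows buffered on heap before forcing a flush
     private static final int MAX_RECORDS_TO_COLLECT = 5_000_000;
   
     static void collectWithGuard(Collector collector, GenericRow row, Runnable flush) {
       collector.collect(row); // rollup/concat happens inside the collector
       if (collector.size() == MAX_RECORDS_TO_COLLECT) {
         flush.run();          // write out and reset the collector before continuing
       }
     }
   }
   ```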
   
    This framework will typically be used by minion tasks that need to perform some processing on segments
    (e.g. a task that merges segments, or a task that aligns segments to time boundaries). The existing segment merge jobs can be changed to use this framework.
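    A minimal driver sketch (only the `SegmentProcessorFramework` constructor and `processSegments()` appear in this PR; how `SegmentProcessorConfig` is constructed is not shown here, so `buildConfig()` below is a placeholder):
    ```java
    import java.io.File;
    import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
    import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
    
    public class SegmentProcessorExample {
      public static void main(String[] args) throws Exception {
        // Input dir must contain Pinot segments (tarred or untarred);
        // output dir must already exist and be empty (both are checked by the framework).
        File inputSegmentsDir = new File("/path/to/segmentsDir");
        File outputSegmentsDir = new File("/path/to/outputDir");
    
        SegmentProcessorFramework framework =
            new SegmentProcessorFramework(inputSegmentsDir, buildConfig(), outputSegmentsDir);
        framework.processSegments(); // map -> reduce -> segment generation
      }
    
      // Placeholder: build from the schema, table config and per-phase configs
      private static SegmentProcessorConfig buildConfig() {
        throw new UnsupportedOperationException("construct SegmentProcessorConfig here");
      }
    }
    ```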
   
   **Pending**
   An end-to-end test for the framework (WIP)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486699922



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {

Review comment:
       These configs can go into the `SegmentConfig` class.
   I will add seqId to the SegmentGenerationConfig in the driver. seqId will also help with the other problem you caught (same segment name when start/end times are the same).
   
   Config for prefix can be added in a future change.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r488812749



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Helper util methods for SegmentProcessorFramework
+ */
+public final class SegmentProcessorUtils {
+
+  private SegmentProcessorUtils() {
+  }
+
+  /**
+   * Convert a GenericRow to an avro GenericRecord
+   */
+  public static GenericData.Record convertGenericRowToAvroRecord(GenericRow genericRow,
+      GenericData.Record reusableRecord) {
+    for (String field : genericRow.getFieldToValueMap().keySet()) {

Review comment:
       I see, got it. In the interest of getting this long-standing and big PR merged, I will think about this and take it up immediately in a follow-up PR.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r481326091



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionFilter;
+import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper phase of the SegmentProcessorFramework.
+ * Reads the input segment and creates partitioned avro data files
+ * Performs:
+ * - record transformations
+ * - partitioning
+ * - partition filtering
+ */
+public class SegmentMapper {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
+  private final File _inputSegment;
+  private final File _mapperOutputDir;
+
+  private final String _mapperId;
+  private final Schema _avroSchema;
+  private final RecordTransformer _recordTransformer;
+  private final Partitioner _partitioner;
+  private final PartitionFilter _partitionFilter;
+  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+
+  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _inputSegment = inputSegment;
+    _mapperOutputDir = mapperOutputDir;
+
+    _mapperId = mapperId;
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _partitioner = PartitionerFactory.getPartitioner(mapperConfig.getPartitioningConfig());
+    _partitionFilter = PartitionerFactory.getPartitionFilter(mapperConfig.getPartitioningConfig());
+    LOGGER.info(
+        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, partitioner: {}, partitionFilter: {}",
+        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _partitioner.getClass(),
+        _partitionFilter.getClass());
+  }
+
+  /**
+   * Reads the input segment and generates partitioned avro data files into the mapper output directory
+   * Records for each partition are put into a directory of its own within the mapper output directory, identified by the partition name
+   */
+  public void map()
+      throws Exception {
+
+    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
+    GenericRow reusableRow = new GenericRow();
+    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
+
+    Set<String> selectedPartitions = new HashSet<>();
+    Set<String> rejectedPartitions = new HashSet<>();
+
+    while (segmentRecordReader.hasNext()) {
+      reusableRow = segmentRecordReader.next(reusableRow);
+
+      // Record transformation
+      reusableRow = _recordTransformer.transformRecord(reusableRow);

Review comment:
       I have removed the PartitionFilter step and introduced record filtering.






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r480489910



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)
+public class CollectorConfig {
+  private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
+
+  private final CollectorFactory.CollectorType _collectorType;
+  private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
+
+  private CollectorConfig(CollectorFactory.CollectorType collectorType,

Review comment:
       It should also include sorted columns (a list of columns, where records are sorted first on the first column, then on the second column, and so on).
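   
   For illustration, the sort semantics as a comparator (a sketch, not the `GenericRowSorter` implementation; assumes single-value `Comparable` column types):
   ```java
   import java.util.Comparator;
   import java.util.List;
   import org.apache.pinot.spi.data.readers.GenericRow;
   
   public final class SortOrderSketch {
     // Lexicographic multi-column sort: compare on the first column,
     // break ties on the second, and so on.
     @SuppressWarnings({"unchecked", "rawtypes"})
     static Comparator<GenericRow> comparatorFor(List<String> sortOrder) {
       return (left, right) -> {
         for (String column : sortOrder) {
           int cmp = ((Comparable) left.getValue(column)).compareTo(right.getValue(column));
           if (cmp != 0) {
             return cmp;
           }
         }
         return 0;
       };
     }
   }
   ```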

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionEvaluator.java
##########
@@ -36,4 +36,9 @@
    * Evaluate the function on the generic row and return the result
    */
   Object evaluate(GenericRow genericRow);
+
+  /**
+   * Evaluate the function on the given arguments
+   */
+  Object evaluate(Object[] arguments);

Review comment:
       The arguments should be key-value pairs instead of an array. How are you going to map the values to the function parameters?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/function/InbuiltFunctionEvaluator.java
##########
@@ -155,5 +177,10 @@ public String execute(GenericRow row) {
     public Object execute(GenericRow row) {
       return row.getValue(_column);
     }
+
+    @Override
+    public Object execute(Object[] arguments) {
+      throw new UnsupportedOperationException("Operation not supported for ColumnExecutionNode");

Review comment:
       This API won't work for built-in functions because it needs to read column values in order to evaluate the function

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.segment.processing.collector.Collector;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reducer phase of the SegmentProcessorFramework
+ * Reads the avro files in the input directory and creates output avro files in the reducer output directory.
+ * The avro files in the input directory are expected to contain data for only 1 partition
+ * Performs operations on that partition data as follows:
+ * - concatenation/rollup of records
+ * - split
+ * - TODO: dedup
+ */
+public class SegmentReducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentReducer.class);
+  private static final int MAX_RECORDS_TO_COLLECT = 5_000_000;
+
+  private final File _reducerInputDir;
+  private final File _reducerOutputDir;
+
+  private final String _reducerId;
+  private final Schema _pinotSchema;
+  private final org.apache.avro.Schema _avroSchema;
+  private final Collector _collector;
+  private final int _numRecordsPerPart;
+
+  public SegmentReducer(String reducerId, File reducerInputDir, SegmentReducerConfig reducerConfig,
+      File reducerOutputDir) {
+    _reducerInputDir = reducerInputDir;
+    _reducerOutputDir = reducerOutputDir;
+
+    _reducerId = reducerId;
+    _pinotSchema = reducerConfig.getPinotSchema();
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
+    _collector = CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
+    _numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
+    LOGGER.info("Initialized reducer with id: {}, input dir: {}, output dir: {}, collector: {}, numRecordsPerPart: {}",
+        _reducerId, _reducerInputDir, _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
+  }
+
+  /**
+   * Reads the avro files in the input directory.
+   * Performs configured operations and outputs to other avro file(s) in the reducer output directory.
+   */
+  public void reduce()
+      throws Exception {
+
+    int part = 0;
+    for (File inputFile : _reducerInputDir.listFiles()) {
+
+      RecordReader avroRecordReader = RecordReaderFactory
+          .getRecordReaderByClass("org.apache.pinot.plugin.inputformat.avro.AvroRecordReader", inputFile,
+              _pinotSchema.getColumnNames(), null);
+
+      while (avroRecordReader.hasNext()) {
+        GenericRow next = avroRecordReader.next();
+
+        // Aggregations
+        _collector.collect(next);
+
+        // Exceeded max records allowed to collect. Flush
+        if (_collector.size() == MAX_RECORDS_TO_COLLECT) {

Review comment:
       Should we use `_numRecordsPerPart` here so that once the collector collects enough records, we flush them?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/PartitionFilter.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+/**
+ * Used for filtering partitions in the mapper
+ */
+public interface PartitionFilter {

Review comment:
       Why associate the filter with the partition? We can apply the filter to the records; then you can directly use the current `FunctionEvaluator`.
   IMO, record filtering is easier to configure and more intuitive. I cannot think of a use case where we have to apply the filter to the partition
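   
   A sketch of what record-level filtering could look like (hypothetical interface shape; the actual filter added to this PR lives in `org.apache.pinot.core.segment.processing.filter` and may differ):
   ```java
   import org.apache.pinot.spi.data.readers.GenericRow;
   
   // Record-level filtering: evaluate a predicate per record in the mapper,
   // instead of accepting/rejecting whole partitions after partitioning.
   public interface RecordFilterSketch {
     /** Returns true if the record should be dropped. */
     boolean filter(GenericRow genericRow);
   }
   ```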

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/PartitionerFactory.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory for Partitioner and PartitionFilter
+ */
+public final class PartitionerFactory {
+
+  private PartitionerFactory() {
+
+  }
+
+  public enum PartitionerType {
+    NO_OP, ROW_HASH, COLUMN_VALUE, TRANSFORM_FUNCTION, TABLE_PARTITION_CONFIG
+  }
+
+  /**
+   * Construct a Partitioner using the PartitioningConfig
+   */
+  public static Partitioner getPartitioner(PartitioningConfig config) {
+
+    Partitioner partitioner = null;
+    switch (config.getPartitionerType()) {
+      case NO_OP:
+        partitioner = new NoOpPartitioner();
+        break;
+      case ROW_HASH:

Review comment:
       I feel ROW_HASH is not really useful. Maybe ROUND_ROBIN is enough?
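   
   A ROUND_ROBIN partitioner would be a small addition under the existing `Partitioner` interface (a sketch; `numPartitions` is an assumed config knob, not part of this PR):
   ```java
   import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
   import org.apache.pinot.spi.data.readers.GenericRow;
   
   // Cycle records across a fixed number of partitions, ignoring record contents.
   public class RoundRobinPartitionerSketch implements Partitioner {
     private final int _numPartitions;
     private int _nextPartition = 0;
   
     public RoundRobinPartitionerSketch(int numPartitions) {
       _numPartitions = numPartitions;
     }
   
     @Override
     public String getPartition(GenericRow genericRow) {
       String partition = String.valueOf(_nextPartition);
       _nextPartition = (_nextPartition + 1) % _numPartitions;
       return partition;
     }
   }
   ```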






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r488337421



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Helper util methods for SegmentProcessorFramework
+ */
+public final class SegmentProcessorUtils {
+
+  private SegmentProcessorUtils() {
+  }
+
+  /**
+   * Convert a GenericRow to an avro GenericRecord
+   */
+  public static GenericData.Record convertGenericRowToAvroRecord(GenericRow genericRow,
+      GenericData.Record reusableRecord) {
+    for (String field : genericRow.getFieldToValueMap().keySet()) {

Review comment:
       The value in `fieldToValueMap` will be the default value if the original value is `null`. IMO we should put `null` instead of the default value in the `GenericData.Record`, so that the `null` values can be propagated to the new segment






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486690778



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/ColumnValuePartitioner.java
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Partitioner which extracts a column value as the partition
+ */
+public class ColumnValuePartitioner implements Partitioner {
+
+  private final String _columnName;
+
+  public ColumnValuePartitioner(String columnName) {
+    _columnName = columnName;
+  }
+
+  @Override
+  public String getPartition(GenericRow genericRow) {

Review comment:
       Not written specifically to support time alignment, but it can be used for that if column values are to be used as-is for partitioning.
   
   If the time column is in seconds/hours but we want to align by day, yes, we can use TransformFunctionPartitioner.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r485969046



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default, the SUM aggregation is used on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getPhysicalColumnNames().size() - schema.getMetricNames().size();
+    _valueSize = schema.getMetricNames().size();
+    _keyColumns = new String[_keySize];
+    _valueColumns = new String[_valueSize];
+    _valueAggregators = new ValueAggregator[_valueSize];
+    _metricFieldSpecs = new MetricFieldSpec[_valueSize];
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap = collectorConfig.getAggregatorTypeMap();
+    if (aggregatorTypeMap == null) {
+      aggregatorTypeMap = Collections.emptyMap();
+    }
+    int valIdx = 0;
+    int keyIdx = 0;
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isVirtualColumn()) {
+        String name = fieldSpec.getName();
+        if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.METRIC)) {
+          _metricFieldSpecs[valIdx] = (MetricFieldSpec) fieldSpec;
+          _valueColumns[valIdx] = name;
+          _valueAggregators[valIdx] = ValueAggregatorFactory.getValueAggregator(
+              aggregatorTypeMap.getOrDefault(name, ValueAggregatorFactory.ValueAggregatorType.SUM).toString());
+          valIdx++;
+        } else {
+          _keyColumns[keyIdx++] = name;
+        }
+      }
+    }
+
+    List<String> sortOrder = collectorConfig.getSortOrder();
+    if (sortOrder.size() > 0) {
+      _sorter = new GenericRowSorter(sortOrder, schema);
+    }
+  }
+
+  /**
+   * If a row already exists in the collection (based on dimension + time columns), roll up the metric values, else add the row
+   */
+  @Override
+  public void collect(GenericRow genericRow) {

Review comment:
       I guess that we basically keep the entire data for a segment on the JVM heap?
   
   In the future, we may need to add an off-heap or file-based collector to avoid OOM errors when reading large segments. (e.g. a 1-2GB Pinot segment can be extremely large in row format)
   
   Another way to save memory is to sort the data on all dimensions and scan it once for aggregation (but this pays a large cost in cases where the data doesn't need to be sorted)

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java
##########
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Partitioner which computes partition values based on the ColumnPartitionConfig from the table config
+ */
+public class TableConfigPartitioner implements Partitioner {

Review comment:
       What if I need to align data on time while the table is custom partitioned? (just trying to brainstorm how we will extend the current partitioner to support this)
   
   Then, we can probably add a new partitioner that combines the values from `TableConfigPartitioner` and `TransformationPartitioner`?
   
   
   e.g. partition on memberId using murmur, and segment merge needs to be enabled, so the data needs to be time-aligned.
   
   1. Use the table config partitioner to get the partition id based on murmur on memberId -> Let's say `2`
   2. Use the time align partitioner -> Let's say `2020/12/12`
   
   Combine 1&2 -> `2020/12/12-2` <- example of a partition id
   
   We can do something like the above?
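   
   The composition idea as a sketch (hypothetical; joins the outputs of any two partitioners, e.g. a time-aligned value plus a murmur partition id -> `2020/12/12-2`):
   ```java
   import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
   import org.apache.pinot.spi.data.readers.GenericRow;
   
   // Combine two partitioners by concatenating their partition values.
   public class CompositePartitionerSketch implements Partitioner {
     private final Partitioner _first;
     private final Partitioner _second;
   
     public CompositePartitionerSketch(Partitioner first, Partitioner second) {
       _first = first;
       _second = second;
     }
   
     @Override
     public String getPartition(GenericRow genericRow) {
       return _first.getPartition(genericRow) + "-" + _second.getPartition(genericRow);
     }
   }
   ```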

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {

Review comment:
       One requirement for `SegmentMergeRollup` is to be able to set a custom segment name (or at least a prefix plus a sequence number, `merged_XXX_0...M`).
   
   Where do you think is the best place to configure those?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/ColumnValuePartitioner.java
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Partitioner which extracts a column value as the partition
+ */
+public class ColumnValuePartitioner implements Partitioner {
+
+  private final String _columnName;
+
+  public ColumnValuePartitioner(String columnName) {
+    _columnName = columnName;
+  }
+
+  @Override
+  public String getPartition(GenericRow genericRow) {

Review comment:
       Is this intended for supporting time alignment?
   
   What if the time column granularity is in seconds/hours while push frequency is DAY?
   
   In that case, we may need to use `TransformationPartitioner`?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/NoOpPartitioner.java
##########
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Partitioner implementation which always returns constant partition value "0"
+ */
+public class NoOpPartitioner implements Partitioner {
+  @Override
+  public String getPartition(GenericRow genericRow) {
+    return "0";

Review comment:
       No-op partitioner means that we always create a single output file?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
##########
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.Arrays;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A framework to process "m" given segments and convert them into "n" segments
+ * The phases of the Segment Processor are
+ * 1. Map - record transformation, partitioning, partition filtering
+ * 2. Reduce - rollup, concat, split etc
+ * 3. Segment generation
+ *
+ * This will typically be used by minion tasks, which want to perform some processing on segments
+ * (eg task which merges segments, tasks which aligns segments per time boundaries etc)
+ */
+public class SegmentProcessorFramework {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class);
+
+  private final File _inputSegmentsDir;
+  private final File _outputSegmentsDir;
+  private final SegmentProcessorConfig _segmentProcessorConfig;
+
+  private final Schema _pinotSchema;
+  private final TableConfig _tableConfig;
+
+  private final File _baseDir;
+  private final File _mapperInputDir;
+  private final File _mapperOutputDir;
+  private final File _reducerOutputDir;
+
+  /**
+   * Initializes the Segment Processor framework with input segments, output path and processing config
+   * @param inputSegmentsDir directory containing the input segments. These can be tarred or untarred.
+   * @param segmentProcessorConfig config for segment processing
+   * @param outputSegmentsDir directory for placing the resulting segments. This should already exist.
+   */
+  public SegmentProcessorFramework(File inputSegmentsDir, SegmentProcessorConfig segmentProcessorConfig,
+      File outputSegmentsDir) {
+
+    LOGGER.info(
+        "Initializing SegmentProcessorFramework with input segments dir: {}, output segments dir: {} and segment processor config: {}",
+        inputSegmentsDir.getAbsolutePath(), outputSegmentsDir.getAbsolutePath(), segmentProcessorConfig.toString());
+
+    _inputSegmentsDir = inputSegmentsDir;
+    Preconditions.checkState(_inputSegmentsDir.exists() && _inputSegmentsDir.isDirectory(),
+        "Input path: %s must be a directory with Pinot segments", _inputSegmentsDir.getAbsolutePath());
+    _outputSegmentsDir = outputSegmentsDir;
+    Preconditions.checkState(
+        _outputSegmentsDir.exists() && _outputSegmentsDir.isDirectory() && (_outputSegmentsDir.list().length == 0),
+        "Must provide existing empty output directory: %s", _outputSegmentsDir.getAbsolutePath());
+
+    _segmentProcessorConfig = segmentProcessorConfig;
+    _pinotSchema = segmentProcessorConfig.getSchema();
+    _tableConfig = segmentProcessorConfig.getTableConfig();
+
+    _baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_" + System.currentTimeMillis());
+    FileUtils.deleteQuietly(_baseDir);
+    Preconditions.checkState(_baseDir.mkdirs(), "Failed to create base directory: %s for SegmentProcessor", _baseDir);
+    _mapperInputDir = new File(_baseDir, "mapper_input");
+    Preconditions
+        .checkState(_mapperInputDir.mkdirs(), "Failed to create mapper input directory: %s for SegmentProcessor",
+            _mapperInputDir);
+    _mapperOutputDir = new File(_baseDir, "mapper_output");
+    Preconditions
+        .checkState(_mapperOutputDir.mkdirs(), "Failed to create mapper output directory: %s for SegmentProcessor",
+            _mapperOutputDir);
+    _reducerOutputDir = new File(_baseDir, "reducer_output");
+    Preconditions
+        .checkState(_reducerOutputDir.mkdirs(), "Failed to create reducer output directory: %s for SegmentProcessor",
+            _reducerOutputDir);
+  }
+
+  /**
+   * Processes segments from the input directory as per the provided configs, then puts resulting segments into the output directory
+   */
+  public void processSegments()
+      throws Exception {
+
+    // Check for input segments
+    File[] segmentFiles = _inputSegmentsDir.listFiles();
+    if (segmentFiles.length == 0) {
+      throw new IllegalStateException("No segments found in input dir: " + _inputSegmentsDir.getAbsolutePath()
+          + ". Exiting SegmentProcessorFramework.");
+    }
+
+    // Mapper phase.
+    LOGGER.info("Beginning mapper phase. Processing segments: {}", Arrays.toString(_inputSegmentsDir.list()));
+    for (File segment : segmentFiles) {
+
+      String fileName = segment.getName();
+      File mapperInput = segment;
+
+      // Untar the segments if needed
+      if (!segment.isDirectory()) {
+        if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+          mapperInput = TarGzCompressionUtils.untar(segment, _mapperInputDir).get(0);
+        } else {
+          throw new IllegalStateException("Unsupported segment format: " + segment.getAbsolutePath());
+        }
+      }
+
+      // Set mapperId as the name of the segment
+      SegmentMapperConfig mapperConfig =
+          new SegmentMapperConfig(_pinotSchema, _segmentProcessorConfig.getRecordTransformerConfig(),
+              _segmentProcessorConfig.getRecordFilterConfig(), _segmentProcessorConfig.getPartitioningConfig());
+      SegmentMapper mapper = new SegmentMapper(mapperInput.getName(), mapperInput, mapperConfig, _mapperOutputDir);
+      mapper.map();
+      mapper.cleanup();
+    }
+
+    // Check for mapper output files
+    File[] mapperOutputFiles = _mapperOutputDir.listFiles();
+    if (mapperOutputFiles.length == 0) {
+      throw new IllegalStateException("No files found in mapper output directory: " + _mapperOutputDir.getAbsolutePath()
+          + ". Exiting SegmentProcessorFramework.");
+    }
+
+    // Reducer phase.
+    LOGGER.info("Beginning reducer phase. Processing files: {}", Arrays.toString(_mapperOutputDir.list()));
+    // Mapper output directory has 1 directory per partition, named after the partition. Each directory contains 1 or more avro files.
+    for (File partDir : mapperOutputFiles) {
+
+      // Set partition as reducerId
+      SegmentReducerConfig reducerConfig =
+          new SegmentReducerConfig(_pinotSchema, _segmentProcessorConfig.getCollectorConfig(),
+              _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment());
+      SegmentReducer reducer = new SegmentReducer(partDir.getName(), partDir, reducerConfig, _reducerOutputDir);
+      reducer.reduce();
+      reducer.cleanup();
+    }
+
+    // Check for reducer output files
+    File[] reducerOutputFiles = _reducerOutputDir.listFiles();
+    if (reducerOutputFiles.length == 0) {
+      throw new IllegalStateException(
+          "No files found in reducer output directory: " + _reducerOutputDir.getAbsolutePath()
+              + ". Exiting SegmentProcessorFramework.");
+    }
+
+    // Segment generation phase.
+    LOGGER.info("Beginning segment generation phase. Processing files: {}", Arrays.toString(_reducerOutputDir.list()));
+    // Reducer output directory will have 1 or more avro files
+    for (File resultFile : reducerOutputFiles) {

Review comment:
       Did you check the output segment names when the output is more than 1 file?
   
   It's possible that the final segments may have the same segment name. (e.g. `<tablename>_<start>_<end>`)
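   
   For example, a sequence id (as proposed elsewhere in this review) keeps the names unique even when start/end times collide (sketch):
   ```java
   public final class SegmentNameSketch {
     // Disambiguate otherwise-identical <table>_<start>_<end> names with a seqId:
     // myTable_1597795200000_1597881599999_0, ..._1, ..._2
     static String segmentName(String tableName, long startTime, long endTime, int seqId) {
       return tableName + "_" + startTime + "_" + endTime + "_" + seqId;
     }
   }
   ```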






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486700161



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
##########
+    for (File resultFile : reducerOutputFiles) {

Review comment:
       Good catch. Will add seqId. Also working on adding end-to-end tests for the framework, and will include this case
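   For readers following the thread: the fix being described boils down to threading a sequence id through the segment generation loop, roughly as sketched below. This is an approximation, not the committed code — `setSequenceId` on `SegmentGeneratorConfig` is the piece that makes the generated segment names unique.

   ```java
   // Sketch of the seqId fix: give each reducer output file a distinct
   // sequence id so that segments built from different parts don't collide on name.
   int seqId = 0;
   for (File resultFile : reducerOutputFiles) {
     SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(_tableConfig, _pinotSchema);
     generatorConfig.setInputFilePath(resultFile.getAbsolutePath());
     generatorConfig.setFormat(FileFormat.AVRO);
     generatorConfig.setOutDir(_outputSegmentsDir.getAbsolutePath());
     generatorConfig.setSequenceId(seqId++); // uniquifies the segment name
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
     driver.init(generatorConfig);
     driver.build();
   }
   ```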






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486673217



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final RecordTransformerConfig _recordTransformerConfig;
+  private final RecordFilterConfig _recordFilterConfig;
+  private final PartitioningConfig _partitioningConfig;
+  private final CollectorConfig _collectorConfig;
+  private final SegmentConfig _segmentConfig;
+
+  private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
+      RecordTransformerConfig recordTransformerConfig, RecordFilterConfig recordFilterConfig,
+      PartitioningConfig partitioningConfig, CollectorConfig collectorConfig, SegmentConfig segmentConfig) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _recordTransformerConfig = recordTransformerConfig;
+    _recordFilterConfig = recordFilterConfig;
+    _partitioningConfig = partitioningConfig;
+    _collectorConfig = collectorConfig;
+    _segmentConfig = segmentConfig;
+  }
+
+  /**
+   * The Pinot table config
+   */
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  /**
+   * The Pinot schema
+   */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /**
+   * The RecordTransformerConfig for the SegmentProcessorFramework's map phase
+   */
+  public RecordTransformerConfig getRecordTransformerConfig() {
+    return _recordTransformerConfig;
+  }
+
+  /**
+   * The RecordFilterConfig to filter records
+   */
+  public RecordFilterConfig getRecordFilterConfig() {
+    return _recordFilterConfig;
+  }
+
+  /**
+   * The PartitioningConfig for the SegmentProcessorFramework's map phase
+   */
+  public PartitioningConfig getPartitioningConfig() {
+    return _partitioningConfig;
+  }
+
+  /**
+   * The CollectorConfig for the SegmentProcessorFramework's reduce phase
+   */
+  public CollectorConfig getCollectorConfig() {
+    return _collectorConfig;
+  }
+
+  /**
+   * The SegmentConfig for the SegmentProcessorFramework's segment generation phase
+   */
+  public SegmentConfig getSegmentConfig() {
+    return _segmentConfig;
+  }
+
+  /**
+   * Builder for SegmentProcessorConfig
+   */
+  public static class Builder {
+    private TableConfig tableConfig;
+    private Schema schema;
+    private RecordTransformerConfig recordTransformerConfig;
+    private RecordFilterConfig recordFilterConfig;
+    private PartitioningConfig partitioningConfig;
+    private CollectorConfig collectorConfig;
+    private SegmentConfig _segmentConfig;
+
+    public Builder setTableConfig(TableConfig tableConfig) {
+      this.tableConfig = tableConfig;
+      return this;
+    }
+
+    public Builder setSchema(Schema schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    public Builder setRecordTransformerConfig(RecordTransformerConfig recordTransformerConfig) {
+      this.recordTransformerConfig = recordTransformerConfig;
+      return this;
+    }
+
+    public Builder setRecordFilterConfig(RecordFilterConfig recordFilterConfig) {
+      this.recordFilterConfig = recordFilterConfig;
+      return this;
+    }
+
+    public Builder setPartitioningConfig(PartitioningConfig partitioningConfig) {
+      this.partitioningConfig = partitioningConfig;
+      return this;
+    }
+
+    public Builder setCollectorConfig(CollectorConfig collectorConfig) {
+      this.collectorConfig = collectorConfig;
+      return this;
+    }
+
+    public Builder setSegmentConfig(SegmentConfig segmentConfig) {
+      this._segmentConfig = segmentConfig;
+      return this;
+    }
+
+    public SegmentProcessorConfig build() {
+      Preconditions.checkNotNull(tableConfig, "Must provide table config in SegmentProcessorConfig");
+      Preconditions.checkNotNull(schema, "Must provide schema in SegmentProcessorConfig");
+      if (recordTransformerConfig == null) {

Review comment:
       I just wanted to avoid checking for null before using them in the mapper and reducer.
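   Concretely, the defaulting pattern being described looks something like this in `build()` — a sketch, assuming each config's `Builder` produces a sensible no-op default:

   ```java
   public SegmentProcessorConfig build() {
     Preconditions.checkNotNull(tableConfig, "Must provide table config in SegmentProcessorConfig");
     Preconditions.checkNotNull(schema, "Must provide schema in SegmentProcessorConfig");
     // Substitute no-op defaults instead of leaving fields null, so the
     // mapper and reducer can use every config unconditionally.
     if (recordTransformerConfig == null) {
       recordTransformerConfig = new RecordTransformerConfig.Builder().build(); // identity transform
     }
     if (recordFilterConfig == null) {
       recordFilterConfig = new RecordFilterConfig.Builder().build(); // keeps every record
     }
     if (partitioningConfig == null) {
       partitioningConfig = new PartitioningConfig.Builder().build(); // NO_OP partitioner
     }
     if (collectorConfig == null) {
       collectorConfig = new CollectorConfig.Builder().build(); // CONCAT collector
     }
     if (_segmentConfig == null) {
       _segmentConfig = new SegmentConfig.Builder().build(); // default segment sizing
     }
     return new SegmentProcessorConfig(tableConfig, schema, recordTransformerConfig, recordFilterConfig,
         partitioningConfig, collectorConfig, _segmentConfig);
   }
   ```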






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r488326079



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/PartitionerFactory.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory for Partitioner and PartitionFilter
+ */
+public final class PartitionerFactory {
+
+  private PartitionerFactory() {
+
+  }
+
+  public enum PartitionerType {
+    NO_OP, ROW_HASH, COLUMN_VALUE, TRANSFORM_FUNCTION, TABLE_PARTITION_CONFIG
+  }
+
+  /**
+   * Construct a Partitioner using the PartitioningConfig
+   */
+  public static Partitioner getPartitioner(PartitioningConfig config) {
+
+    Partitioner partitioner = null;
+    switch (config.getPartitionerType()) {
+      case NO_OP:
+        partitioner = new NoOpPartitioner();
+        break;
+      case ROW_HASH:
+        Preconditions
+            .checkState(config.getNumPartitions() > 0, "Must provide numPartitions > 0 for ROW_HASH partitioner");
+        partitioner = new RowHashPartitioner(config.getNumPartitions());
+        break;
+      case COLUMN_VALUE:
+        Preconditions.checkState(config.getColumnName() != null, "Must provide columnName for COLUMN_VALUE partitioner");
+        partitioner = new ColumnValuePartitioner(config.getColumnName());
+        break;
+      case TRANSFORM_FUNCTION:
+        Preconditions.checkState(config.getTransformFunction() != null,
+            "Must provide transformFunction for TRANSFORM_FUNCTION partitioner");
+        partitioner = new TransformFunctionPartitioner(config.getTransformFunction());
+        break;
+      case TABLE_PARTITION_CONFIG:

Review comment:
       Yes, Seunghyun pointed out a similar thing; discussion below.
   I'll be changing the partitioner to a 2-step partitioner in the next PR (have put a TODO in the code and description).
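   For illustration, a composed 2-step partitioner could look like the hypothetical sketch below — partition by one criterion first (e.g. time), then sub-partition by another (e.g. a column value). The class is made up for this example; only the `Partitioner` interface is from the PR:

   ```java
   import org.apache.pinot.spi.data.readers.GenericRow;

   public class ComposedPartitioner implements Partitioner {
     private final Partitioner _first;
     private final Partitioner _second;

     public ComposedPartitioner(Partitioner first, Partitioner second) {
       _first = first;
       _second = second;
     }

     @Override
     public String getPartition(GenericRow genericRow) {
       // Concatenating the two partition values gives each (first, second)
       // pair its own mapper output directory.
       return _first.getPartition(genericRow) + "_" + _second.getPartition(genericRow);
     }
   }
   ```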






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486683841



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/NoOpPartitioner.java
##########
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Partitioner implementation which always returns constant partition value "0"
+ */
+public class NoOpPartitioner implements Partitioner {
+  @Override
+  public String getPartition(GenericRow genericRow) {
+    return "0";

Review comment:
       That is correct. A single output from each Mapper. The reducer will break that data up as per `maxNumRecordsPerSegment`.
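   To spell that out: with `NoOpPartitioner` every row maps to partition "0", so the reducer sees a single input and re-chunks it into parts of at most `maxNumRecordsPerSegment` rows, each of which becomes one segment. Conceptually (the flush helper here is hypothetical):

   ```java
   int part = 0;
   while (recordReader.hasNext()) {
     collector.collect(recordReader.next());
     if (collector.size() == maxNumRecordsPerSegment) {
       flushToAvroPart(collector, part++); // hypothetical: write part file, clear collector
     }
   }
   if (collector.size() > 0) {
     flushToAvroPart(collector, part++); // remainder
   }
   ```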






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486672513



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)

Review comment:
       Agreed about JsonCreator. Changed it.
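   For reference, the `JsonCreator` style being adopted looks roughly like this when applied to `CollectorConfig` (a sketch, not the exact committed code; getters omitted):

   ```java
   import com.fasterxml.jackson.annotation.JsonCreator;
   import com.fasterxml.jackson.annotation.JsonProperty;
   import java.util.List;
   import java.util.Map;
   import javax.annotation.Nullable;

   public class CollectorConfig {
     private final CollectorFactory.CollectorType _collectorType;
     private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
     private final List<String> _sortOrder;

     @JsonCreator
     public CollectorConfig(
         @JsonProperty("collectorType") @Nullable CollectorFactory.CollectorType collectorType,
         @JsonProperty("aggregatorTypeMap") @Nullable Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap,
         @JsonProperty("sortOrder") @Nullable List<String> sortOrder) {
       // Jackson calls this constructor directly during deserialization;
       // default the collector type to CONCAT when the spec omits it.
       _collectorType = collectorType != null ? collectorType : CollectorFactory.CollectorType.CONCAT;
       _aggregatorTypeMap = aggregatorTypeMap;
       _sortOrder = sortOrder;
     }
   }
   ```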






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r481517602



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/PartitionerFactory.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory for Partitioner and PartitionFilter
+ */
+public final class PartitionerFactory {
+
+  private PartitionerFactory() {
+
+  }
+
+  public enum PartitionerType {
+    NO_OP, ROW_HASH, COLUMN_VALUE, TRANSFORM_FUNCTION, TABLE_PARTITION_CONFIG
+  }
+
+  /**
+   * Construct a Partitioner using the PartitioningConfig
+   */
+  public static Partitioner getPartitioner(PartitioningConfig config) {
+
+    Partitioner partitioner = null;
+    switch (config.getPartitionerType()) {
+      case NO_OP:
+        partitioner = new NoOpPartitioner();
+        break;
+      case ROW_HASH:

Review comment:
       The intention of ROW_HASH is to have a partitioner which lets you set the number of partitions. The existing SegmentConverter supports only this type of partitioning. Agreed that ROW_HASH is not very indicative of the intent.
   How about NUM_PARTITIONS or FIXED_NUM_PARTITIONS?
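   Whatever the final name, the behavior is simple enough to sketch (illustrative only — the PR's actual implementation may differ):

   ```java
   import org.apache.pinot.spi.data.readers.GenericRow;

   public class RowHashPartitioner implements Partitioner {
     private final int _numPartitions;

     public RowHashPartitioner(int numPartitions) {
       _numPartitions = numPartitions;
     }

     @Override
     public String getPartition(GenericRow genericRow) {
       // Mask the sign bit rather than using Math.abs, which breaks on Integer.MIN_VALUE.
       return String.valueOf((genericRow.hashCode() & Integer.MAX_VALUE) % _numPartitions);
     }
   }
   ```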






[GitHub] [incubator-pinot] codecov-commenter commented on pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#issuecomment-682237310


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5934?src=pr&el=h1) Report
   > Merging [#5934](https://codecov.io/gh/apache/incubator-pinot/pull/5934?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) will **decrease** coverage by `23.23%`.
   > The diff coverage is `51.23%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/5934/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/5934?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #5934       +/-   ##
   ===========================================
   - Coverage   66.44%   43.20%   -23.24%     
   ===========================================
     Files        1075     1210      +135     
     Lines       54773    62540     +7767     
     Branches     8168     9529     +1361     
   ===========================================
   - Hits        36396    27023     -9373     
   - Misses      15700    33081    +17381     
   + Partials     2677     2436      -241     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | #integration | `43.20% <51.23%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/5934?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `26.66% <0.00%> (-30.48%)` | :arrow_down: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `22.22% <0.00%> (-26.62%)` | :arrow_down: |
   | [.../org/apache/pinot/client/ResultTableResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L1Jlc3VsdFRhYmxlUmVzdWx0U2V0LmphdmE=) | `24.00% <0.00%> (-10.29%)` | :arrow_down: |
   | [.../org/apache/pinot/common/lineage/LineageEntry.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbGluZWFnZS9MaW5lYWdlRW50cnkuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...apache/pinot/common/lineage/LineageEntryState.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbGluZWFnZS9MaW5lYWdlRW50cnlTdGF0ZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...rg/apache/pinot/common/lineage/SegmentLineage.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbGluZWFnZS9TZWdtZW50TGluZWFnZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/common/lineage/SegmentLineageUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbGluZWFnZS9TZWdtZW50TGluZWFnZVV0aWxzLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ot/common/messages/RoutingTableRebuildMessage.java](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWVzc2FnZXMvUm91dGluZ1RhYmxlUmVidWlsZE1lc3NhZ2UuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | ... and [1132 more](https://codecov.io/gh/apache/incubator-pinot/pull/5934/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5934?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/5934?src=pr&el=footer). Last update [4fd70fe...eece981](https://codecov.io/gh/apache/incubator-pinot/pull/5934?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r483876019



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.segment.processing.collector.Collector;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reducer phase of the SegmentProcessorFramework
+ * Reads the avro files in the input directory and creates output avro files in the reducer output directory.
+ * The avro files in the input directory are expected to contain data for only 1 partition
+ * Performs operations on that partition data as follows:
+ * - concatenation/rollup of records
+ * - split
+ * - TODO: dedup
+ */
+public class SegmentReducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentReducer.class);
+  private static final int MAX_RECORDS_TO_COLLECT = 5_000_000;
+
+  private final File _reducerInputDir;
+  private final File _reducerOutputDir;
+
+  private final String _reducerId;
+  private final Schema _pinotSchema;
+  private final org.apache.avro.Schema _avroSchema;
+  private final Collector _collector;
+  private final int _numRecordsPerPart;
+
+  public SegmentReducer(String reducerId, File reducerInputDir, SegmentReducerConfig reducerConfig,
+      File reducerOutputDir) {
+    _reducerInputDir = reducerInputDir;
+    _reducerOutputDir = reducerOutputDir;
+
+    _reducerId = reducerId;
+    _pinotSchema = reducerConfig.getPinotSchema();
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
+    _collector = CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
+    _numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
+    LOGGER.info("Initialized reducer with id: {}, input dir: {}, output dir: {}, collector: {}, numRecordsPerPart: {}",
+        _reducerId, _reducerInputDir, _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
+  }
+
+  /**
+   * Reads the avro files in the input directory.
+   * Performs configured operations and outputs to other avro file(s) in the reducer output directory.
+   */
+  public void reduce()
+      throws Exception {
+
+    int part = 0;
+    for (File inputFile : _reducerInputDir.listFiles()) {
+
+      RecordReader avroRecordReader = RecordReaderFactory
+          .getRecordReaderByClass("org.apache.pinot.plugin.inputformat.avro.AvroRecordReader", inputFile,
+              _pinotSchema.getColumnNames(), null);
+
+      while (avroRecordReader.hasNext()) {
+        GenericRow next = avroRecordReader.next();
+
+        // Aggregations
+        _collector.collect(next);
+
+        // Exceeded max records allowed to collect. Flush
+        if (_collector.size() == MAX_RECORDS_TO_COLLECT) {

Review comment:
       Sounds good. Changed it to flush when numRecordsPerPart is reached.
   @mcvsubbu we can certainly do that if we start seeing a need for more advanced controls and tuning on collection size and flushing. For starters, I wanted to keep it simple.
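   In code terms the change is just swapping the flush trigger (sketch; the flush helper stands in for this class's actual write-out logic):

   ```java
   if (_collector.size() == _numRecordsPerPart) {  // was: MAX_RECORDS_TO_COLLECT
     flushToAvroFile(_collector, part++);          // hypothetical helper
   }
   ```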






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486657189



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.minion.rollup.aggregate;
+package org.apache.pinot.core.segment.processing.collector;

Review comment:
       Changed.






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r485258306



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Iterator;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Collects and stores GenericRows
+ */
+public interface Collector {
+
+  /**
+   * Collects the given GenericRow and stores it
+   * @param genericRow the generic row to add to the collection
+   */
+  void collect(GenericRow genericRow);
+
+  /**
+   * Provides an iterator for the GenericRows in the collection
+   */
+  Iterator<GenericRow> iterator();

Review comment:
       Recommend combining `iterator()` and `finish()` into one method because we always need to call `finish()` then `iterator()` (maybe remove `finish()` and move the sorting logic into `iterator()`)
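   A minimal sketch of that suggestion — sort lazily inside `iterator()` so callers can't forget the ordering step (standalone class for illustration, assuming `GenericRowSorter` exposes a `sort(List)` helper):

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import org.apache.pinot.spi.data.readers.GenericRow;

   public class LazySortingCollector {
     private final List<GenericRow> _rows = new ArrayList<>();
     private final GenericRowSorter _sorter; // null when no sortOrder is configured

     public LazySortingCollector(GenericRowSorter sorter) {
       _sorter = sorter;
     }

     public void collect(GenericRow row) {
       _rows.add(row);
     }

     public int size() {
       return _rows.size();
     }

     public Iterator<GenericRow> iterator() {
       if (_sorter != null) {
         _sorter.sort(_rows); // previously in finish(); now runs on first read
       }
       return _rows.iterator();
     }
   }
   ```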

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.google.common.base.Preconditions;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A sorter for GenericRows
+ */
+public class GenericRowSorter {
+
+  private final Comparator<GenericRow> _genericRowComparator;
+
+  public GenericRowSorter(List<String> sortOrder, Schema schema) {
+    int sortOrderSize = sortOrder.size();
+    Comparator[] comparators = new Comparator[sortOrderSize];
+    for (int i = 0; i < sortOrderSize; i++) {
+      String column = sortOrder.get(i);
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot use multi value column: %s for sorting", column);
+      comparators[i] = getComparator(fieldSpec.getDataType());

Review comment:
       From past experience, storing the `dataType` and doing a per-value switch is faster than storing a `Comparator`.
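   In other words, something along these lines instead of a `Comparator` per column (sketch):

   ```java
   private static int compareValues(FieldSpec.DataType dataType, Object o1, Object o2) {
     // Per-value switch on the stored data type; avoids megamorphic
     // Comparator call sites.
     switch (dataType) {
       case INT:
         return Integer.compare((int) o1, (int) o2);
       case LONG:
         return Long.compare((long) o1, (long) o2);
       case FLOAT:
         return Float.compare((float) o1, (float) o2);
       case DOUBLE:
         return Double.compare((double) o1, (double) o2);
       case STRING:
         return ((String) o1).compareTo((String) o2);
       default:
         throw new IllegalStateException("Cannot sort on data type: " + dataType);
     }
   }
   ```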

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.google.common.base.Preconditions;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A sorter for GenericRows
+ */
+public class GenericRowSorter {
+
+  private final Comparator<GenericRow> _genericRowComparator;
+
+  public GenericRowSorter(List<String> sortOrder, Schema schema) {
+    int sortOrderSize = sortOrder.size();
+    Comparator[] comparators = new Comparator[sortOrderSize];
+    for (int i = 0; i < sortOrderSize; i++) {
+      String column = sortOrder.get(i);
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot use multi value column: %s for sorting", column);
+      comparators[i] = getComparator(fieldSpec.getDataType());
+    }
+    _genericRowComparator = (o1, o2) -> {
+      for (int i = 0; i < comparators.length; i++) {
+        String column = sortOrder.get(i);
+        int result = comparators[i].compare(o1.getValue(column), o2.getValue(column));
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0;
+    };
+  }
+
+  private Comparator getComparator(FieldSpec.DataType dataType) {
+    switch (dataType) {
+
+      case INT:
+        return Comparator.comparingInt(o -> (int) o);
+      case LONG:
+        return Comparator.comparingLong(o -> (long) o);
+      case FLOAT:
+        return (o1, o2) -> Float.compare((float) o1, (float) o2);
+      case DOUBLE:
+        return Comparator.comparingDouble(o -> (double) o);
+      case STRING:
+        return Comparator.comparing(o -> ((String) o));
+      default:

Review comment:
       Add BYTES support using `ByteArray.compare()`.
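   For concreteness, the requested case would slot into `getComparator` roughly like this (assuming sort values arrive as `byte[]`):

   ```java
   case BYTES:
     return (o1, o2) -> ByteArray.compare((byte[]) o1, (byte[]) o2);
   ```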

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final RecordTransformerConfig _recordTransformerConfig;
+  private final RecordFilterConfig _recordFilterConfig;
+  private final PartitioningConfig _partitioningConfig;
+  private final CollectorConfig _collectorConfig;
+  private final SegmentConfig _segmentConfig;
+
+  private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
+      RecordTransformerConfig recordTransformerConfig, RecordFilterConfig recordFilterConfig,
+      PartitioningConfig partitioningConfig, CollectorConfig collectorConfig, SegmentConfig segmentConfig) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _recordTransformerConfig = recordTransformerConfig;
+    _recordFilterConfig = recordFilterConfig;
+    _partitioningConfig = partitioningConfig;
+    _collectorConfig = collectorConfig;
+    _segmentConfig = segmentConfig;
+  }
+
+  /**
+   * The Pinot table config
+   */
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  /**
+   * The Pinot schema
+   */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /**
+   * The RecordTransformerConfig for the SegmentProcessorFramework's map phase
+   */
+  public RecordTransformerConfig getRecordTransformerConfig() {
+    return _recordTransformerConfig;
+  }
+
+  /**
+   * The RecordFilterConfig to filter records
+   */
+  public RecordFilterConfig getRecordFilterConfig() {
+    return _recordFilterConfig;
+  }
+
+  /**
+   * The PartitioningConfig for the SegmentProcessorFramework's map phase
+   */
+  public PartitioningConfig getPartitioningConfig() {
+    return _partitioningConfig;
+  }
+
+  /**
+   * The CollectorConfig for the SegmentProcessorFramework's reduce phase
+   */
+  public CollectorConfig getCollectorConfig() {
+    return _collectorConfig;
+  }
+
+  /**
+   * The SegmentConfig for the SegmentProcessorFramework's segment generation phase
+   */
+  public SegmentConfig getSegmentConfig() {
+    return _segmentConfig;
+  }
+
+  /**
+   * Builder for SegmentProcessorConfig
+   */
+  public static class Builder {
+    private TableConfig tableConfig;
+    private Schema schema;
+    private RecordTransformerConfig recordTransformerConfig;
+    private RecordFilterConfig recordFilterConfig;
+    private PartitioningConfig partitioningConfig;
+    private CollectorConfig collectorConfig;
+    private SegmentConfig _segmentConfig;
+
+    public Builder setTableConfig(TableConfig tableConfig) {
+      this.tableConfig = tableConfig;

Review comment:
       Same for other places
   ```suggestion
         _tableConfig = tableConfig;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final RecordTransformerConfig _recordTransformerConfig;
+  private final RecordFilterConfig _recordFilterConfig;
+  private final PartitioningConfig _partitioningConfig;
+  private final CollectorConfig _collectorConfig;
+  private final SegmentConfig _segmentConfig;
+
+  private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
+      RecordTransformerConfig recordTransformerConfig, RecordFilterConfig recordFilterConfig,
+      PartitioningConfig partitioningConfig, CollectorConfig collectorConfig, SegmentConfig segmentConfig) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _recordTransformerConfig = recordTransformerConfig;
+    _recordFilterConfig = recordFilterConfig;
+    _partitioningConfig = partitioningConfig;
+    _collectorConfig = collectorConfig;
+    _segmentConfig = segmentConfig;
+  }
+
+  /**
+   * The Pinot table config
+   */
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  /**
+   * The Pinot schema
+   */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /**
+   * The RecordTransformerConfig for the SegmentProcessorFramework's map phase
+   */
+  public RecordTransformerConfig getRecordTransformerConfig() {
+    return _recordTransformerConfig;
+  }
+
+  /**
+   * The RecordFilterConfig to filter records
+   */
+  public RecordFilterConfig getRecordFilterConfig() {
+    return _recordFilterConfig;
+  }
+
+  /**
+   * The PartitioningConfig for the SegmentProcessorFramework's map phase
+   */
+  public PartitioningConfig getPartitioningConfig() {
+    return _partitioningConfig;
+  }
+
+  /**
+   * The CollectorConfig for the SegmentProcessorFramework's reduce phase
+   */
+  public CollectorConfig getCollectorConfig() {
+    return _collectorConfig;
+  }
+
+  /**
+   * The SegmentConfig for the SegmentProcessorFramework's segment generation phase
+   */
+  public SegmentConfig getSegmentConfig() {
+    return _segmentConfig;
+  }
+
+  /**
+   * Builder for SegmentProcessorConfig
+   */
+  public static class Builder {
+    private TableConfig tableConfig;

Review comment:
       Same for other places
   ```suggestion
       private TableConfig _tableConfig;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)

Review comment:
       I feel this is not as readable as the `JsonCreator` annotation on the constructor. IMO we don't really need a builder for a very simple config.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)
+public class CollectorConfig {
+  private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
+
+  private final CollectorFactory.CollectorType _collectorType;
+  private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
+  private final List<String> _sortOrder;
+
+  private CollectorConfig(CollectorFactory.CollectorType collectorType,
+      Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap, List<String> sortOrder) {
+    _collectorType = collectorType;
+    _aggregatorTypeMap = aggregatorTypeMap;
+    _sortOrder = sortOrder;
+  }
+
+  /**
+   * The type of the Collector
+   */
+  public CollectorFactory.CollectorType getCollectorType() {
+    return _collectorType;
+  }
+
+  /**
+   * Map containing aggregation types for the metrics
+   */
+  @Nullable
+  public Map<String, ValueAggregatorFactory.ValueAggregatorType> getAggregatorTypeMap() {
+    return _aggregatorTypeMap;
+  }
+
+  /**
+   * The columns on which to sort
+   */
+  public List<String> getSortOrder() {

Review comment:
       Should this be `nullable` as well?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)
+public class CollectorConfig {
+  private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
+
+  private final CollectorFactory.CollectorType _collectorType;
+  private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
+  private final List<String> _sortOrder;
+
+  private CollectorConfig(CollectorFactory.CollectorType collectorType,
+      Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap, List<String> sortOrder) {
+    _collectorType = collectorType;
+    _aggregatorTypeMap = aggregatorTypeMap;
+    _sortOrder = sortOrder;
+  }
+
+  /**
+   * The type of the Collector
+   */
+  public CollectorFactory.CollectorType getCollectorType() {
+    return _collectorType;
+  }
+
+  /**
+   * Map containing aggregation types for the metrics
+   */
+  @Nullable
+  public Map<String, ValueAggregatorFactory.ValueAggregatorType> getAggregatorTypeMap() {
+    return _aggregatorTypeMap;
+  }
+
+  /**
+   * The columns on which to sort
+   */
+  public List<String> getSortOrder() {
+    return _sortOrder;
+  }
+
+  /**
+   * Builder for CollectorConfig
+   */
+  @JsonPOJOBuilder(withPrefix = "set")
+  public static class Builder {
+    private CollectorFactory.CollectorType collectorType = DEFAULT_COLLECTOR_TYPE;
+    private Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap;
+    private List<String> sortOrder = new ArrayList<>();
+
+    public Builder setCollectorType(CollectorFactory.CollectorType collectorType) {
+      this.collectorType = collectorType;

Review comment:
       Avoid using `this`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)
+public class CollectorConfig {
+  private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
+
+  private final CollectorFactory.CollectorType _collectorType;
+  private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
+  private final List<String> _sortOrder;
+
+  private CollectorConfig(CollectorFactory.CollectorType collectorType,
+      Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap, List<String> sortOrder) {
+    _collectorType = collectorType;
+    _aggregatorTypeMap = aggregatorTypeMap;
+    _sortOrder = sortOrder;
+  }
+
+  /**
+   * The type of the Collector
+   */
+  public CollectorFactory.CollectorType getCollectorType() {
+    return _collectorType;
+  }
+
+  /**
+   * Map containing aggregation types for the metrics
+   */
+  @Nullable
+  public Map<String, ValueAggregatorFactory.ValueAggregatorType> getAggregatorTypeMap() {
+    return _aggregatorTypeMap;
+  }
+
+  /**
+   * The columns on which to sort
+   */
+  public List<String> getSortOrder() {
+    return _sortOrder;
+  }
+
+  /**
+   * Builder for CollectorConfig
+   */
+  @JsonPOJOBuilder(withPrefix = "set")
+  public static class Builder {
+    private CollectorFactory.CollectorType collectorType = DEFAULT_COLLECTOR_TYPE;

Review comment:
       (Code style)
   ```suggestion
       private CollectorFactory.CollectorType _collectorType = DEFAULT_COLLECTOR_TYPE;
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/GenericRowSorter.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.google.common.base.Preconditions;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A sorter for GenericRows
+ */
+public class GenericRowSorter {
+
+  private final Comparator<GenericRow> _genericRowComparator;
+
+  public GenericRowSorter(List<String> sortOrder, Schema schema) {
+    int sortOrderSize = sortOrder.size();
+    Comparator[] comparators = new Comparator[sortOrderSize];
+    for (int i = 0; i < sortOrderSize; i++) {
+      String column = sortOrder.get(i);
+      FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+      Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot use multi value column: %s for sorting", column);
+      comparators[i] = getComparator(fieldSpec.getDataType());
+    }
+    _genericRowComparator = (o1, o2) -> {
+      for (int i = 0; i < comparators.length; i++) {
+        String column = sortOrder.get(i);
+        int result = comparators[i].compare(o1.getValue(column), o2.getValue(column));
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0;
+    };
+  }
+
+  private Comparator getComparator(FieldSpec.DataType dataType) {
+    switch (dataType) {
+
+      case INT:
+        return Comparator.comparingInt(o -> (int) o);
+      case LONG:
+        return Comparator.comparingLong(o -> (long) o);
+      case FLOAT:
+        return (o1, o2) -> Float.compare((float) o1, (float) o2);

Review comment:
       Prefer this flavor for performance reasons (it avoids going through a key-extractor function on every comparison)
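   For illustration, a minimal sketch of the two flavors being contrasted (the second compares in place instead of calling a key-extractor function per comparison):
   ```java
   // key-extractor flavor: extra function call per comparison
   Comparator<Object> viaExtractor = Comparator.comparingInt(o -> (int) o);

   // direct flavor: compare in place
   Comparator<Object> direct = (o1, o2) -> Integer.compare((int) o1, (int) o2);
   ```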

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default will use the SUM aggregation on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;

Review comment:
       (nit) Make it final?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default will use the SUM aggregation on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getPhysicalColumnNames().size() - schema.getMetricNames().size();

Review comment:
       You might want to extract the number of columns from the field specs, as a metric column can also be virtual
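   For illustration, a sketch of deriving the sizes from the field specs directly (mirroring the loop already in this constructor), so virtual metric columns are excluded consistently:
   ```java
   int keySize = 0;
   int valueSize = 0;
   for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
     if (!fieldSpec.isVirtualColumn()) {
       if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
         valueSize++;  // metric columns form the rollup values
       } else {
         keySize++;    // dimension/time columns form the rollup key
       }
     }
   }
   ```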

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector implementation for collecting and concatenating all incoming rows
+ */
+public class ConcatCollector implements Collector {
+  private final List<GenericRow> _collection = new ArrayList<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;

Review comment:
       (nit) Make it final?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Helper util methods for SegmentProcessorFramework
+ */
+public final class SegmentProcessorUtils {
+
+  private SegmentProcessorUtils() {
+  }
+
+  /**
+   * Convert a GenericRow to an avro GenericRecord
+   */
+  public static GenericData.Record convertGenericRowToAvroRecord(GenericRow genericRow,
+      GenericData.Record reusableRecord) {
+    for (String field : genericRow.getFieldToValueMap().keySet()) {

Review comment:
       Put `null` values from `GenericRow.getNullValueFields()`?
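   A minimal sketch of what that could look like, reusing the loop from this method (`getNullValueFields()` is from the comment above; treat the rest as illustrative):
   ```java
   Set<String> nullFields = genericRow.getNullValueFields();
   for (String field : genericRow.getFieldToValueMap().keySet()) {
     // write an explicit null for null-marked fields instead of their default values
     reusableRecord.put(field, nullFields.contains(field) ? null : genericRow.getValue(field));
   }
   ```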

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ValueAggregator.java
##########
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.minion.rollup.aggregate;
+package org.apache.pinot.core.segment.processing.collector;

Review comment:
       Not introduced in this PR, but we should not pass in `MetricFieldSpec` for every `aggregate()` call. Instead, we should set it in constructor or add an `init()` method
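   A hypothetical shape for that change (names illustrative, not from the PR):
   ```java
   public interface ValueAggregator {
     // bind the metric's field spec once up front...
     void init(MetricFieldSpec metricFieldSpec);

     // ...instead of passing it on every aggregate() call
     Object aggregate(Object value1, Object value2);
   }
   ```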

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Iterator;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Collects and stores GenericRows
+ */
+public interface Collector {
+
+  /**
+   * Collects the given GenericRow and stores it
+   * @param genericRow the generic row to add to the collection
+   */
+  void collect(GenericRow genericRow);

Review comment:
       Should we work on `GenericRecord` (Avro object) instead of `GenericRow` (Pinot object)? We are converting them back and forth right now

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default will use the SUM aggregation on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getPhysicalColumnNames().size() - schema.getMetricNames().size();
+    _valueSize = schema.getMetricNames().size();
+    _keyColumns = new String[_keySize];
+    _valueColumns = new String[_valueSize];
+    _valueAggregators = new ValueAggregator[_valueSize];
+    _metricFieldSpecs = new MetricFieldSpec[_valueSize];
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap = collectorConfig.getAggregatorTypeMap();
+    if (aggregatorTypeMap == null) {
+      aggregatorTypeMap = Collections.emptyMap();
+    }
+    int valIdx = 0;
+    int keyIdx = 0;
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isVirtualColumn()) {
+        String name = fieldSpec.getName();
+        if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.METRIC)) {

Review comment:
       (nit) 
   ```suggestion
           if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.filter.RecordFilter;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterFactory;
+import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper phase of the SegmentProcessorFramework.
+ * Reads the input segment and creates partitioned avro data files
+ * Performs:
+ * - record transformations
+ * - partitioning
+ * - partition filtering
+ */
+public class SegmentMapper {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
+  private final File _inputSegment;
+  private final File _mapperOutputDir;
+
+  private final String _mapperId;
+  private final Schema _avroSchema;
+  private final RecordTransformer _recordTransformer;
+  private final RecordFilter _recordFilter;
+  private final Partitioner _partitioner;
+  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+
+  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _inputSegment = inputSegment;
+    _mapperOutputDir = mapperOutputDir;
+
+    _mapperId = mapperId;
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _recordFilter = RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
+    _partitioner = PartitionerFactory.getPartitioner(mapperConfig.getPartitioningConfig());
+    LOGGER.info(
+        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, recordFilter: {}, partitioner: {}",
+        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _recordFilter.getClass(),
+        _partitioner.getClass());
+  }
+
+  /**
+   * Reads the input segment and generates partitioned avro data files into the mapper output directory
+   * Records for each partition are put into a directory of its own within the mapper output directory, identified by the partition name
+   */
+  public void map()
+      throws Exception {
+
+    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
+    GenericRow reusableRow = new GenericRow();
+    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
+
+    while (segmentRecordReader.hasNext()) {
+      reusableRow = segmentRecordReader.next(reusableRow);
+
+      // Record transformation
+      reusableRow = _recordTransformer.transformRecord(reusableRow);
+
+      // Record filtering
+      if (_recordFilter.filter(reusableRow)) {
+        continue;
+      }
+
+      // Partitioning
+      String partition = _partitioner.getPartition(reusableRow);
+
+      // Create writer for the partition, if not exists
+      if (!_partitionToDataFileWriterMap.containsKey(partition)) {

Review comment:
       Use `_partitionToDataFileWriterMap.get(partition)` and check if the value is null to save one map lookup
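   That is, something along these lines (the helper name is made up):
   ```java
   DataFileWriter<GenericData.Record> writer = _partitionToDataFileWriterMap.get(partition);
   if (writer == null) {
     // single lookup: only touch the map again when the writer doesn't exist yet
     writer = createDataFileWriterForPartition(partition);  // hypothetical helper
     _partitionToDataFileWriterMap.put(partition, writer);
   }
   ```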

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {
+
+  private final TableConfig _tableConfig;
+  private final Schema _schema;
+  private final RecordTransformerConfig _recordTransformerConfig;
+  private final RecordFilterConfig _recordFilterConfig;
+  private final PartitioningConfig _partitioningConfig;
+  private final CollectorConfig _collectorConfig;
+  private final SegmentConfig _segmentConfig;
+
+  private SegmentProcessorConfig(TableConfig tableConfig, Schema schema,
+      RecordTransformerConfig recordTransformerConfig, RecordFilterConfig recordFilterConfig,
+      PartitioningConfig partitioningConfig, CollectorConfig collectorConfig, SegmentConfig segmentConfig) {
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _recordTransformerConfig = recordTransformerConfig;
+    _recordFilterConfig = recordFilterConfig;
+    _partitioningConfig = partitioningConfig;
+    _collectorConfig = collectorConfig;
+    _segmentConfig = segmentConfig;
+  }
+
+  /**
+   * The Pinot table config
+   */
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  /**
+   * The Pinot schema
+   */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /**
+   * The RecordTransformerConfig for the SegmentProcessorFramework's map phase
+   */
+  public RecordTransformerConfig getRecordTransformerConfig() {
+    return _recordTransformerConfig;
+  }
+
+  /**
+   * The RecordFilterConfig to filter records
+   */
+  public RecordFilterConfig getRecordFilterConfig() {
+    return _recordFilterConfig;
+  }
+
+  /**
+   * The PartitioningConfig for the SegmentProcessorFramework's map phase
+   */
+  public PartitioningConfig getPartitioningConfig() {
+    return _partitioningConfig;
+  }
+
+  /**
+   * The CollectorConfig for the SegmentProcessorFramework's reduce phase
+   */
+  public CollectorConfig getCollectorConfig() {
+    return _collectorConfig;
+  }
+
+  /**
+   * The SegmentConfig for the SegmentProcessorFramework's segment generation phase
+   */
+  public SegmentConfig getSegmentConfig() {
+    return _segmentConfig;
+  }
+
+  /**
+   * Builder for SegmentProcessorConfig
+   */
+  public static class Builder {
+    private TableConfig tableConfig;
+    private Schema schema;
+    private RecordTransformerConfig recordTransformerConfig;
+    private RecordFilterConfig recordFilterConfig;
+    private PartitioningConfig partitioningConfig;
+    private CollectorConfig collectorConfig;
+    private SegmentConfig _segmentConfig;
+
+    public Builder setTableConfig(TableConfig tableConfig) {
+      this.tableConfig = tableConfig;
+      return this;
+    }
+
+    public Builder setSchema(Schema schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    public Builder setRecordTransformerConfig(RecordTransformerConfig recordTransformerConfig) {
+      this.recordTransformerConfig = recordTransformerConfig;
+      return this;
+    }
+
+    public Builder setRecordFilterConfig(RecordFilterConfig recordFilterConfig) {
+      this.recordFilterConfig = recordFilterConfig;
+      return this;
+    }
+
+    public Builder setPartitioningConfig(PartitioningConfig partitioningConfig) {
+      this.partitioningConfig = partitioningConfig;
+      return this;
+    }
+
+    public Builder setCollectorConfig(CollectorConfig collectorConfig) {
+      this.collectorConfig = collectorConfig;
+      return this;
+    }
+
+    public Builder setSegmentConfig(SegmentConfig segmentConfig) {
+      this._segmentConfig = segmentConfig;
+      return this;
+    }
+
+    public SegmentProcessorConfig build() {
+      Preconditions.checkNotNull(tableConfig, "Must provide table config in SegmentProcessorConfig");
+      Preconditions.checkNotNull(schema, "Must provide schema in SegmentProcessorConfig");
+      if (recordTransformerConfig == null) {

Review comment:
       Suggest leaving them as `null` if not set






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r485965064



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Iterator;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Collects and stores GenericRows
+ */
+public interface Collector {
+
+  /**
+   * Collects the given GenericRow and stores it
+   * @param genericRow the generic row to add to the collection
+   */
+  void collect(GenericRow genericRow);

Review comment:
       I personally prefer GenericRow, to allow extending to file formats other than Avro.
   
   






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486692906



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java
##########
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Partitioner which computes partition values based on the ColumnPartitionConfig from the table config
+ */
+public class TableConfigPartitioner implements Partitioner {

Review comment:
       This is a good point; I did not think of this. Yes, we can add a new partitioner in that case, which combines the partitioners as needed.
   
   But this might be a common case, so I'm wondering if we should have that in the design itself. How about we always do partitioning in 2 steps inside the mapper:
   1) Apply any partitioning from the Segment Processor Config
   2) Apply any partitioning from the Table Config
   So in your example, the TransformFunctionPartitioner gets applied first and generates the date partition. Then we check whether a table config partitioner exists, and if so, further apply the PartitionFunction. Then concat the 2 partitions like you suggested.
   
   Another option is to make PartitionerConfig a list, apply all the partitioners one by one, and concat all the partition values to get the final partition.
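   A rough sketch of that second option, assuming a hypothetical `CompositePartitioner` that is not part of this PR:
   ```java
   public class CompositePartitioner implements Partitioner {
     private final List<Partitioner> _partitioners;

     public CompositePartitioner(List<Partitioner> partitioners) {
       _partitioners = partitioners;
     }

     @Override
     public String getPartition(GenericRow genericRow) {
       // apply each partitioner in order and concatenate the partition values
       StringBuilder partition = new StringBuilder();
       for (Partitioner partitioner : _partitioners) {
         if (partition.length() > 0) {
           partition.append('_');
         }
         partition.append(partitioner.getPartition(genericRow));
       }
       return partition.toString();
     }
   }
   ```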






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r481507983



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorConfig.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Config for Collector
+ */
+@JsonDeserialize(builder = CollectorConfig.Builder.class)
+public class CollectorConfig {
+  private static final CollectorFactory.CollectorType DEFAULT_COLLECTOR_TYPE = CollectorFactory.CollectorType.CONCAT;
+
+  private final CollectorFactory.CollectorType _collectorType;
+  private final Map<String, ValueAggregatorFactory.ValueAggregatorType> _aggregatorTypeMap;
+
+  private CollectorConfig(CollectorFactory.CollectorType collectorType,

Review comment:
       Added the config and support for sorting in the collection.
   Kept the implementation very simple for now: since we already have all the records in memory due to the aggregation step, I simply called list.sort.
   Lmk if you think any optimizations are needed there
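   In essence, the sorter just delegates to the JDK sort with the composed comparator (method name hypothetical):
   ```java
   public void sort(List<GenericRow> rows) {
     // List.sort is a stable TimSort, which is fine for an in-memory collection
     rows.sort(_genericRowComparator);
   }
   ```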






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r479437446



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/transformer/TransformFunctionRecordTransformer.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.transformer;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * RecordTransformer which executes transform functions to transform columns of record
+ * Does not follow any particular order, and hence cannot support transformations where strict order of execution is needed
+ */
+public class TransformFunctionRecordTransformer implements RecordTransformer {
+
+  private final Map<String, FunctionEvaluator> _functionEvaluatorMap = new HashMap<>();
+
+  public TransformFunctionRecordTransformer(Map<String, String> transformFunctionMap) {

Review comment:
       Several options:
   1. Use a Groovy function to write any logic (illustrated below)
   2. Add the function with the @ScalarFunction annotation and put the jar in the plugins dir
   3. Introduce a new RecordTransformer implementation, introduce a RecordTransformerType, and change the factory logic
   
   I did not do option 3 right now, because I think 1 and 2 should suffice most of the time. If we see demand for 3, we can add it.
   Will put a TODO
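   For reference, option 1 could look like this in the framework spec (column names and script are made up for illustration; `Groovy({script}, args...)` is the inline Groovy function syntax):
   ```
   "recordTransformerConfig": {
     "transformFunctionsMap": {
       "fullName": "Groovy({firstName + ' ' + lastName}, firstName, lastName)"
     }
   }
   ```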






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486677843



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Iterator;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Collects and stores GenericRows
+ */
+public interface Collector {
+
+  /**
+   * Collects the given GenericRow and stores it
+   * @param genericRow the generic row to add to the collection
+   */
+  void collect(GenericRow genericRow);

Review comment:
       I also felt that GenericRow is better. I was aiming for consistency between Mapper and Reducer. Seunghyun's point also makes sense.






[GitHub] [incubator-pinot] npawar commented on pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#issuecomment-692401724


   > Overall, I like that all the core components are interfaced out and easy to extend. I have put some comments. Some of them are questions or points that I would like to discuss.
   
   Addressed the comments. Added TODOs in code and description for those that will be handled in future PRs




[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r483325250



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/PartitionerFactory.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory for Partitioner and PartitionFilter
+ */
+public final class PartitionerFactory {
+
+  private PartitionerFactory() {
+
+  }
+
+  public enum PartitionerType {
+    NO_OP, ROW_HASH, COLUMN_VALUE, TRANSFORM_FUNCTION, TABLE_PARTITION_CONFIG
+  }
+
+  /**
+   * Construct a Partitioner using the PartitioningConfig
+   */
+  public static Partitioner getPartitioner(PartitioningConfig config) {
+
+    Partitioner partitioner = null;
+    switch (config.getPartitionerType()) {
+      case NO_OP:
+        partitioner = new NoOpPartitioner();
+        break;
+      case ROW_HASH:

Review comment:
       I understand the intention for this partitioner. My suggestion is to not use the hashcode of the record, but to simply do round robin over the configured `_numPartitions`, which guarantees the records are balanced across partitions. This is also much cheaper than the hashcode-based partitioner.
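   A minimal sketch of the suggested round-robin partitioner (not the PR's code):
   ```java
   public class RoundRobinPartitioner implements Partitioner {
     private final int _numPartitions;
     private int _nextPartition;

     public RoundRobinPartitioner(int numPartitions) {
       _numPartitions = numPartitions;
     }

     @Override
     public String getPartition(GenericRow genericRow) {
       // ignore the record contents and simply cycle through the partitions
       String partition = Integer.toString(_nextPartition);
       _nextPartition = (_nextPartition + 1) % _numPartitions;
       return partition;
     }
   }
   ```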






[GitHub] [incubator-pinot] npawar merged pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar merged pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934


   




[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r483328086



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.segment.processing.collector.Collector;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reducer phase of the SegmentProcessorFramework
+ * Reads the avro files in the input directory and creates output avro files in the reducer output directory.
+ * The avro files in the input directory are expected to contain data for only 1 partition
+ * Performs operations on that partition data as follows:
+ * - concatenation/rollup of records
+ * - split
+ * - TODO: dedup
+ */
+public class SegmentReducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentReducer.class);
+  private static final int MAX_RECORDS_TO_COLLECT = 5_000_000;
+
+  private final File _reducerInputDir;
+  private final File _reducerOutputDir;
+
+  private final String _reducerId;
+  private final Schema _pinotSchema;
+  private final org.apache.avro.Schema _avroSchema;
+  private final Collector _collector;
+  private final int _numRecordsPerPart;
+
+  public SegmentReducer(String reducerId, File reducerInputDir, SegmentReducerConfig reducerConfig,
+      File reducerOutputDir) {
+    _reducerInputDir = reducerInputDir;
+    _reducerOutputDir = reducerOutputDir;
+
+    _reducerId = reducerId;
+    _pinotSchema = reducerConfig.getPinotSchema();
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
+    _collector = CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
+    _numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
+    LOGGER.info("Initialized reducer with id: {}, input dir: {}, output dir: {}, collector: {}, numRecordsPerPart: {}",
+        _reducerId, _reducerInputDir, _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
+  }
+
+  /**
+   * Reads the avro files in the input directory.
+   * Performs configured operations and outputs to other avro file(s) in the reducer output directory.
+   */
+  public void reduce()
+      throws Exception {
+
+    int part = 0;
+    for (File inputFile : _reducerInputDir.listFiles()) {
+
+      RecordReader avroRecordReader = RecordReaderFactory
+          .getRecordReaderByClass("org.apache.pinot.plugin.inputformat.avro.AvroRecordReader", inputFile,
+              _pinotSchema.getColumnNames(), null);
+
+      while (avroRecordReader.hasNext()) {
+        GenericRow next = avroRecordReader.next();
+
+        // Aggregations
+        _collector.collect(next);
+
+        // Exceeded max records allowed to collect. Flush
+        if (_collector.size() == MAX_RECORDS_TO_COLLECT) {

Review comment:
       I think it is fine if the user configures a very low `_numRecordsPerPart` and does not get much aggregation. The problem with using a fixed large threshold is that for use cases with lots of columns, we might run out of memory, and there is no knob to tune it.
   In order to get better aggregation, we can potentially do an external sort on the files before collecting the records. Not required in the first version.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r481532460



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default will use the SUM aggregation on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getColumnNames().size() - schema.getMetricNames().size();

Review comment:
       nope, changed it.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486683505



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default, the SUM aggregation is used on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+  private Iterator<GenericRow> _iterator;
+  private GenericRowSorter _sorter;
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getPhysicalColumnNames().size() - schema.getMetricNames().size();
+    _valueSize = schema.getMetricNames().size();
+    _keyColumns = new String[_keySize];
+    _valueColumns = new String[_valueSize];
+    _valueAggregators = new ValueAggregator[_valueSize];
+    _metricFieldSpecs = new MetricFieldSpec[_valueSize];
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap = collectorConfig.getAggregatorTypeMap();
+    if (aggregatorTypeMap == null) {
+      aggregatorTypeMap = Collections.emptyMap();
+    }
+    int valIdx = 0;
+    int keyIdx = 0;
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isVirtualColumn()) {
+        String name = fieldSpec.getName();
+        if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.METRIC)) {
+          _metricFieldSpecs[valIdx] = (MetricFieldSpec) fieldSpec;
+          _valueColumns[valIdx] = name;
+          _valueAggregators[valIdx] = ValueAggregatorFactory.getValueAggregator(
+              aggregatorTypeMap.getOrDefault(name, ValueAggregatorFactory.ValueAggregatorType.SUM).toString());
+          valIdx++;
+        } else {
+          _keyColumns[keyIdx++] = name;
+        }
+      }
+    }
+
+    List<String> sortOrder = collectorConfig.getSortOrder();
+    if (sortOrder.size() > 0) {
+      _sorter = new GenericRowSorter(sortOrder, schema);
+    }
+  }
+
+  /**
+   * If a row already exists in the collection (based on dimension + time columns), roll up the metric values; else add the row
+   */
+  @Override
+  public void collect(GenericRow genericRow) {

Review comment:
       Yes, we keep the entire data of a partition on heap.
   Yes, we can certainly add off-heap or file-based processing in the future. For now, we have a knob `maxRecordsPerSegment` that can help control the number of records collected in memory.
   
   In order to sort the entire data on all dimensions + time, we'll still have to get all the data into memory, right? And then we'll have to rewrite the files onto disk. The input is a raw avro file. We do not have any dictionaries or docIds.
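
   To illustrate, a minimal sketch of the sort path, assuming a sort method on `GenericRowSorter` (its exact signature is not shown in this diff): the whole partition is materialized before sorting.

   ```
   // Hedged sketch: sorting on all dimensions + time needs the full
   // partition in memory first; reader/sorter wiring is illustrative.
   List<GenericRow> rows = new ArrayList<>();
   while (recordReader.hasNext()) {
     rows.add(recordReader.next());   // entire partition lands on heap
   }
   GenericRowSorter sorter = new GenericRowSorter(sortOrder, schema);
   sorter.sort(rows);                 // method name assumed
   ```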






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r481533201



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Iterator;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Collects and stores GenericRows
+ */
+public interface Collector {
+
+  /**
+   * Collects the given GenericRow and stores it
+   * @param genericRow the generic row to add to the collection
+   */
+  void collect(GenericRow genericRow);
+
+  /**
+   * Provides an iterator for the GenericRows in the collection
+   */
+  Iterator<GenericRow> iterator();
+
+  /**
+   * The size of the collection

Review comment:
       Clarified in the javadoc: it is the number of records.
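
   A sketch of the clarified javadoc (exact wording assumed):

   ```
   /**
    * Returns the number of GenericRows currently held in the collection
    * (a record count, not a memory footprint)
    */
   int size();
   ```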






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r483874738



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/PartitionerFactory.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory for Partitioner and PartitionFilter
+ */
+public final class PartitionerFactory {
+
+  private PartitionerFactory() {
+
+  }
+
+  public enum PartitionerType {
+    NO_OP, ROW_HASH, COLUMN_VALUE, TRANSFORM_FUNCTION, TABLE_PARTITION_CONFIG
+  }
+
+  /**
+   * Construct a Partitioner using the PartitioningConfig
+   */
+  public static Partitioner getPartitioner(PartitioningConfig config) {
+
+    Partitioner partitioner = null;
+    switch (config.getPartitionerType()) {
+      case NO_OP:
+        partitioner = new NoOpPartitioner();
+        break;
+      case ROW_HASH:

Review comment:
       got it. Done






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r478757718



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionFilter;
+import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper phase of the SegmentProcessorFramework.
+ * Reads the input segment and creates partitioned avro data files
+ * Performs:
+ * - record transformations
+ * - partitioning
+ * - partition filtering
+ */
+public class SegmentMapper {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
+  private final File _inputSegment;
+  private final File _mapperOutputDir;
+
+  private final String _mapperId;
+  private final Schema _avroSchema;
+  private final RecordTransformer _recordTransformer;
+  private final Partitioner _partitioner;
+  private final PartitionFilter _partitionFilter;
+  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+
+  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _inputSegment = inputSegment;
+    _mapperOutputDir = mapperOutputDir;
+
+    _mapperId = mapperId;
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _partitioner = PartitionerFactory.getPartitioner(mapperConfig.getPartitioningConfig());
+    _partitionFilter = PartitionerFactory.getPartitionFilter(mapperConfig.getPartitioningConfig());
+    LOGGER.info(
+        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, partitioner: {}, partitionFilter: {}",
+        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _partitioner.getClass(),
+        _partitionFilter.getClass());
+  }
+
+  /**
+   * Reads the input segment and generates partitioned avro data files into the mapper output directory
+   * Records for each partition are put into a directory of its own within the mapper output directory, identified by the partition name
+   */
+  public void map()
+      throws Exception {
+
+    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
+    GenericRow reusableRow = new GenericRow();
+    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
+
+    Set<String> selectedPartitions = new HashSet<>();
+    Set<String> rejectedPartitions = new HashSet<>();
+
+    while (segmentRecordReader.hasNext()) {
+      reusableRow = segmentRecordReader.next(reusableRow);
+
+      // Record transformation
+      reusableRow = _recordTransformer.transformRecord(reusableRow);

Review comment:
       How do we handle `record filtering` that happens in `transformRecord`?
   
   Or are we modeling filtering as a separate step? We should have some convention on record filtering.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionFilter;
+import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper phase of the SegmentProcessorFramework.
+ * Reads the input segment and creates partitioned avro data files
+ * Performs:
+ * - record transformations
+ * - partitioning
+ * - partition filtering
+ */
+public class SegmentMapper {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
+  private final File _inputSegment;
+  private final File _mapperOutputDir;
+
+  private final String _mapperId;
+  private final Schema _avroSchema;
+  private final RecordTransformer _recordTransformer;
+  private final Partitioner _partitioner;
+  private final PartitionFilter _partitionFilter;
+  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+
+  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _inputSegment = inputSegment;
+    _mapperOutputDir = mapperOutputDir;
+
+    _mapperId = mapperId;
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _partitioner = PartitionerFactory.getPartitioner(mapperConfig.getPartitioningConfig());
+    _partitionFilter = PartitionerFactory.getPartitionFilter(mapperConfig.getPartitioningConfig());
+    LOGGER.info(
+        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, partitioner: {}, partitionFilter: {}",
+        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _partitioner.getClass(),
+        _partitionFilter.getClass());
+  }
+
+  /**
+   * Reads the input segment and generates partitioned avro data files into the mapper output directory
+   * Records for each partition are put into a directory of its own within the mapper output directory, identified by the partition name
+   */
+  public void map()
+      throws Exception {
+
+    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
+    GenericRow reusableRow = new GenericRow();
+    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);

Review comment:
       Can we interface out the underlying data file format? We may need to plug in another data format for a performance boost (for example, @Jackie-Jiang used an mmapped file with a custom format for the star-tree generator, which does similar work: sort, aggregation).

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/transformer/TransformFunctionRecordTransformer.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.transformer;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.data.function.FunctionEvaluator;
+import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * RecordTransformer which executes transform functions to transform columns of record
+ * Does not follow any particular order, and hence cannot support transformations where strict order of execution is needed
+ */
+public class TransformFunctionRecordTransformer implements RecordTransformer {
+
+  private final Map<String, FunctionEvaluator> _functionEvaluatorMap = new HashMap<>();
+
+  public TransformFunctionRecordTransformer(Map<String, String> transformFunctionMap) {

Review comment:
       What if I want to run a custom transformation function?
   
   The spec doesn't have a way to specify a custom transformer?
   
   ```
     "recordTransformerConfig": {
       "transformFunctionsMap": {
         "epochMillis": "round(epochMillis, 86400000)" // round to nearest day
       }
     },
   
   ```
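
   For illustration, a custom transformer would implement the same `RecordTransformer` interface the factory builds against; whether the spec should also accept a custom class name is the open question here. The class and column below are hypothetical:

   ```
   // Hypothetical custom RecordTransformer; "campaign" is an illustrative column
   public class UpperCaseTransformer implements RecordTransformer {
     @Override
     public GenericRow transformRecord(GenericRow row) {
       Object value = row.getValue("campaign");
       if (value != null) {
         row.putValue("campaign", value.toString().toUpperCase());
       }
       return row;
     }
   }
   ```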






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r488325887



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Helper util methods for SegmentProcessorFramework
+ */
+public final class SegmentProcessorUtils {
+
+  private SegmentProcessorUtils() {
+  }
+
+  /**
+   * Convert a GenericRow to an avro GenericRecord
+   */
+  public static GenericData.Record convertGenericRowToAvroRecord(GenericRow genericRow,

Review comment:
       The avro plugins are more for the extractors and decoders, right? This is a util method, and pinot-core cannot be made to depend on pinot-avro for this.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r488812122



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionFilter;
+import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper phase of the SegmentProcessorFramework.
+ * Reads the input segment and creates partitioned avro data files
+ * Performs:
+ * - record transformations
+ * - partitioning
+ * - partition filtering
+ */
+public class SegmentMapper {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
+  private final File _inputSegment;
+  private final File _mapperOutputDir;
+
+  private final String _mapperId;
+  private final Schema _avroSchema;
+  private final RecordTransformer _recordTransformer;
+  private final Partitioner _partitioner;
+  private final PartitionFilter _partitionFilter;
+  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+
+  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _inputSegment = inputSegment;
+    _mapperOutputDir = mapperOutputDir;
+
+    _mapperId = mapperId;
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _partitioner = PartitionerFactory.getPartitioner(mapperConfig.getPartitioningConfig());
+    _partitionFilter = PartitionerFactory.getPartitionFilter(mapperConfig.getPartitioningConfig());
+    LOGGER.info(
+        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, partitioner: {}, partitionFilter: {}",
+        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _partitioner.getClass(),
+        _partitionFilter.getClass());
+  }
+
+  /**
+   * Reads the input segment and generates partitioned avro data files into the mapper output directory
+   * Records for each partition are put into a directory of its own within the mapper output directory, identified by the partition name
+   */
+  public void map()
+      throws Exception {
+
+    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
+    GenericRow reusableRow = new GenericRow();
+    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);

Review comment:
       Added TODO






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r481519977



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.segment.processing.collector.Collector;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reducer phase of the SegmentProcessorFramework
+ * Reads the avro files in the input directory and creates output avro files in the reducer output directory.
+ * The avro files in the input directory are expected to contain data for only 1 partition
+ * Performs operations on that partition data as follows:
+ * - concatenation/rollup of records
+ * - split
+ * - TODO: dedup
+ */
+public class SegmentReducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentReducer.class);
+  private static final int MAX_RECORDS_TO_COLLECT = 5_000_000;
+
+  private final File _reducerInputDir;
+  private final File _reducerOutputDir;
+
+  private final String _reducerId;
+  private final Schema _pinotSchema;
+  private final org.apache.avro.Schema _avroSchema;
+  private final Collector _collector;
+  private final int _numRecordsPerPart;
+
+  public SegmentReducer(String reducerId, File reducerInputDir, SegmentReducerConfig reducerConfig,
+      File reducerOutputDir) {
+    _reducerInputDir = reducerInputDir;
+    _reducerOutputDir = reducerOutputDir;
+
+    _reducerId = reducerId;
+    _pinotSchema = reducerConfig.getPinotSchema();
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
+    _collector = CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
+    _numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
+    LOGGER.info("Initialized reducer with id: {}, input dir: {}, output dir: {}, collector: {}, numRecordsPerPart: {}",
+        _reducerId, _reducerInputDir, _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
+  }
+
+  /**
+   * Reads the avro files in the input directory.
+   * Performs configured operations and outputs to other avro file(s) in the reducer output directory.
+   */
+  public void reduce()
+      throws Exception {
+
+    int part = 0;
+    for (File inputFile : _reducerInputDir.listFiles()) {
+
+      RecordReader avroRecordReader = RecordReaderFactory
+          .getRecordReaderByClass("org.apache.pinot.plugin.inputformat.avro.AvroRecordReader", inputFile,
+              _pinotSchema.getColumnNames(), null);
+
+      while (avroRecordReader.hasNext()) {
+        GenericRow next = avroRecordReader.next();
+
+        // Aggregations
+        _collector.collect(next);
+
+        // Exceeded max records allowed to collect. Flush
+        if (_collector.size() == MAX_RECORDS_TO_COLLECT) {

Review comment:
       `Should we use _numRecordsPerPart here so that once the collector collects enough records, we flush them` - I had initially done it this way, but I noticed while testing that if a user sets a very low numRecordsPerPart, we will aggregate less and flush very frequently. Having a bigger MAX_RECORDS_TO_COLLECT allows for more aggregation.
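
   To make the trade-off concrete, a sketch of the collect loop with both knobs (`flushRecords` is a hypothetical helper; the real collect/size check is in the diff above):

   ```
   // Aggregate until the 5M safety cap forces a flush; _numRecordsPerPart only
   // bounds the size of each output part, not how long rollup accumulates.
   while (avroRecordReader.hasNext()) {
     _collector.collect(avroRecordReader.next());
     if (_collector.size() == MAX_RECORDS_TO_COLLECT) {
       flushRecords(_collector);   // hypothetical: writes parts of _numRecordsPerPart rows
       _collector.reset();
     }
   }
   ```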






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r479439110



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionFilter;
+import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
+import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformer;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mapper phase of the SegmentProcessorFramework.
+ * Reads the input segment and creates partitioned avro data files
+ * Performs:
+ * - record transformations
+ * - partitioning
+ * - partition filtering
+ */
+public class SegmentMapper {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
+  private final File _inputSegment;
+  private final File _mapperOutputDir;
+
+  private final String _mapperId;
+  private final Schema _avroSchema;
+  private final RecordTransformer _recordTransformer;
+  private final Partitioner _partitioner;
+  private final PartitionFilter _partitionFilter;
+  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+
+  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _inputSegment = inputSegment;
+    _mapperOutputDir = mapperOutputDir;
+
+    _mapperId = mapperId;
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _partitioner = PartitionerFactory.getPartitioner(mapperConfig.getPartitioningConfig());
+    _partitionFilter = PartitionerFactory.getPartitionFilter(mapperConfig.getPartitioningConfig());
+    LOGGER.info(
+        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, partitioner: {}, partitionFilter: {}",
+        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _partitioner.getClass(),
+        _partitionFilter.getClass());
+  }
+
+  /**
+   * Reads the input segment and generates partitioned avro data files into the mapper output directory
+   * Records for each partition are put into a directory of its own within the mapper output directory, identified by the partition name
+   */
+  public void map()
+      throws Exception {
+
+    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
+    GenericRow reusableRow = new GenericRow();
+    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
+
+    Set<String> selectedPartitions = new HashSet<>();
+    Set<String> rejectedPartitions = new HashSet<>();
+
+    while (segmentRecordReader.hasNext()) {
+      reusableRow = segmentRecordReader.next(reusableRow);
+
+      // Record transformation
+      reusableRow = _recordTransformer.transformRecord(reusableRow);

Review comment:
       Currently no record filtering happens in transformRecord. If we want to do record filtering in the Mapper, I think we can introduce a new phase; the flow would then become transform -> filter -> partition -> partitionFilter -> flush.
   
   My goal in this phase was mainly to include everything that the current code (of segment merge/rollup) does, and also everything that the realtime-to-offline project needs. Filtering wasn't required for either, so I skipped it.
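
   A sketch of that proposed ordering in the mapper loop; `RecordFilter` and the method names on it and on `Partitioner` are assumptions, since the filter phase is not part of this diff:

   ```
   while (segmentRecordReader.hasNext()) {
     reusableRow = segmentRecordReader.next(reusableRow);
     reusableRow = _recordTransformer.transformRecord(reusableRow); // transform
     if (_recordFilter.filter(reusableRow)) {                       // filter (hypothetical)
       continue;                                                    // drop this record
     }
     String partition = _partitioner.getPartition(reusableRow);     // partition (name assumed)
     // partitionFilter, then flush to the partition's avro writer
   }
   ```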






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r481506439



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/PartitionFilter.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+/**
+ * Used for filtering partitions in the mapper
+ */
+public interface PartitionFilter {

Review comment:
       True. Removed partition filtering and added record filtering. All the corresponding changes in FunctionEvaluator have been reverted.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486676878



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Helper util methods for SegmentProcessorFramework
+ */
+public final class SegmentProcessorUtils {
+
+  private SegmentProcessorUtils() {
+  }
+
+  /**
+   * Convert a GenericRow to an avro GenericRecord
+   */
+  public static GenericData.Record convertGenericRowToAvroRecord(GenericRow genericRow,
+      GenericData.Record reusableRecord) {
+    for (String field : genericRow.getFieldToValueMap().keySet()) {

Review comment:
       I didn't follow. Aren't all nullValueFields already expected to be in fieldToValueMap?






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r481309965



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessorUtils.java
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Helper util methods for SegmentProcessorFramework
+ */
+public final class SegmentProcessorUtils {
+
+  private SegmentProcessorUtils() {
+  }
+
+  /**
+   * Convert a GenericRow to an avro GenericRecord
+   */
+  public static GenericData.Record convertGenericRowToAvroRecord(GenericRow genericRow,

Review comment:
       Should these go into the avro plugins? Why introduce a dependency on avro here?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default, the SUM aggregation is used on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getColumnNames().size() - schema.getMetricNames().size();
+    _valueSize = schema.getMetricNames().size();
+    _keyColumns = new String[_keySize];
+    _valueColumns = new String[_valueSize];
+    _valueAggregators = new ValueAggregator[_valueSize];
+    _metricFieldSpecs = new MetricFieldSpec[_valueSize];
+
+    Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorTypeMap = collectorConfig.getAggregatorTypeMap();
+    if (aggregatorTypeMap == null) {
+      aggregatorTypeMap = Collections.emptyMap();
+    }
+    int valIdx = 0;
+    int keyIdx = 0;
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isVirtualColumn()) {
+        String name = fieldSpec.getName();
+        if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.METRIC)) {
+          _metricFieldSpecs[valIdx] = (MetricFieldSpec) fieldSpec;
+          _valueColumns[valIdx] = name;
+          _valueAggregators[valIdx] = ValueAggregatorFactory.getValueAggregator(
+              aggregatorTypeMap.getOrDefault(name, ValueAggregatorFactory.ValueAggregatorType.SUM).toString());
+          valIdx++;
+        } else {
+          _keyColumns[keyIdx++] = name;
+        }
+      }
+    }
+  }
+
+  /**
+   * If a row already exists in the collection (based on dimension + time columns), roll up the metric values; else add the row
+   */
+  @Override
+  public void collect(GenericRow genericRow) {
+    Object[] key = new Object[_keySize];
+    for (int i = 0; i < _keySize; i++) {
+      key[i] = genericRow.getValue(_keyColumns[i]);
+    }
+    Record keyRecord = new Record(key);
+    GenericRow prev = _collection.get(keyRecord);
+    if (prev == null) {
+      _collection.put(keyRecord, genericRow);
+    } else {
+      for (int i = 0; i < _valueSize; i++) {
+        String valueColumn = _valueColumns[i];
+        Object aggregate = _valueAggregators[i]
+            .aggregate(prev.getValue(valueColumn), genericRow.getValue(valueColumn), _metricFieldSpecs[i]);
+        prev.putValue(valueColumn, aggregate);
+      }
+    }
+  }
+
+  @Override
+  public Iterator<GenericRow> iterator() {
+    return _collection.values().iterator();
+  }
+
+  @Override
+  public int size() {
+    return _collection.size();
+  }
+
+  @Override
+  public void reset() {
+    _collection.clear();
+  }
+
+  /**
+   * A record representation for the keys of the record
+   * Note that the dimensions can have multi-value columns, and hence the equals and hashCode need deep array operations
+   */
+  private static class Record {
+    private final Object[] _values;

Review comment:
       Are these the key parts? If so, can we name the member `_keyParts`? `_values` gets confusing.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/RollupCollector.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A Collector that rolls up the incoming records on unique dimensions + time columns, based on provided aggregation types for metrics.
+ * By default, the SUM aggregation is used on metrics.
+ */
+public class RollupCollector implements Collector {
+
+  private final Map<Record, GenericRow> _collection = new HashMap<>();
+
+  private final int _keySize;
+  private final int _valueSize;
+  private final String[] _keyColumns;
+  private final String[] _valueColumns;
+  private final ValueAggregator[] _valueAggregators;
+  private final MetricFieldSpec[] _metricFieldSpecs;
+
+  public RollupCollector(CollectorConfig collectorConfig, Schema schema) {
+    _keySize = schema.getColumnNames().size() - schema.getMetricNames().size();

Review comment:
       So, we include virtual columns here?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/CollectorFactory.java
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Factory for constructing a Collector from CollectorConfig
+ */
+public final class CollectorFactory {
+
+  private CollectorFactory() {
+
+  }
+
+  public enum CollectorType {
+    ROLLUP, CONCAT

Review comment:
       It is useful to add some comments on each of these, explaining what the collector does (or what it does differently from the others).
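
   A sketch of what such comments could say, based on how the two collectors are described elsewhere in this PR (wording assumed):

   ```
   public enum CollectorType {
     // Aggregates metric values of records that share the same
     // dimension + time columns (SUM by default)
     ROLLUP,
     // Collects records as-is, without any aggregation
     CONCAT
   }
   ```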

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/PartitionerFactory.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.partitioner;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Factory for Partitioner and PartitionFilter
+ */
+public final class PartitionerFactory {
+
+  private PartitionerFactory() {
+
+  }
+
+  public enum PartitionerType {
+    NO_OP, ROW_HASH, COLUMN_VALUE, TRANSFORM_FUNCTION, TABLE_PARTITION_CONFIG
+  }
+
+  /**
+   * Construct a Partitioner using the PartitioningConfig
+   */
+  public static Partitioner getPartitioner(PartitioningConfig config) {
+
+    Partitioner partitioner = null;
+    switch (config.getPartitionerType()) {
+      case NO_OP:
+        partitioner = new NoOpPartitioner();
+        break;
+      case ROW_HASH:
+        Preconditions
+            .checkState(config.getNumPartitions() > 0, "Must provide numPartitions > 0 for ROW_HASH partitioner");
+        partitioner = new RowHashPartitioner(config.getNumPartitions());
+        break;
+      case COLUMN_VALUE:
+        Preconditions.checkState(config.getColumnName() != null, "Must provide columnName for COLUMN_VALUE partitioner");
+        partitioner = new ColumnValuePartitioner(config.getColumnName());
+        break;
+      case TRANSFORM_FUNCTION:
+        Preconditions.checkState(config.getTransformFunction() != null,
+            "Must provide transformFunction for TRANSFORM_FUNCTION partitioner");
+        partitioner = new TransformFunctionPartitioner(config.getTransformFunction());
+        break;
+      case TABLE_PARTITION_CONFIG:

Review comment:
       Since these are all derived segments (merging m segments into n), shouldn't we be using the table's partitioner all the time?
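
    For reference, a minimal usage sketch of the factory (the Builder methods and the getPartition signature are assumptions based on this diff, not verified against the PR):

   ```java
   // Hypothetical caller: partition rows by the value of the "memberId" column.
   // PartitioningConfig.Builder and Partitioner.getPartition(GenericRow) are
   // assumed shapes, inferred from the checks in getPartitioner above.
   PartitioningConfig config = new PartitioningConfig.Builder()
       .setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
       .setColumnName("memberId")
       .build();
   Partitioner partitioner = PartitionerFactory.getPartitioner(config);
   // Rows with equal memberId values map to the same partition string.
   String partition = partitioner.getPartition(genericRow);
   ```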

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/Collector.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.collector;
+
+import java.util.Iterator;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Collects and stores GenericRows
+ */
+public interface Collector {
+
+  /**
+   * Collects the given GenericRow and stores it
+   * @param genericRow the generic row to add to the collection
+   */
+  void collect(GenericRow genericRow);
+
+  /**
+   * Provides an iterator for the GenericRows in the collection
+   */
+  Iterator<GenericRow> iterator();
+
+  /**
+   * The size of the collection

Review comment:
       What does size mean here? The number of rows, or the memory footprint?
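
    For example, the Javadoc could spell it out (suggested wording only, not the PR's actual Javadoc):

   ```java
   /**
    * Returns the number of rows currently held by the collector. This is a
    * row count, not an estimate of the memory footprint.
    */
   int size();
   ```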

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentReducer.java
##########
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.segment.processing.collector.Collector;
+import org.apache.pinot.core.segment.processing.collector.CollectorFactory;
+import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reducer phase of the SegmentProcessorFramework
+ * Reads the avro files in the input directory and creates output avro files in the reducer output directory.
+ * The avro files in the input directory are expected to contain data for only 1 partition
+ * Performs operations on that partition data as follows:
+ * - concatenation/rollup of records
+ * - split
+ * - TODO: dedup
+ */
+public class SegmentReducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentReducer.class);
+  private static final int MAX_RECORDS_TO_COLLECT = 5_000_000;
+
+  private final File _reducerInputDir;
+  private final File _reducerOutputDir;
+
+  private final String _reducerId;
+  private final Schema _pinotSchema;
+  private final org.apache.avro.Schema _avroSchema;
+  private final Collector _collector;
+  private final int _numRecordsPerPart;
+
+  public SegmentReducer(String reducerId, File reducerInputDir, SegmentReducerConfig reducerConfig,
+      File reducerOutputDir) {
+    _reducerInputDir = reducerInputDir;
+    _reducerOutputDir = reducerOutputDir;
+
+    _reducerId = reducerId;
+    _pinotSchema = reducerConfig.getPinotSchema();
+    _avroSchema = SegmentProcessorUtils.convertPinotSchemaToAvroSchema(_pinotSchema);
+    _collector = CollectorFactory.getCollector(reducerConfig.getCollectorConfig(), _pinotSchema);
+    _numRecordsPerPart = reducerConfig.getNumRecordsPerPart();
+    LOGGER.info("Initialized reducer with id: {}, input dir: {}, output dir: {}, collector: {}, numRecordsPerPart: {}",
+        _reducerId, _reducerInputDir, _reducerOutputDir, _collector.getClass(), _numRecordsPerPart);
+  }
+
+  /**
+   * Reads the avro files in the input directory.
+   * Performs configured operations and outputs to other avro file(s) in the reducer output directory.
+   */
+  public void reduce()
+      throws Exception {
+
+    int part = 0;
+    for (File inputFile : _reducerInputDir.listFiles()) {
+
+      RecordReader avroRecordReader = RecordReaderFactory
+          .getRecordReaderByClass("org.apache.pinot.plugin.inputformat.avro.AvroRecordReader", inputFile,
+              _pinotSchema.getColumnNames(), null);
+
+      while (avroRecordReader.hasNext()) {
+        GenericRow next = avroRecordReader.next();
+
+        // Aggregations
+        _collector.collect(next);
+
+        // Exceeded max records allowed to collect. Flush
+        if (_collector.size() == MAX_RECORDS_TO_COLLECT) {

Review comment:
       Instead of checking the number of records, can we have a method on the collector that says whether it is full? The criterion could then vary depending on the collector.
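
    A sketch of that suggestion, assuming the interface gains an isFull() method (names here are illustrative, not from the PR):

   ```java
   public interface Collector {
     void collect(GenericRow genericRow);
     Iterator<GenericRow> iterator();
     int size();

     /**
      * Whether the collector should be flushed. Each implementation can pick
      * its own criterion, e.g. a fixed row cap for CONCAT vs. an estimated
      * heap footprint for ROLLUP.
      */
     boolean isFull();
   }
   ```

    The reducer loop would then check `_collector.isFull()` instead of comparing `_collector.size()` against the fixed MAX_RECORDS_TO_COLLECT before flushing.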





[GitHub] [incubator-pinot] snleee commented on a change in pull request #5934: Segment processing framework

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5934:
URL: https://github.com/apache/incubator-pinot/pull/5934#discussion_r486009106



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
+import org.apache.pinot.core.segment.processing.filter.RecordFilterConfig;
+import org.apache.pinot.core.segment.processing.partitioner.PartitioningConfig;
+import org.apache.pinot.core.segment.processing.transformer.RecordTransformerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+
+/**
+ * Config for configuring the phases of {@link SegmentProcessorFramework}
+ */
+public class SegmentProcessorConfig {

Review comment:
       One requirement for `SegmentMergeRollup` is the ability to set a custom segment name (or at least a prefix plus a sequence id, e.g. `merged_XXX_0...M`). Where do you think is the best place to configure those?
   
   Your segment framework also faces the same issue with the sequence id, so the framework should at least handle the sequence id implicitly. We can probably also add a config for the prefix.
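
    For instance, the framework could own the sequence id and expose only a prefix in the config; a rough sketch (this helper class and the prefix config are hypothetical, not part of the PR):

   ```java
   // Hypothetical helper: the framework assigns sequence ids itself, and the
   // caller only configures a prefix such as "merged_XXX".
   public class SequencedSegmentNameGenerator {
     private final String _prefix;
     private int _sequenceId;
   
     public SequencedSegmentNameGenerator(String prefix) {
       _prefix = prefix;
     }
   
     /** Returns "merged_XXX_0", "merged_XXX_1", ... for prefix "merged_XXX". */
     public synchronized String next() {
       return _prefix + "_" + _sequenceId++;
     }
   }
   ```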



