You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/06/03 21:06:01 UTC

[GitHub] [incubator-pinot] npawar commented on a change in pull request #7013: Produce GenericRow file in segment processing mapper

npawar commented on a change in pull request #7013:
URL: https://github.com/apache/incubator-pinot/pull/7013#discussion_r645103605



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class SegmentProcessingUtils {
+  private SegmentProcessingUtils() {
+  }
+
+  /**
+   * Returns the field specs with the names sorted in alphabetical order.
+   */
+  public static List<FieldSpec> getFieldSpecs(Schema schema) {

Review comment:
       nit: maybe rename this to indicate it will skip virtual columns? or add it to javadoc

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class SegmentProcessingUtils {

Review comment:
       nit: add final here for good practice

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/utils/SegmentProcessingUtils.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+
+
+public class SegmentProcessingUtils {
+  private SegmentProcessingUtils() {
+  }
+
+  /**
+   * Returns the field specs with the names sorted in alphabetical order.
+   */
+  public static List<FieldSpec> getFieldSpecs(Schema schema) {
+    List<FieldSpec> fieldSpecs = new ArrayList<>();
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isVirtualColumn()) {
+        fieldSpecs.add(fieldSpec);
+      }
+    }
+    fieldSpecs.sort(Comparator.comparing(FieldSpec::getName));
+    return fieldSpecs;
+  }
+
+  /**
+   * Returns the field specs with sorted column in the front, followed by other columns sorted in alphabetical order.
+   */
+  public static List<FieldSpec> getFieldSpecs(Schema schema, List<String> sortOrder) {

Review comment:
       same. can we clarify that this will return the virtual columns if they are in sort order, and skip them otherwise? Also, is that intentional?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -55,98 +59,113 @@
  * - partitioning
  */
 public class SegmentMapper {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
-  private final File _inputSegment;
+
+  private final List<RecordReader> _recordReaders;
   private final File _mapperOutputDir;
 
-  private final String _mapperId;
-  private final Schema _avroSchema;
-  private final RecordTransformer _recordTransformer;
+  private final List<FieldSpec> _fieldSpecs;
+  private final boolean _includeNullFields;
+
+  // TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case
+  //       _recordTransformer changes the data type.
+  private final CompositeTransformer _defaultRecordTransformer;
   private final RecordFilter _recordFilter;
-  private final int _numPartitioners;
+  private final RecordTransformer _recordTransformer;
+  private final DataTypeTransformer _dataTypeTransformer;
+
   private final List<Partitioner> _partitioners = new ArrayList<>();
-  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+  private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>();
 
-  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
-    _inputSegment = inputSegment;
+  public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) {

Review comment:
       can you change the javadocs to reflect that this doesn't take a file anymore, instead multiple record readers?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -55,98 +59,113 @@
  * - partitioning
  */
 public class SegmentMapper {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
-  private final File _inputSegment;
+
+  private final List<RecordReader> _recordReaders;
   private final File _mapperOutputDir;
 
-  private final String _mapperId;
-  private final Schema _avroSchema;
-  private final RecordTransformer _recordTransformer;
+  private final List<FieldSpec> _fieldSpecs;
+  private final boolean _includeNullFields;
+
+  // TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case
+  //       _recordTransformer changes the data type.
+  private final CompositeTransformer _defaultRecordTransformer;
   private final RecordFilter _recordFilter;
-  private final int _numPartitioners;
+  private final RecordTransformer _recordTransformer;
+  private final DataTypeTransformer _dataTypeTransformer;
+
   private final List<Partitioner> _partitioners = new ArrayList<>();
-  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+  private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>();
 
-  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
-    _inputSegment = inputSegment;
+  public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
+    _recordReaders = recordReaders;
     _mapperOutputDir = mapperOutputDir;
 
-    _mapperId = mapperId;
-    _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
+    TableConfig tableConfig = mapperConfig.getTableConfig();
+    Schema schema = mapperConfig.getSchema();
+    List<String> sortOrder = tableConfig.getIndexingConfig().getSortedColumn();
+    if (CollectionUtils.isNotEmpty(sortOrder)) {
+      _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
+    } else {
+      _fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
+    }
+    _includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
+    _defaultRecordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
     _recordFilter = RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
     _recordTransformer = RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
+    _dataTypeTransformer = new DataTypeTransformer(schema);
     for (PartitionerConfig partitionerConfig : mapperConfig.getPartitionerConfigs()) {
       _partitioners.add(PartitionerFactory.getPartitioner(partitionerConfig));
     }
-    _numPartitioners = _partitioners.size();
     LOGGER.info(
-        "Initialized mapper with id: {}, input segment: {}, output dir: {}, recordTransformer: {}, recordFilter: {}, partitioners: {}",
-        _mapperId, _inputSegment, _mapperOutputDir, _recordTransformer.getClass(), _recordFilter.getClass(),
+        "Initialized mapper with {} record readers, output dir: {}, recordTransformer: {}, recordFilter: {}, partitioners: {}",
+        _recordReaders.size(), _mapperOutputDir, _recordTransformer.getClass(), _recordFilter.getClass(),
         _partitioners.stream().map(p -> p.getClass().toString()).collect(Collectors.joining(",")));
   }
 
   /**
    * Reads the input segment and generates partitioned avro data files into the mapper output directory
    * Records for each partition are put into a directory of its own withing the mapper output directory, identified by the partition name
    */
-  public void map()
+  public Map<String, GenericRowFileManager> map()
       throws Exception {
-
-    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_inputSegment);
-    GenericRow reusableRow = new GenericRow();
-    GenericData.Record reusableRecord = new GenericData.Record(_avroSchema);
-    String[] partitions = new String[_numPartitioners];
-
-    while (segmentRecordReader.hasNext()) {
-      reusableRow = segmentRecordReader.next(reusableRow);
-
-      // Record filtering
-      if (_recordFilter.filter(reusableRow)) {
-        continue;
-      }
-
-      // Record transformation
-      reusableRow = _recordTransformer.transformRecord(reusableRow);
-
-      // Partitioning
-      int p = 0;
-      for (Partitioner partitioner : _partitioners) {
-        partitions[p++] = partitioner.getPartition(reusableRow);
-      }
-      String partition = StringUtil.join("_", partitions);
-
-      // Create writer for the partition, if not exists
-      DataFileWriter<GenericData.Record> recordWriter = _partitionToDataFileWriterMap.get(partition);
-      if (recordWriter == null) {
-        File partDir = new File(_mapperOutputDir, partition);
-        if (!partDir.exists()) {
-          Files.createDirectory(Paths.get(partDir.getAbsolutePath()));
+    GenericRow reuse = new GenericRow();
+    for (RecordReader recordReader : _recordReaders) {
+      while (recordReader.hasNext()) {
+        reuse = recordReader.next(reuse);
+
+        // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it
+
+        if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+          //noinspection unchecked
+          for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+            GenericRow transformedRow = _defaultRecordTransformer.transform(row);
+            if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow) && !_recordFilter
+                .filter(transformedRow)) {
+              writeRecord(transformedRow);
+            }
+          }
+        } else {
+          GenericRow transformedRow = _defaultRecordTransformer.transform(reuse);
+          if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow) && !_recordFilter
+              .filter(transformedRow)) {
+            writeRecord(transformedRow);
+          }
         }
-        recordWriter = new DataFileWriter<>(new GenericDatumWriter<>(_avroSchema));
-        recordWriter.create(_avroSchema, new File(partDir, createMapperOutputFileName(_mapperId)));
-        _partitionToDataFileWriterMap.put(partition, recordWriter);
-      }
 
-      // Write record to avro file for its partition
-      SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(reusableRow, reusableRecord);
-      recordWriter.append(reusableRecord);
+        reuse.clear();
+      }
+    }
 
-      reusableRow.clear();
+    for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
+      fileManager.closeFileWriter();
     }
+
+    return _partitionToFileManagerMap;
   }
 
-  /**
-   * Cleanup the mapper state
-   */
-  public void cleanup()
+  private void writeRecord(GenericRow row)
       throws IOException {
-    for (DataFileWriter<GenericData.Record> recordDataFileWriter : _partitionToDataFileWriterMap.values()) {
-      recordDataFileWriter.close();
+    // Record transformation
+    row = _dataTypeTransformer.transform(_recordTransformer.transformRecord(row));
+
+    // Partitioning
+    int numPartitioners = _partitioners.size();
+    String[] partitions = new String[numPartitioners];

Review comment:
       does it make sense to move this `new String[]` out of write, so that it can be shared across all record writes?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/collector/ConcatCollector.java
##########
@@ -40,60 +41,39 @@
  * A Collector implementation for collecting and concatenating all incoming rows.
  */
 public class ConcatCollector implements Collector {
-  private static final String RECORD_OFFSET_FILE_NAME = "record.offset";
-  private static final String RECORD_DATA_FILE_NAME = "record.data";
-
-  private final List<FieldSpec> _fieldSpecs = new ArrayList<>();
   private final int _numSortColumns;
   private final SortOrderComparator _sortOrderComparator;
   private final File _workingDir;
-  private final File _recordOffsetFile;
-  private final File _recordDataFile;
+  private final GenericRowFileManager _recordFileManager;
 
   private GenericRowFileWriter _recordFileWriter;
   private GenericRowFileReader _recordFileReader;
   private int _numDocs;
 
   public ConcatCollector(CollectorConfig collectorConfig, Schema schema) {
     List<String> sortOrder = collectorConfig.getSortOrder();
+    List<FieldSpec> fieldSpecs;
     if (CollectionUtils.isNotEmpty(sortOrder)) {
+      fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema, sortOrder);
       _numSortColumns = sortOrder.size();
-      DataType[] sortColumnStoredTypes = new DataType[_numSortColumns];
-      for (int i = 0; i < _numSortColumns; i++) {
-        String sortColumn = sortOrder.get(i);
-        FieldSpec fieldSpec = schema.getFieldSpecFor(sortColumn);
-        Preconditions.checkArgument(fieldSpec != null, "Failed to find sort column: %s", sortColumn);
-        Preconditions.checkArgument(fieldSpec.isSingleValueField(), "Cannot sort on MV column: %s", sortColumn);
-        sortColumnStoredTypes[i] = fieldSpec.getDataType().getStoredType();
-        _fieldSpecs.add(fieldSpec);
-      }
-      _sortOrderComparator = new SortOrderComparator(_numSortColumns, sortColumnStoredTypes);
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        if (!fieldSpec.isVirtualColumn() && !sortOrder.contains(fieldSpec.getName())) {
-          _fieldSpecs.add(fieldSpec);
-        }
-      }
+      _sortOrderComparator = SegmentProcessingUtils.getSortOrderComparator(fieldSpecs, _numSortColumns);
     } else {
+      fieldSpecs = SegmentProcessingUtils.getFieldSpecs(schema);
       _numSortColumns = 0;
       _sortOrderComparator = null;
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        if (!fieldSpec.isVirtualColumn()) {
-          _fieldSpecs.add(fieldSpec);
-        }
-      }
     }
 
     _workingDir =
         new File(FileUtils.getTempDirectory(), String.format("concat_collector_%d", System.currentTimeMillis()));
     Preconditions.checkState(_workingDir.mkdirs(), "Failed to create dir: %s for %s with config: %s",
         _workingDir.getAbsolutePath(), ConcatCollector.class.getSimpleName(), collectorConfig);
-    _recordOffsetFile = new File(_workingDir, RECORD_OFFSET_FILE_NAME);
-    _recordDataFile = new File(_workingDir, RECORD_DATA_FILE_NAME);
 
+    // TODO: Pass 'includeNullFields' from the config

Review comment:
       not related to this PR, but why do we need a config for this? shouldn't this always be true?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
##########
@@ -55,98 +59,113 @@
  * - partitioning
  */
 public class SegmentMapper {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
-  private final File _inputSegment;
+
+  private final List<RecordReader> _recordReaders;
   private final File _mapperOutputDir;
 
-  private final String _mapperId;
-  private final Schema _avroSchema;
-  private final RecordTransformer _recordTransformer;
+  private final List<FieldSpec> _fieldSpecs;
+  private final boolean _includeNullFields;
+
+  // TODO: Merge the following transformers into one. Currently we need an extra DataTypeTransformer in the end in case
+  //       _recordTransformer changes the data type.
+  private final CompositeTransformer _defaultRecordTransformer;
   private final RecordFilter _recordFilter;
-  private final int _numPartitioners;
+  private final RecordTransformer _recordTransformer;
+  private final DataTypeTransformer _dataTypeTransformer;
+
   private final List<Partitioner> _partitioners = new ArrayList<>();
-  private final Map<String, DataFileWriter<GenericData.Record>> _partitionToDataFileWriterMap = new HashMap<>();
+  private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new HashMap<>();
 
-  public SegmentMapper(String mapperId, File inputSegment, SegmentMapperConfig mapperConfig, File mapperOutputDir) {
-    _inputSegment = inputSegment;
+  public SegmentMapper(List<RecordReader> recordReaders, SegmentMapperConfig mapperConfig, File mapperOutputDir) {

Review comment:
       why changed this to take multiple record readers, instead of creating multiple mappers with 1 record reader each? Initially the idea behind having 1 file per mapper was that we can evetually run the mappers in parallel.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org