You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/06/14 03:39:31 UTC

[pinot] branch master updated: Making segmentMapper do the init and cleanup of RecordReader (#10874)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 449c95c6b9 Making segmentMapper do the init and cleanup of RecordReader (#10874)
449c95c6b9 is described below

commit 449c95c6b93499417a75574607d160231158e2e3
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Tue Jun 13 20:39:25 2023 -0700

    Making segmentMapper do the init and cleanup of RecordReader (#10874)
---
 .../framework/SegmentProcessorFramework.java       | 34 ++++++++--
 .../segment/processing/mapper/SegmentMapper.java   | 75 ++++++++++++++--------
 .../processing/framework/SegmentMapperTest.java    |  4 +-
 .../framework/SegmentProcessorFrameworkTest.java   | 37 +++++++++++
 .../spi/data/readers/RecordReaderFileConfig.java   | 57 ++++++++++++++++
 5 files changed, 172 insertions(+), 35 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index d64f35f515..f1757c1e4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -41,6 +41,7 @@ import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
 public class SegmentProcessorFramework {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class);
 
-  private final List<RecordReader> _recordReaders;
+  private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
   private final SegmentProcessorConfig _segmentProcessorConfig;
   private final File _mapperOutputDir;
   private final File _reducerOutputDir;
@@ -66,17 +67,26 @@ public class SegmentProcessorFramework {
   private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
 
   /**
-   * Initializes the SegmentProcessorFramework with record readers, config and working directory.
+   * Initializes the SegmentProcessorFramework with record readers, config and working directory. We will now rely on
+   * users passing RecordReaderFileConfig, since that also allows us to do lazy initialization of RecordReaders.
+   * Please use the other constructor that uses RecordReaderFileConfig.
    */
+  @Deprecated
   public SegmentProcessorFramework(List<RecordReader> recordReaders, SegmentProcessorConfig segmentProcessorConfig,
       File workingDir)
       throws IOException {
-    Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided");
+    this(segmentProcessorConfig, workingDir, convertRecordReadersToRecordReaderFileConfig(recordReaders));
+  }
+
+  public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
+      List<RecordReaderFileConfig> recordReaderFileConfigs)
+      throws IOException {
 
+    Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No recordReaderFileConfigs provided");
     LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}",
-        recordReaders.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
+        recordReaderFileConfigs.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
+    _recordReaderFileConfigs = recordReaderFileConfigs;
 
-    _recordReaders = recordReaders;
     _segmentProcessorConfig = segmentProcessorConfig;
 
     _mapperOutputDir = new File(workingDir, "mapper_output");
@@ -87,6 +97,16 @@ public class SegmentProcessorFramework {
     FileUtils.forceMkdir(_segmentsOutputDir);
   }
 
+  private static List<RecordReaderFileConfig> convertRecordReadersToRecordReaderFileConfig(
+      List<RecordReader> recordReaders) {
+    // Adapter used by the deprecated constructor: wraps every already created reader in a
+    // RecordReaderFileConfig so that both constructors feed the same config-based code path.
+    Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided");
+    List<RecordReaderFileConfig> wrappedReaders = new ArrayList<>();
+    recordReaders.forEach(reader -> wrappedReaders.add(new RecordReaderFileConfig(reader)));
+    return wrappedReaders;
+  }
+
   /**
    * Processes records from record readers per the provided config, returns the directories for the generated segments.
    */
@@ -114,8 +134,8 @@ public class SegmentProcessorFramework {
   private List<File> doProcess()
       throws Exception {
     // Map phase
-    LOGGER.info("Beginning map phase on {} record readers", _recordReaders.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaders, _segmentProcessorConfig, _mapperOutputDir);
+    LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size());
+    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _segmentProcessorConfig, _mapperOutputDir);
     _partitionToFileManagerMap = mapper.map();
 
     // Check for mapper output files
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 302aa2869f..dae6fd52ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -44,6 +44,8 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +62,7 @@ import org.slf4j.LoggerFactory;
 public class SegmentMapper {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
 
-  private final List<RecordReader> _recordReaders;
+  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
   private final SegmentProcessorConfig _processorConfig;
   private final File _mapperOutputDir;
 
@@ -75,8 +77,9 @@ public class SegmentMapper {
   // NOTE: Use TreeMap so that the order is deterministic
   private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>();
 
-  public SegmentMapper(List<RecordReader> recordReaders, SegmentProcessorConfig processorConfig, File mapperOutputDir) {
-    _recordReaders = recordReaders;
+  public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
+      SegmentProcessorConfig processorConfig, File mapperOutputDir) {
+    _recordReaderFileConfigs = recordReaderFileConfigs;
     _processorConfig = processorConfig;
     _mapperOutputDir = mapperOutputDir;
 
@@ -97,8 +100,9 @@ public class SegmentMapper {
     }
     // Time partition + partition from partitioners
     _partitionsBuffer = new String[numPartitioners + 1];
+
     LOGGER.info("Initialized mapper with {} record readers, output dir: {}, timeHandler: {}, partitioners: {}",
-        _recordReaders.size(), _mapperOutputDir, _timeHandler.getClass(),
+        _recordReaderFileConfigs.size(), _mapperOutputDir, _timeHandler.getClass(),
         Arrays.stream(_partitioners).map(p -> p.getClass().toString()).collect(Collectors.joining(",")));
   }
 
@@ -122,33 +126,24 @@ public class SegmentMapper {
   private Map<String, GenericRowFileManager> doMap()
       throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
-    int totalCount = _recordReaders.size();
+    int totalCount = _recordReaderFileConfigs.size();
     int count = 1;
     GenericRow reuse = new GenericRow();
-    for (RecordReader recordReader : _recordReaders) {
-      observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count++, totalCount));
-      while (recordReader.hasNext()) {
-        reuse = recordReader.next(reuse);
-
-        // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it
-
-        if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
-          //noinspection unchecked
-          for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
-            GenericRow transformedRow = _recordTransformer.transform(row);
-            if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-              writeRecord(transformedRow);
-            }
-          }
-        } else {
-          GenericRow transformedRow = _recordTransformer.transform(reuse);
-          if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
-            writeRecord(transformedRow);
-          }
+    for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
+      RecordReader recordReader = recordReaderFileConfig._recordReader;
+      if (recordReader == null) {
+        // Create the reader lazily, right before it is consumed, and always close it afterwards. If the
+        // factory itself fails there is no reader to close, so the original exception propagates instead
+        // of being masked by a NullPointerException from a close() call on a null reference.
+        try (RecordReader lazyRecordReader =
+            RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
+                recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig)) {
+          mapAndTransformRow(lazyRecordReader, reuse, observer, count, totalCount);
         }
-
-        reuse.clear();
+      } else {
+        mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
       }
+      count++;
     }
 
     for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
@@ -158,6 +153,32 @@ public class SegmentMapper {
     return _partitionToFileManagerMap;
   }
 
+  private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+      Consumer<Object> observer, int count, int totalCount) throws Exception {
+    // Drains the reader: every row (or nested row) is transformed and written if it should be ingested.
+    observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
+    while (recordReader.hasNext()) {
+      reuse = recordReader.next(reuse);
+      // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it
+      Object nestedRows = reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      if (nestedRows == null) {
+        GenericRow transformedRow = _recordTransformer.transform(reuse);
+        if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
+          writeRecord(transformedRow);
+        }
+      } else {
+        //noinspection unchecked
+        for (GenericRow nestedRow : (Collection<GenericRow>) nestedRows) {
+          GenericRow transformedRow = _recordTransformer.transform(nestedRow);
+          if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
+            writeRecord(transformedRow);
+          }
+        }
+      }
+      reuse.clear();
+    }
+  }
+
   private void writeRecord(GenericRow row)
       throws IOException {
     String timePartition = _timeHandler.handleTime(row);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 3616e190d6..3a3ee33398 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -141,7 +142,8 @@ public class SegmentMapperTest {
     PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader();
     segmentRecordReader.init(_indexDir, null, null, true);
     SegmentMapper segmentMapper =
-        new SegmentMapper(Collections.singletonList(segmentRecordReader), processorConfig, mapperOutputDir);
+        new SegmentMapper(Collections.singletonList(new RecordReaderFileConfig(segmentRecordReader)),
+            processorConfig, mapperOutputDir);
     Map<String, GenericRowFileManager> partitionToFileManagerMap = segmentMapper.map();
     segmentRecordReader.close();
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index c5868063e7..e977ad364c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.segment.processing.framework;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -42,8 +43,10 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
@@ -175,6 +178,40 @@ public class SegmentProcessorFrameworkTest {
     }
   }
 
+  /**
+   * Tests lazy initialization of record readers: the framework is given only a
+   * RecordReaderFileConfig (file format and data file), and the actual reader is
+   * created during the map phase.
+   * @throws Exception on any failure while setting up or running the framework
+   */
+  @Test
+  public void testRecordReaderFileConfigInit() throws Exception {
+    File workingDir = new File(TEMP_DIR, "segmentOutput");
+    FileUtils.forceMkdir(workingDir);
+    ClassLoader classLoader = getClass().getClassLoader();
+    URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
+    RecordReaderFileConfig reader = new RecordReaderFileConfig(FileFormat.CSV,
+        new File(resource.toURI()),
+        null, null);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").
+        setTimeColumnName("time").build();
+
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("teamId",
+                DataType.STRING, "")
+            .addSingleValueDimension("teamName", DataType.STRING, "")
+            .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+    SegmentProcessorConfig config =
+        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
+    SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, List.of(reader));
+    List<File> outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    assertEquals(segmentMetadata.getTotalDocs(), 51);
+  }
+
   @Test
   public void testSingleSegment()
       throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
new file mode 100644
index 0000000000..628de1052a
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.io.File;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * Wraps RecordReader info to instantiate a reader. Users can either pass in the
+ * RecordReader instance directly or the info required to initialize the RecordReader, so that the
+ * RecordReader can be initialized just when it's about to be used, which avoids early/eager
+ * initialization/memory allocation.
+ */
+public class RecordReaderFileConfig {
+  public final FileFormat _fileFormat;
+  public final File _dataFile;
+  public final Set<String> _fieldsToRead;
+  public final RecordReaderConfig _recordReaderConfig;
+  public final RecordReader _recordReader;
+
+  /** Lazy mode: stores the info needed to create the reader later; {@code _recordReader} is left null. */
+  public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, @Nullable Set<String> fieldsToRead,
+      @Nullable RecordReaderConfig recordReaderConfig) {
+    _fileFormat = fileFormat;
+    _dataFile = dataFile;
+    _fieldsToRead = fieldsToRead;
+    _recordReaderConfig = recordReaderConfig;
+    _recordReader = null;
+  }
+
+  /** Eager mode: wraps an existing reader instance; every file-related field is left null. */
+  public RecordReaderFileConfig(RecordReader recordReader) {
+    _recordReader = recordReader;
+    _fileFormat = null;
+    _dataFile = null;
+    _fieldsToRead = null;
+    _recordReaderConfig = null;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org