Posted to commits@pinot.apache.org by sn...@apache.org on 2023/06/14 03:39:31 UTC
[pinot] branch master updated: Making segmentMapper do the init and cleanup of RecordReader (#10874)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 449c95c6b9 Making segmentMapper do the init and cleanup of RecordReader (#10874)
449c95c6b9 is described below
commit 449c95c6b93499417a75574607d160231158e2e3
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Tue Jun 13 20:39:25 2023 -0700
Making segmentMapper do the init and cleanup of RecordReader (#10874)
---
.../framework/SegmentProcessorFramework.java | 34 ++++++++--
.../segment/processing/mapper/SegmentMapper.java | 75 ++++++++++++++--------
.../processing/framework/SegmentMapperTest.java | 4 +-
.../framework/SegmentProcessorFrameworkTest.java | 37 +++++++++++
.../spi/data/readers/RecordReaderFileConfig.java | 57 ++++++++++++++++
5 files changed, 172 insertions(+), 35 deletions(-)
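In short: callers that previously handed SegmentProcessorFramework a list of pre-initialized RecordReaders now hand it a list of RecordReaderFileConfigs, and SegmentMapper takes over reader initialization and cleanup. A minimal before/after sketch based on the signatures in this diff (the config, working dir, and file path are placeholders):

    // Before (now deprecated): caller creates and manages the readers
    SegmentProcessorFramework framework =
        new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir);

    // After: caller describes the inputs; the mapper initializes and closes the readers lazily
    List<RecordReaderFileConfig> configs = List.of(
        new RecordReaderFileConfig(FileFormat.CSV, new File("/path/to/data.csv"), null, null));
    SegmentProcessorFramework framework =
        new SegmentProcessorFramework(segmentProcessorConfig, workingDir, configs);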
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index d64f35f515..f1757c1e4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -41,6 +41,7 @@ import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
public class SegmentProcessorFramework {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class);
- private final List<RecordReader> _recordReaders;
+ private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
private final SegmentProcessorConfig _segmentProcessorConfig;
private final File _mapperOutputDir;
private final File _reducerOutputDir;
@@ -66,17 +67,26 @@ public class SegmentProcessorFramework {
private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
/**
- * Initializes the SegmentProcessorFramework with record readers, config and working directory.
+ * Initializes the SegmentProcessorFramework with record readers, config and working directory. Deprecated in
+ * favor of the constructor that takes RecordReaderFileConfig, which also allows lazy initialization of
+ * RecordReaders. Please use that constructor instead.
*/
+ @Deprecated
public SegmentProcessorFramework(List<RecordReader> recordReaders, SegmentProcessorConfig segmentProcessorConfig,
File workingDir)
throws IOException {
- Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided");
+ this(segmentProcessorConfig, workingDir, convertRecordReadersToRecordReaderFileConfig(recordReaders));
+ }
+
+ public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
+ List<RecordReaderFileConfig> recordReaderFileConfigs)
+ throws IOException {
+ Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No recordReaderFileConfigs provided");
LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}",
- recordReaders.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
+ recordReaderFileConfigs.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
+ _recordReaderFileConfigs = recordReaderFileConfigs;
- _recordReaders = recordReaders;
_segmentProcessorConfig = segmentProcessorConfig;
_mapperOutputDir = new File(workingDir, "mapper_output");
@@ -87,6 +97,16 @@ public class SegmentProcessorFramework {
FileUtils.forceMkdir(_segmentsOutputDir);
}
+ private static List<RecordReaderFileConfig> convertRecordReadersToRecordReaderFileConfig(
+ List<RecordReader> recordReaders) {
+ Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided");
+ List<RecordReaderFileConfig> recordReaderFileConfigs = new ArrayList<>();
+ for (RecordReader recordReader : recordReaders) {
+ recordReaderFileConfigs.add(new RecordReaderFileConfig(recordReader));
+ }
+ return recordReaderFileConfigs;
+ }
+
/**
* Processes records from record readers per the provided config, returns the directories for the generated segments.
*/
@@ -114,8 +134,8 @@ public class SegmentProcessorFramework {
private List<File> doProcess()
throws Exception {
// Map phase
- LOGGER.info("Beginning map phase on {} record readers", _recordReaders.size());
- SegmentMapper mapper = new SegmentMapper(_recordReaders, _segmentProcessorConfig, _mapperOutputDir);
+ LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size());
+ SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _segmentProcessorConfig, _mapperOutputDir);
_partitionToFileManagerMap = mapper.map();
// Check for mapper output files
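For callers still on the deprecated constructor, each RecordReader is wrapped one-to-one via convertRecordReadersToRecordReaderFileConfig. A hedged sketch of what that wrapping amounts to, reusing the reader setup from the tests below (indexDir is a placeholder):

    // Wrapping a pre-initialized reader, as the deprecated constructor now does per reader
    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader();
    segmentRecordReader.init(indexDir, null, null, true);
    RecordReaderFileConfig wrapped = new RecordReaderFileConfig(segmentRecordReader);
    // Readers passed in this way are used as-is; closing them remains the caller's job
    // (see SegmentMapper.doMap below and the explicit close() in SegmentMapperTest)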
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 302aa2869f..dae6fd52ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -44,6 +44,8 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +62,7 @@ import org.slf4j.LoggerFactory;
public class SegmentMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
- private final List<RecordReader> _recordReaders;
+ private List<RecordReaderFileConfig> _recordReaderFileConfigs;
private final SegmentProcessorConfig _processorConfig;
private final File _mapperOutputDir;
@@ -75,8 +77,9 @@ public class SegmentMapper {
// NOTE: Use TreeMap so that the order is deterministic
private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>();
- public SegmentMapper(List<RecordReader> recordReaders, SegmentProcessorConfig processorConfig, File mapperOutputDir) {
- _recordReaders = recordReaders;
+ public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
+ SegmentProcessorConfig processorConfig, File mapperOutputDir) {
+ _recordReaderFileConfigs = recordReaderFileConfigs;
_processorConfig = processorConfig;
_mapperOutputDir = mapperOutputDir;
@@ -97,8 +100,9 @@ public class SegmentMapper {
}
// Time partition + partition from partitioners
_partitionsBuffer = new String[numPartitioners + 1];
+
LOGGER.info("Initialized mapper with {} record readers, output dir: {}, timeHandler: {}, partitioners: {}",
- _recordReaders.size(), _mapperOutputDir, _timeHandler.getClass(),
+ _recordReaderFileConfigs.size(), _mapperOutputDir, _timeHandler.getClass(),
Arrays.stream(_partitioners).map(p -> p.getClass().toString()).collect(Collectors.joining(",")));
}
@@ -122,33 +126,24 @@ public class SegmentMapper {
private Map<String, GenericRowFileManager> doMap()
throws Exception {
Consumer<Object> observer = _processorConfig.getProgressObserver();
- int totalCount = _recordReaders.size();
+ int totalCount = _recordReaderFileConfigs.size();
int count = 1;
GenericRow reuse = new GenericRow();
- for (RecordReader recordReader : _recordReaders) {
- observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count++, totalCount));
- while (recordReader.hasNext()) {
- reuse = recordReader.next(reuse);
-
- // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it
-
- if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
- //noinspection unchecked
- for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
- GenericRow transformedRow = _recordTransformer.transform(row);
- if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
- writeRecord(transformedRow);
- }
- }
- } else {
- GenericRow transformedRow = _recordTransformer.transform(reuse);
- if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
- writeRecord(transformedRow);
- }
+ for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
+ RecordReader recordReader = recordReaderFileConfig._recordReader;
+ if (recordReader == null) {
+ try {
+ recordReader =
+ RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
+ recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
+ mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ } finally {
+ recordReader.close();
}
-
- reuse.clear();
+ } else {
+ mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
}
+ count++;
}
for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
@@ -158,6 +153,32 @@ public class SegmentMapper {
return _partitionToFileManagerMap;
}
+ private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+ Consumer<Object> observer, int count, int totalCount) throws Exception {
+ observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
+ while (recordReader.hasNext()) {
+ reuse = recordReader.next(reuse);
+
+ // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it
+
+ if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+ //noinspection unchecked
+ for (GenericRow row : (Collection<GenericRow>) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+ GenericRow transformedRow = _recordTransformer.transform(row);
+ if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
+ writeRecord(transformedRow);
+ }
+ }
+ } else {
+ GenericRow transformedRow = _recordTransformer.transform(reuse);
+ if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
+ writeRecord(transformedRow);
+ }
+ }
+ reuse.clear();
+ }
+ }
+
private void writeRecord(GenericRow row)
throws IOException {
String timePartition = _timeHandler.handleTime(row);
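The heart of the change is the per-config branch in doMap(): a config without a pre-built reader gets one from RecordReaderFactory and the mapper closes it when done, while a caller-supplied reader is used as-is and left open. A condensed standalone sketch of that pattern; the null check before close() is an added safeguard for the case where the factory throws, not part of the committed hunk:

    for (RecordReaderFileConfig config : recordReaderFileConfigs) {
      RecordReader recordReader = config._recordReader;
      if (recordReader == null) {
        try {
          // Lazy path: the mapper owns the reader's lifecycle
          recordReader = RecordReaderFactory.getRecordReader(config._fileFormat, config._dataFile,
              config._fieldsToRead, config._recordReaderConfig);
          mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
        } finally {
          if (recordReader != null) {
            recordReader.close();
          }
        }
      } else {
        // Eager path: the caller owns the reader's lifecycle
        mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
      }
      count++;
    }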
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 3616e190d6..3a3ee33398 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -141,7 +142,8 @@ public class SegmentMapperTest {
PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader();
segmentRecordReader.init(_indexDir, null, null, true);
SegmentMapper segmentMapper =
- new SegmentMapper(Collections.singletonList(segmentRecordReader), processorConfig, mapperOutputDir);
+ new SegmentMapper(Collections.singletonList(new RecordReaderFileConfig(segmentRecordReader)),
+ processorConfig, mapperOutputDir);
Map<String, GenericRowFileManager> partitionToFileManagerMap = segmentMapper.map();
segmentRecordReader.close();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index c5868063e7..e977ad364c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.segment.processing.framework;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -42,8 +43,10 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
@@ -175,6 +178,40 @@ public class SegmentProcessorFrameworkTest {
}
}
+ /**
+ * Test lazy initialization of record readers. Here we create a
+ * RecordReaderFileConfig, and the actual reader is initialized during the
+ * map phase.
+ * @throws Exception
+ */
+ @Test
+ public void testRecordReaderFileConfigInit() throws Exception {
+ File workingDir = new File(TEMP_DIR, "segmentOutput");
+ FileUtils.forceMkdir(workingDir);
+ ClassLoader classLoader = getClass().getClassLoader();
+ URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
+ RecordReaderFileConfig reader = new RecordReaderFileConfig(FileFormat.CSV,
+ new File(resource.toURI()),
+ null, null);
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").
+ setTimeColumnName("time").build();
+
+ Schema schema =
+ new Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("teamId",
+ DataType.STRING, "")
+ .addSingleValueDimension("teamName", DataType.STRING, "")
+ .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+ SegmentProcessorConfig config =
+ new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
+ SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, List.of(reader));
+ List<File> outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
+ SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+ assertEquals(segmentMetadata.getTotalDocs(), 51);
+ }
+
@Test
public void testSingleSegment()
throws Exception {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
new file mode 100644
index 0000000000..628de1052a
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.io.File;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * Wraps the info needed to instantiate a RecordReader. Users can either pass in a
+ * RecordReader instance directly, or the info required to initialize one, so that the
+ * RecordReader can be initialized just when it is about to be used, which avoids early/eager
+ * initialization and memory allocation.
+ */
+public class RecordReaderFileConfig {
+ public final FileFormat _fileFormat;
+ public final File _dataFile;
+ public final Set<String> _fieldsToRead;
+ public final RecordReaderConfig _recordReaderConfig;
+ public final RecordReader _recordReader;
+
+ // Pass in the info needed to initialize the reader
+ public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, Set<String> fieldsToRead,
+ @Nullable RecordReaderConfig recordReaderConfig) {
+ _fileFormat = fileFormat;
+ _dataFile = dataFile;
+ _fieldsToRead = fieldsToRead;
+ _recordReaderConfig = recordReaderConfig;
+ _recordReader = null;
+ }
+
+ // Pass in the reader instance directly
+ public RecordReaderFileConfig(RecordReader recordReader) {
+ _recordReader = recordReader;
+ _fileFormat = null;
+ _dataFile = null;
+ _fieldsToRead = null;
+ _recordReaderConfig = null;
+ }
+}
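A short usage sketch of the two construction paths (file path and reader are placeholders):

    // Lazy: describe the input; SegmentMapper creates and closes the reader during the map phase
    RecordReaderFileConfig lazyConfig =
        new RecordReaderFileConfig(FileFormat.CSV, new File("/data/input.csv"), null, null);

    // Eager: hand over an already-initialized reader; the caller keeps responsibility for closing it
    RecordReaderFileConfig eagerConfig = new RecordReaderFileConfig(alreadyInitializedReader);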
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org