You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/21 00:07:26 UTC
[pinot] branch master updated: avoid redundant record transforms (#8935)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9464359324 avoid redundant record transforms (#8935)
9464359324 is described below
commit 94643593247729ff5319feefbad17f17b9531d47
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Mon Jun 20 17:07:21 2022 -0700
avoid redundant record transforms (#8935)
---
.../framework/SegmentProcessorFramework.java | 5 ++---
.../realtime/converter/RealtimeSegmentConverter.java | 4 ++--
.../creator/RecordReaderSegmentCreationDataSource.java | 9 +++++++--
.../local/segment/creator/TransformPipeline.java | 7 +++++++
.../creator/impl/SegmentIndexCreationDriverImpl.java | 18 +++++++++++++-----
.../tools/admin/command/DataImportDryRunCommand.java | 5 -----
6 files changed, 31 insertions(+), 17 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 6815c58d97..e86b456915 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -32,8 +32,8 @@ import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordR
import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
import org.apache.pinot.core.segment.processing.reducer.Reducer;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
@@ -132,7 +132,6 @@ public class SegmentProcessorFramework {
}
int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
- CompositeTransformer passThroughTransformer = CompositeTransformer.getPassThroughTransformer();
int sequenceId = 0;
for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
@@ -151,7 +150,7 @@ public class SegmentProcessorFramework {
GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
- passThroughTransformer, null);
+ TransformPipeline.getPassThroughPipeline());
driver.build();
outputSegmentDirs.add(driver.getOutputDirectory());
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index 8544ea5d19..2ca1758651 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -119,7 +119,7 @@ public class RealtimeSegmentConverter {
recordReader.init(_realtimeSegmentImpl, sortedDocIds);
RealtimeSegmentSegmentCreationDataSource dataSource =
new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader);
- driver.init(genConfig, dataSource, CompositeTransformer.getPassThroughTransformer(), null);
+ driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline());
driver.build();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
index 2db825b566..411c040cdd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -38,16 +38,21 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class);
private final RecordReader _recordReader;
+ private TransformPipeline _transformPipeline;
public RecordReaderSegmentCreationDataSource(RecordReader recordReader) {
_recordReader = recordReader;
}
+ public void setTransformPipeline(TransformPipeline transformPipeline) {
+ _transformPipeline = transformPipeline;
+ }
+
@Override
public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) {
try {
- TransformPipeline transformPipeline =
- new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
+ TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline
+ : new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
SegmentPreIndexStatsCollector collector = new SegmentPreIndexStatsCollectorImpl(statsCollectorConfig);
collector.init();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
index 94021193e7..9da9dab2b9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
@@ -63,6 +63,13 @@ public class TransformPipeline {
_complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
}
+ /**
+ * Returns a pass-through pipeline that does not transform the record.
+ */
+ public static TransformPipeline getPassThroughPipeline() {
+ return new TransformPipeline(CompositeTransformer.getPassThroughTransformer(), null);
+ }
+
/**
* Process and validate the decoded row against schema.
* @param decodedRow the row data to pass in
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 18a50dbf0b..98ef1e7e0b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.creator.IntermediateSegmentSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
@@ -77,7 +76,6 @@ import org.slf4j.LoggerFactory;
* Implementation of an index segment creator.
*/
// TODO: Check resource leaks
-@SuppressWarnings("serial")
public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDriver {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentIndexCreationDriverImpl.class);
@@ -151,20 +149,30 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
LOGGER.info("RecordReaderSegmentCreationDataSource is used");
dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
}
- init(config, dataSource, CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()),
- ComplexTypeTransformer.getComplexTypeTransformer(config.getTableConfig()));
+ init(config, dataSource, new TransformPipeline(config.getTableConfig(), config.getSchema()));
}
+ @Deprecated
public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer)
throws Exception {
+ init(config, dataSource, new TransformPipeline(recordTransformer, complexTypeTransformer));
+ }
+
+ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
+ TransformPipeline transformPipeline)
+ throws Exception {
_config = config;
_recordReader = dataSource.getRecordReader();
_dataSchema = config.getSchema();
if (config.isFailOnEmptySegment()) {
Preconditions.checkState(_recordReader.hasNext(), "No record in data source");
}
- _transformPipeline = new TransformPipeline(recordTransformer, complexTypeTransformer);
+ _transformPipeline = transformPipeline;
+ // Use the same transform pipeline if the data source is backed by a record reader
+ if (dataSource instanceof RecordReaderSegmentCreationDataSource) {
+ ((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
+ }
// Initialize stats collection
_segmentStats = dataSource.gatherStats(
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
index 984b324b08..0047806e00 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
@@ -21,7 +21,6 @@ package org.apache.pinot.tools.admin.command;
import java.io.File;
import java.util.TreeMap;
import org.apache.pinot.plugin.inputformat.json.JSONRecordReader;
-import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -57,16 +56,12 @@ public class DataImportDryRunCommand extends AbstractBaseAdminCommand implements
JSONRecordReader jsonRecordReader = new JSONRecordReader();
jsonRecordReader.init(new File(_jsonFile), null, null);
- RecordReaderSegmentCreationDataSource dataSource =
- new RecordReaderSegmentCreationDataSource(jsonRecordReader);
-
TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFile), TableConfig.class);
StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(tableConfig, new Schema(), null);
TransformPipeline transformPipeline =
new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
-
// Gather the stats
GenericRow reuse = new GenericRow();
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org