You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/21 00:07:26 UTC

[pinot] branch master updated: avoid redundant record transforms (#8935)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9464359324 avoid redundant record transforms (#8935)
9464359324 is described below

commit 94643593247729ff5319feefbad17f17b9531d47
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Mon Jun 20 17:07:21 2022 -0700

    avoid redundant record transforms (#8935)
---
 .../framework/SegmentProcessorFramework.java           |  5 ++---
 .../realtime/converter/RealtimeSegmentConverter.java   |  4 ++--
 .../creator/RecordReaderSegmentCreationDataSource.java |  9 +++++++--
 .../local/segment/creator/TransformPipeline.java       |  7 +++++++
 .../creator/impl/SegmentIndexCreationDriverImpl.java   | 18 +++++++++++++-----
 .../tools/admin/command/DataImportDryRunCommand.java   |  5 -----
 6 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 6815c58d97..e86b456915 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -32,8 +32,8 @@ import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordR
 import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
 import org.apache.pinot.core.segment.processing.reducer.Reducer;
 import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
@@ -132,7 +132,6 @@ public class SegmentProcessorFramework {
     }
 
     int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
-    CompositeTransformer passThroughTransformer = CompositeTransformer.getPassThroughTransformer();
     int sequenceId = 0;
     for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
       String partitionId = entry.getKey();
@@ -151,7 +150,7 @@ public class SegmentProcessorFramework {
         GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
         SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
         driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
-            passThroughTransformer, null);
+            TransformPipeline.getPassThroughPipeline());
         driver.build();
         outputSegmentDirs.add(driver.getOutputDirectory());
       }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index 8544ea5d19..2ca1758651 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -119,7 +119,7 @@ public class RealtimeSegmentConverter {
       recordReader.init(_realtimeSegmentImpl, sortedDocIds);
       RealtimeSegmentSegmentCreationDataSource dataSource =
           new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader);
-      driver.init(genConfig, dataSource, CompositeTransformer.getPassThroughTransformer(), null);
+      driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline());
       driver.build();
     }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
index 2db825b566..411c040cdd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -38,16 +38,21 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
   private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class);
 
   private final RecordReader _recordReader;
+  private TransformPipeline _transformPipeline;
 
   public RecordReaderSegmentCreationDataSource(RecordReader recordReader) {
     _recordReader = recordReader;
   }
 
+  public void setTransformPipeline(TransformPipeline transformPipeline) {
+    _transformPipeline = transformPipeline;
+  }
+
   @Override
   public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) {
     try {
-      TransformPipeline transformPipeline =
-          new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
+      TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline
+          : new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
 
       SegmentPreIndexStatsCollector collector = new SegmentPreIndexStatsCollectorImpl(statsCollectorConfig);
       collector.init();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
index 94021193e7..9da9dab2b9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/TransformPipeline.java
@@ -63,6 +63,13 @@ public class TransformPipeline {
     _complexTypeTransformer = ComplexTypeTransformer.getComplexTypeTransformer(tableConfig);
   }
 
+  /**
+   * Returns a pass through pipeline that does not transform the record.
+   */
+  public static TransformPipeline getPassThroughPipeline() {
+    return new TransformPipeline(CompositeTransformer.getPassThroughTransformer(), null);
+  }
+
   /**
    * Process and validate the decoded row against schema.
    * @param decodedRow the row data to pass in
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 18a50dbf0b..98ef1e7e0b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
-import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
 import org.apache.pinot.segment.local.segment.creator.IntermediateSegmentSegmentCreationDataSource;
 import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
@@ -77,7 +76,6 @@ import org.slf4j.LoggerFactory;
  * Implementation of an index segment creator.
  */
 // TODO: Check resource leaks
-@SuppressWarnings("serial")
 public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDriver {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentIndexCreationDriverImpl.class);
 
@@ -151,20 +149,30 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
       LOGGER.info("RecordReaderSegmentCreationDataSource is used");
       dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
     }
-    init(config, dataSource, CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()),
-        ComplexTypeTransformer.getComplexTypeTransformer(config.getTableConfig()));
+    init(config, dataSource, new TransformPipeline(config.getTableConfig(), config.getSchema()));
   }
 
+  @Deprecated
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
       RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer)
       throws Exception {
+    init(config, dataSource, new TransformPipeline(recordTransformer, complexTypeTransformer));
+  }
+
+  public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
+      TransformPipeline transformPipeline)
+      throws Exception {
     _config = config;
     _recordReader = dataSource.getRecordReader();
     _dataSchema = config.getSchema();
     if (config.isFailOnEmptySegment()) {
       Preconditions.checkState(_recordReader.hasNext(), "No record in data source");
     }
-    _transformPipeline = new TransformPipeline(recordTransformer, complexTypeTransformer);
+    _transformPipeline = transformPipeline;
+    // Use the same transform pipeline if the data source is backed by a record reader
+    if (dataSource instanceof RecordReaderSegmentCreationDataSource) {
+      ((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
+    }
 
     // Initialize stats collection
     _segmentStats = dataSource.gatherStats(
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
index 984b324b08..0047806e00 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/DataImportDryRunCommand.java
@@ -21,7 +21,6 @@ package org.apache.pinot.tools.admin.command;
 import java.io.File;
 import java.util.TreeMap;
 import org.apache.pinot.plugin.inputformat.json.JSONRecordReader;
-import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
 import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -57,16 +56,12 @@ public class DataImportDryRunCommand extends AbstractBaseAdminCommand implements
     JSONRecordReader jsonRecordReader = new JSONRecordReader();
     jsonRecordReader.init(new File(_jsonFile), null, null);
 
-    RecordReaderSegmentCreationDataSource dataSource =
-        new RecordReaderSegmentCreationDataSource(jsonRecordReader);
-
     TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFile), TableConfig.class);
     StatsCollectorConfig statsCollectorConfig = new StatsCollectorConfig(tableConfig, new Schema(), null);
 
     TransformPipeline transformPipeline =
         new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
 
-
     // Gather the stats
     GenericRow reuse = new GenericRow();
     TransformPipeline.Result reusedResult = new TransformPipeline.Result();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org