You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/06/30 03:21:49 UTC

[pinot] branch master updated: Adding segmentRowComputer interface for SegmentProcessorFramework to use (#10987)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 379d7610ac Adding segmentRowComputer interface for SegmentProcessorFramework to use (#10987)
379d7610ac is described below

commit 379d7610ac58724d1dcfe5968ddad8bb37a988d3
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Thu Jun 29 20:21:43 2023 -0700

    Adding segmentRowComputer interface for SegmentProcessorFramework to use (#10987)
    
    * Adding segmentRowComputer interface for SegmentProcessorFramework to use
    
    * Addressing comments
---
 .../framework/DefaultSegmentNumRowProvider.java    | 39 ++++++++++++++++++++++
 .../framework/SegmentNumRowProvider.java           | 28 ++++++++++++++++
 .../framework/SegmentProcessorFramework.java       | 13 ++++++--
 .../framework/SegmentProcessorFrameworkTest.java   |  3 +-
 4 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/DefaultSegmentNumRowProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/DefaultSegmentNumRowProvider.java
new file mode 100644
index 0000000000..740e56c137
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/DefaultSegmentNumRowProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+/**
+ * Default {@link SegmentNumRowProvider}: returns the configured maxRecordsPerSegment; updateSegmentInfo is a no-op.
+ */
+public class DefaultSegmentNumRowProvider implements SegmentNumRowProvider {
+  private final int _maxRecordsPerSegment;
+
+  public DefaultSegmentNumRowProvider(int maxRecordsPerSegment) {
+    _maxRecordsPerSegment = maxRecordsPerSegment;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _maxRecordsPerSegment;
+  }
+
+  @Override
+  public void updateSegmentInfo(int numRows, long segmentSize) {
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentNumRowProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentNumRowProvider.java
new file mode 100644
index 0000000000..04e7312a33
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentNumRowProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+/**
+ * Supplies the row limit for the next segment (getNumRows) and receives the row count and size of each segment
+ * built (updateSegmentInfo). This will be used by SegmentProcessorFramework.
+ */
+public interface SegmentNumRowProvider {
+  int getNumRows();
+  void updateSegmentInfo(int numRows, long segmentSize);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index f1757c1e4c..cee643c529 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -65,6 +65,7 @@ public class SegmentProcessorFramework {
   private final File _reducerOutputDir;
   private final File _segmentsOutputDir;
   private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
+  private final SegmentNumRowProvider _segmentNumRowProvider;
 
   /**
    * Initializes the SegmentProcessorFramework with record readers, config and working directory. We will now rely on
@@ -75,11 +76,11 @@ public class SegmentProcessorFramework {
   public SegmentProcessorFramework(List<RecordReader> recordReaders, SegmentProcessorConfig segmentProcessorConfig,
       File workingDir)
       throws IOException {
-    this(segmentProcessorConfig, workingDir, convertRecordReadersToRecordReaderFileConfig(recordReaders));
+    this(segmentProcessorConfig, workingDir, convertRecordReadersToRecordReaderFileConfig(recordReaders), null);
   }
 
   public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
-      List<RecordReaderFileConfig> recordReaderFileConfigs)
+      List<RecordReaderFileConfig> recordReaderFileConfigs, SegmentNumRowProvider segmentNumRowProvider)
       throws IOException {
 
     Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No recordReaderFileConfigs provided");
@@ -95,6 +96,9 @@ public class SegmentProcessorFramework {
     FileUtils.forceMkdir(_reducerOutputDir);
     _segmentsOutputDir = new File(workingDir, "segments_output");
     FileUtils.forceMkdir(_segmentsOutputDir);
+
+    _segmentNumRowProvider = (segmentNumRowProvider == null) ? new DefaultSegmentNumRowProvider(
+        segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment()) : segmentNumRowProvider;
   }
 
   private static List<RecordReaderFileConfig> convertRecordReadersToRecordReaderFileConfig(
@@ -180,7 +184,6 @@ public class SegmentProcessorFramework {
       generatorConfig.setSegmentNamePostfix(segmentNamePostfix);
     }
 
-    int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
     int sequenceId = 0;
     for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) {
       String partitionId = entry.getKey();
@@ -192,7 +195,9 @@ public class SegmentProcessorFramework {
         LOGGER.info("Start creating segments on partition: {}, numRows: {}, numSortFields: {}", partitionId, numRows,
             numSortFields);
         GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+        int maxNumRecordsPerSegment;
         for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment, sequenceId++) {
+          maxNumRecordsPerSegment = _segmentNumRowProvider.getNumRows();
           int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows);
           LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId,
               endRowId);
@@ -206,6 +211,8 @@ public class SegmentProcessorFramework {
               TransformPipeline.getPassThroughPipeline());
           driver.build();
           outputSegmentDirs.add(driver.getOutputDirectory());
+          _segmentNumRowProvider.updateSegmentInfo(driver.getSegmentStats().getTotalDocCount(),
+              FileUtils.sizeOfDirectory(driver.getOutputDirectory()));
         }
       } finally {
         fileManager.cleanUp();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index c0deaf85b5..39b66fa4dd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -205,7 +205,8 @@ public class SegmentProcessorFrameworkTest {
 
     SegmentProcessorConfig config =
         new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
-    SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader));
+    SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader),
+        null);
     List<File> outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
     ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org