You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/06/30 03:21:49 UTC
[pinot] branch master updated: Adding segmentRowComputer interface for SegmentProcessorFramework to use (#10987)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 379d7610ac Adding segmentRowComputer interface for SegmentProcessorFramework to use (#10987)
379d7610ac is described below
commit 379d7610ac58724d1dcfe5968ddad8bb37a988d3
Author: swaminathanmanish <12...@users.noreply.github.com>
AuthorDate: Thu Jun 29 20:21:43 2023 -0700
Adding segmentRowComputer interface for SegmentProcessorFramework to use (#10987)
* Adding segmentRowComputer interface for SegmentProcessorFramework to use
* Addressing comments
---
.../framework/DefaultSegmentNumRowProvider.java | 39 ++++++++++++++++++++++
.../framework/SegmentNumRowProvider.java | 28 ++++++++++++++++
.../framework/SegmentProcessorFramework.java | 13 ++++++--
.../framework/SegmentProcessorFrameworkTest.java | 3 +-
4 files changed, 79 insertions(+), 4 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/DefaultSegmentNumRowProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/DefaultSegmentNumRowProvider.java
new file mode 100644
index 0000000000..740e56c137
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/DefaultSegmentNumRowProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+/**
+ * Default {@link SegmentNumRowProvider}: always returns the fixed maxRecordsPerSegment given at construction.
+ */
+public class DefaultSegmentNumRowProvider implements SegmentNumRowProvider {
+ private int _maxRecordsPerSegment; // assigned once in the constructor; never reassigned in this class
+
+ public DefaultSegmentNumRowProvider(int maxRecordsPerSegment) {
+ _maxRecordsPerSegment = maxRecordsPerSegment;
+ }
+
+ @Override
+ public int getNumRows() {
+ return _maxRecordsPerSegment;
+ }
+
+ @Override
+ public void updateSegmentInfo(int numRows, long segmentSize) { // intentionally a no-op: the limit is fixed
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentNumRowProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentNumRowProvider.java
new file mode 100644
index 0000000000..04e7312a33
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentNumRowProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.framework;
+
+/**
+ * Supplies the number of rows for the next segment built by SegmentProcessorFramework. Implementations
+ * may adjust that number using the row count and size reported for segments that were already built.
+ */
+public interface SegmentNumRowProvider {
+ int getNumRows(); // row limit for the next segment; queried before each segment is built
+ void updateSegmentInfo(int numRows, long segmentSize); // doc count and output-directory size of the built segment
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index f1757c1e4c..cee643c529 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -65,6 +65,7 @@ public class SegmentProcessorFramework {
private final File _reducerOutputDir;
private final File _segmentsOutputDir;
private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
+ private final SegmentNumRowProvider _segmentNumRowProvider;
/**
* Initializes the SegmentProcessorFramework with record readers, config and working directory. We will now rely on
@@ -75,11 +76,11 @@ public class SegmentProcessorFramework {
public SegmentProcessorFramework(List<RecordReader> recordReaders, SegmentProcessorConfig segmentProcessorConfig,
File workingDir)
throws IOException {
- this(segmentProcessorConfig, workingDir, convertRecordReadersToRecordReaderFileConfig(recordReaders));
+ this(segmentProcessorConfig, workingDir, convertRecordReadersToRecordReaderFileConfig(recordReaders), null);
}
public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
- List<RecordReaderFileConfig> recordReaderFileConfigs)
+ List<RecordReaderFileConfig> recordReaderFileConfigs, SegmentNumRowProvider segmentNumRowProvider)
throws IOException {
Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No recordReaderFileConfigs provided");
@@ -95,6 +96,9 @@ public class SegmentProcessorFramework {
FileUtils.forceMkdir(_reducerOutputDir);
_segmentsOutputDir = new File(workingDir, "segments_output");
FileUtils.forceMkdir(_segmentsOutputDir);
+
+ _segmentNumRowProvider = (segmentNumRowProvider == null) ? new DefaultSegmentNumRowProvider(
+ segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment()) : segmentNumRowProvider;
}
private static List<RecordReaderFileConfig> convertRecordReadersToRecordReaderFileConfig(
@@ -180,7 +184,6 @@ public class SegmentProcessorFramework {
generatorConfig.setSegmentNamePostfix(segmentNamePostfix);
}
- int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
int sequenceId = 0;
for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
@@ -192,7 +195,9 @@ public class SegmentProcessorFramework {
LOGGER.info("Start creating segments on partition: {}, numRows: {}, numSortFields: {}", partitionId, numRows,
numSortFields);
GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+ int maxNumRecordsPerSegment;
for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment, sequenceId++) {
+ maxNumRecordsPerSegment = _segmentNumRowProvider.getNumRows();
int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows);
LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId,
endRowId);
@@ -206,6 +211,8 @@ public class SegmentProcessorFramework {
TransformPipeline.getPassThroughPipeline());
driver.build();
outputSegmentDirs.add(driver.getOutputDirectory());
+ _segmentNumRowProvider.updateSegmentInfo(driver.getSegmentStats().getTotalDocCount(),
+ FileUtils.sizeOfDirectory(driver.getOutputDirectory()));
}
} finally {
fileManager.cleanUp();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index c0deaf85b5..39b66fa4dd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -205,7 +205,8 @@ public class SegmentProcessorFrameworkTest {
SegmentProcessorConfig config =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
- SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader));
+ SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader),
+ null);
List<File> outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org