You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2018/12/12 04:48:52 UTC
[incubator-pinot] 01/01: Fix segment merge command.
This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch segment-merge-fix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 333833a9d6d16ffa4c233009fc56da2647e17d0e
Author: Subbu Subramaniam <ss...@linkedin.com>
AuthorDate: Tue Dec 11 20:44:53 2018 -0800
Fix segment merge command.
Some old Pinot segments were allowed to have 0 documents. Considering those segments when computing the start/end time
causes minStartTime to become 0, which is an invalid value if auto-naming of segments is chosen.
Also, the multi-reader did not allow for the fact that some segments may be smaller than others, so
we need to iterate through all segments before throwing an exception.
---
.../readers/MultiplePinotSegmentRecordReader.java | 13 +++++-----
.../segment/converter/SegmentMergeCommand.java | 28 ++++++++++++++--------
2 files changed, 24 insertions(+), 17 deletions(-)
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 3b41af1..5a7f142 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -147,15 +147,14 @@ public class MultiplePinotSegmentRecordReader implements RecordReader {
return reuse;
} else {
// If there is no sorted column specified, simply concatenate the segments
- PinotSegmentRecordReader currentReader = _pinotSegmentRecordReaders.get(_currentReaderId);
- if (!currentReader.hasNext()) {
- _currentReaderId++;
- if (_currentReaderId >= _pinotSegmentRecordReaders.size()) {
- throw new RuntimeException("next is called after reading all data");
+ for (int i = 0; i < _pinotSegmentRecordReaders.size();
+ i++, _currentReaderId = (_currentReaderId + 1) % _pinotSegmentRecordReaders.size()) {
+ PinotSegmentRecordReader currentReader = _pinotSegmentRecordReaders.get(_currentReaderId);
+ if (currentReader.hasNext()) {
+ return currentReader.next(reuse);
}
- currentReader = _pinotSegmentRecordReaders.get(_currentReaderId);
}
- return currentReader.next(reuse);
+ throw new RuntimeException("next is called after reading all data");
}
}
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
index 205231e..0575684 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import com.google.common.base.Preconditions;
@@ -155,18 +156,25 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
long minStartTime = Long.MAX_VALUE;
long maxEndTime = Long.MIN_VALUE;
long totalNumDocsBeforeMerge = 0L;
- for (File indexDir : inputIndexDirs) {
+ Iterator<File> it = inputIndexDirs.iterator();
+ while (it.hasNext()) {
+ File indexDir = it.next();
SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
- long currentStartTime = segmentMetadata.getStartTime();
- if (currentStartTime < minStartTime) {
- minStartTime = currentStartTime;
- }
-
- long currentEndTime = segmentMetadata.getEndTime();
- if (currentEndTime > maxEndTime) {
- maxEndTime = currentEndTime;
+ if (segmentMetadata.getTotalDocs() > 0) {
+ long currentStartTime = segmentMetadata.getStartTime();
+ if (currentStartTime < minStartTime) {
+ minStartTime = currentStartTime;
+ }
+
+ long currentEndTime = segmentMetadata.getEndTime();
+ if (currentEndTime > maxEndTime) {
+ maxEndTime = currentEndTime;
+ }
+ totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
+ } else {
+ LOGGER.info("Discarding segment {} since it has 0 records", segmentMetadata.getName());
+ it.remove();
}
- totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
}
// Compute segment name if it is not specified
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org