You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2018/12/12 04:48:52 UTC

[incubator-pinot] 01/01: Fix segment merge command.

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch segment-merge-fix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 333833a9d6d16ffa4c233009fc56da2647e17d0e
Author: Subbu Subramaniam <ss...@linkedin.com>
AuthorDate: Tue Dec 11 20:44:53 2018 -0800

    Fix segment merge command.
    
    Some old Pinot segments were allowed to have 0 documents. Including those segments when computing the
    start/end time causes minStartTime to become 0, which is an invalid value if auto-naming of segments is chosen.
    
    Also, the multi-reader did not account for the fact that some segments may be smaller than others, so
    it must iterate through all segments before throwing an exception.
---
 .../readers/MultiplePinotSegmentRecordReader.java  | 13 +++++-----
 .../segment/converter/SegmentMergeCommand.java     | 28 ++++++++++++++--------
 2 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 3b41af1..5a7f142 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -147,15 +147,14 @@ public class MultiplePinotSegmentRecordReader implements RecordReader {
       return reuse;
     } else {
       // If there is no sorted column specified, simply concatenate the segments
-      PinotSegmentRecordReader currentReader = _pinotSegmentRecordReaders.get(_currentReaderId);
-      if (!currentReader.hasNext()) {
-        _currentReaderId++;
-        if (_currentReaderId >= _pinotSegmentRecordReaders.size()) {
-          throw new RuntimeException("next is called after reading all data");
+      for (int i = 0; i < _pinotSegmentRecordReaders.size();
+          i++, _currentReaderId = (_currentReaderId + 1) % _pinotSegmentRecordReaders.size()) {
+        PinotSegmentRecordReader currentReader = _pinotSegmentRecordReaders.get(_currentReaderId);
+        if (currentReader.hasNext()) {
+          return currentReader.next(reuse);
         }
-        currentReader = _pinotSegmentRecordReaders.get(_currentReaderId);
       }
-      return currentReader.next(reuse);
+      throw new RuntimeException("next is called after reading all data");
     }
   }
 
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
index 205231e..0575684 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -155,18 +156,25 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
       long minStartTime = Long.MAX_VALUE;
       long maxEndTime = Long.MIN_VALUE;
       long totalNumDocsBeforeMerge = 0L;
-      for (File indexDir : inputIndexDirs) {
+      Iterator<File> it = inputIndexDirs.iterator();
+      while (it.hasNext()) {
+        File indexDir = it.next();
         SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
-        long currentStartTime = segmentMetadata.getStartTime();
-        if (currentStartTime < minStartTime) {
-          minStartTime = currentStartTime;
-        }
-
-        long currentEndTime = segmentMetadata.getEndTime();
-        if (currentEndTime > maxEndTime) {
-          maxEndTime = currentEndTime;
+        if (segmentMetadata.getTotalDocs() > 0) {
+          long currentStartTime = segmentMetadata.getStartTime();
+          if (currentStartTime < minStartTime) {
+            minStartTime = currentStartTime;
+          }
+
+          long currentEndTime = segmentMetadata.getEndTime();
+          if (currentEndTime > maxEndTime) {
+            maxEndTime = currentEndTime;
+          }
+          totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
+        } else {
+          LOGGER.info("Discarding segment {} since it has 0 records", segmentMetadata.getName());
+          it.remove();
         }
-        totalNumDocsBeforeMerge += segmentMetadata.getTotalDocs();
       }
 
       // Compute segment name if it is not specified


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org