You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/05/19 02:50:06 UTC

[incubator-pinot] branch master updated: Implement getRecord for immutableSegmentImpl (#6924)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9becc57  Implement getRecord for immutableSegmentImpl (#6924)
9becc57 is described below

commit 9becc57eec981d71d5b45af5da7b720840d18f17
Author: deemoliu <qi...@uber.com>
AuthorDate: Tue May 18 19:49:46 2021 -0700

    Implement getRecord for immutableSegmentImpl (#6924)
---
 .../immutable/ImmutableSegmentImpl.java             | 21 +++++++++++++++++++--
 .../segment/readers/PinotSegmentRecordReader.java   | 12 +++++++++++-
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 63d58fc..b8a3062 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -29,6 +29,7 @@ import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSour
 import org.apache.pinot.segment.local.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.segment.local.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.local.segment.index.readers.ValidDocIndexReaderImpl;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.local.segment.store.SegmentDirectory;
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -57,6 +58,7 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
   private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private ThreadSafeMutableRoaringBitmap _validDocIds;
   private ValidDocIndexReader _validDocIndex;
+  private PinotSegmentRecordReader _pinotSegmentRecordReader;
 
   public ImmutableSegmentImpl(SegmentDirectory segmentDirectory, SegmentMetadataImpl segmentMetadata,
       Map<String, ColumnIndexContainer> columnIndexContainerMap,
@@ -155,6 +157,13 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
     if (_partitionUpsertMetadataManager != null) {
       _partitionUpsertMetadataManager.removeSegment(segmentName, _validDocIds);
     }
+    if (_pinotSegmentRecordReader != null) {
+      try {
+        _pinotSegmentRecordReader.close();
+      } catch (IOException e) {
+        LOGGER.error("Failed to close record reader. Continuing with error.", e);
+      }
+    }
   }
 
   @Override
@@ -170,8 +179,16 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
 
   @Override
   public GenericRow getRecord(int docId, GenericRow reuse) {
-    // NOTE: Use PinotSegmentRecordReader to read immutable segment
-    throw new UnsupportedOperationException();
+    try {
+      if (_pinotSegmentRecordReader == null) {
+        _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+        _pinotSegmentRecordReader.init(this);
+      }
+      _pinotSegmentRecordReader.getRecord(reuse, docId);
+      return reuse;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to use PinotSegmentRecordReader to read immutable segment");
+    }
   }
 
   public Map<String, ColumnIndexContainer> getIndexContainerMap() {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
index 24a2752..1d3d438 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.readers.sort.PinotSegmentSorter;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.spi.data.Schema;
@@ -121,6 +122,15 @@ public class PinotSegmentRecordReader implements RecordReader {
   }
 
   /**
+   * Initializes the record reader from an immutable segment.
+   *
+   * @param immutableSegment Immutable segment
+   */
+  public void init(ImmutableSegment immutableSegment) {
+    init(immutableSegment, false, null, null, null, false);
+  }
+
+  /**
    * Initializes the record reader from a mutable segment.
    * NOTE: The mutable segment should have already finished consumption and ready to be sealed. In order to read records
    *       from consuming segment, use {@link MutableSegment#getRecord(int, GenericRow)} instead.
@@ -208,7 +218,7 @@ public class PinotSegmentRecordReader implements RecordReader {
     return reuse;
   }
 
-  private void getRecord(GenericRow reuse, int docId) {
+  public void getRecord(GenericRow reuse, int docId) {
     for (Map.Entry<String, PinotSegmentColumnReader> entry : _columnReaderMap.entrySet()) {
       String column = entry.getKey();
       PinotSegmentColumnReader columnReader = entry.getValue();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org