You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/12 20:13:10 UTC

[incubator-pinot] 11/12: small improve on loading data for virtual column

This is an automated email from the ASF dual-hosted git repository.

jamesshao pushed a commit to branch upsert
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 51684f7ff2d8779b9afba6080b4e19447fb2751e
Author: james Shao <sj...@uber.com>
AuthorDate: Fri Nov 8 13:30:49 2019 -0800

    small improve on loading data for virtual column
    
    Summary: reduce data loading time and GC pressure by removing the intermediate ArrayList that stored all update events; use an Iterable that generates new short-lived objects as needed
    
    Reviewers: tingchen, bzzhang, #streaming_pinot
    
    Reviewed By: bzzhang, #streaming_pinot
    
    Maniphest Tasks: T4441685
    
    Differential Revision: https://code.uberinternal.com/D3572409
---
 pinot-api/pom.xml                                  |  2 +-
 pinot-azure-filesystem/pom.xml                     |  2 +-
 pinot-broker/pom.xml                               |  2 +-
 pinot-common/pom.xml                               |  2 +-
 .../pinot-connector-kafka-0.11/pom.xml             |  2 +-
 pinot-connectors/pinot-connector-kafka-0.9/pom.xml |  2 +-
 pinot-connectors/pom.xml                           |  2 +-
 pinot-controller/pom.xml                           |  2 +-
 pinot-core/pom.xml                                 |  2 +-
 .../pinot/core/indexsegment/UpsertSegment.java     |  3 +-
 .../immutable/ImmutableUpsertSegmentImpl.java      |  7 ++-
 .../mutable/MutableUpsertSegmentImpl.java          |  8 +--
 .../immutable/ImmutableUpsertSegmentImplTest.java  | 10 +++-
 pinot-distribution/pom.xml                         |  2 +-
 pinot-grigio/pinot-grigio-common/pom.xml           |  2 +-
 .../pinot/grigio/common/messages/LogEventType.java |  8 +--
 .../SegmentUpdateLogStorageProvider.java           | 25 +-------
 .../common/storageProvider/UpdateLogEntry.java     |  3 +
 .../common/storageProvider/UpdateLogEntrySet.java  | 69 ++++++++++++++++++++++
 .../storageProvider/UpdateLogStorageExplorer.java  |  7 +--
 .../storageProvider/UpdateLogStorageProvider.java  |  7 +--
 .../SegmentUpdateLogStorageProviderTest.java       | 62 +++++++++++++------
 pinot-grigio/pinot-grigio-coordinator/pom.xml      |  2 +-
 pinot-grigio/pom.xml                               |  2 +-
 pinot-hadoop-filesystem/pom.xml                    |  2 +-
 pinot-hadoop/pom.xml                               |  2 +-
 pinot-integration-tests/pom.xml                    |  2 +-
 pinot-minion/pom.xml                               |  2 +-
 pinot-orc/pom.xml                                  |  2 +-
 pinot-parquet/pom.xml                              |  2 +-
 pinot-perf/pom.xml                                 |  2 +-
 pinot-server/pom.xml                               |  2 +-
 pinot-tools/pom.xml                                |  2 +-
 pinot-transport/pom.xml                            |  2 +-
 pom.xml                                            |  2 +-
 35 files changed, 169 insertions(+), 88 deletions(-)

diff --git a/pinot-api/pom.xml b/pinot-api/pom.xml
index 41391eb..a1b43aa 100644
--- a/pinot-api/pom.xml
+++ b/pinot-api/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-api</artifactId>
   <name>Pinot API</name>
diff --git a/pinot-azure-filesystem/pom.xml b/pinot-azure-filesystem/pom.xml
index ce07b85..4f6b42d 100644
--- a/pinot-azure-filesystem/pom.xml
+++ b/pinot-azure-filesystem/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-azure-filesystem</artifactId>
   <name>Pinot Azure Filesystem</name>
diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index 4e92b3f..47402c4 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-broker</artifactId>
   <name>Pinot Broker</name>
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 4ce2e32..339905e 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-common</artifactId>
   <name>Pinot Common</name>
diff --git a/pinot-connectors/pinot-connector-kafka-0.11/pom.xml b/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
index 5e384d5..aa658ef 100644
--- a/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot-connectors</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index daf74ad..c674d7a 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot-connectors</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index f4e8763..44b5673 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
 
   <artifactId>pinot-connectors</artifactId>
diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 259300d..be511e3 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-controller</artifactId>
   <name>Pinot Controller</name>
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 06309cc..0dbdd63 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-core</artifactId>
   <name>Pinot Core</name>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/UpsertSegment.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/UpsertSegment.java
index 45fe425..125cf06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/UpsertSegment.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/UpsertSegment.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.indexsegment;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
 
 import java.io.IOException;
-import java.util.List;
 
 public interface UpsertSegment {
 
@@ -29,7 +28,7 @@ public interface UpsertSegment {
    * update the upsert-related virtual columns with the new values in this list of update logs
    * @param messages list of updates logs to update the virtual columns
    */
-  void updateVirtualColumn(List<UpdateLogEntry> messages);
+  void updateVirtualColumn(Iterable<UpdateLogEntry> messages);
 
   /**
    * get the upsert related virtual column debug info given an offset
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
index 1537689..5ab42f5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
@@ -34,6 +34,7 @@ import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.core.startree.v2.store.StarTreeIndexContainer;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,8 +150,8 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
   }
 
   @Override
-  public void updateVirtualColumn(List<UpdateLogEntry> logEntryList) {
-    for (UpdateLogEntry logEntry: logEntryList) {
+  public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
+    for (UpdateLogEntry logEntry: logEntries) {
       boolean updated = false;
       int docId = getDocIdFromSourceOffset(logEntry.getOffset());
       for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
@@ -180,7 +181,7 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
   @Override
   public void initVirtualColumn() throws IOException {
     long start = System.currentTimeMillis();
-    final List<UpdateLogEntry> updateLogEntries = _updateLogStorageProvider.getAllMessages(_tableNameWithType, _segmentName);
+    final UpdateLogEntrySet updateLogEntries = _updateLogStorageProvider.getAllMessages(_tableNameWithType, _segmentName);
     LOGGER.info("load {} update log entry from update log storage provider for segment {} in {} ms",
         updateLogEntries.size(), _segmentName, System.currentTimeMillis() - start);
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
index b69221d..ba6b6b5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.grigio.common.messages.LogEventType;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,6 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
 
   private final String _kafkaOffsetColumnName;
 
-
   private final List<VirtualColumnLongValueReaderWriter> _mutableSegmentReaderWriters = new ArrayList<>();
   // use map for mapping between kafka offset and docId because we at-most have 1 mutable segment per consumer
   // will use more memory, but we can update this later
@@ -71,8 +71,8 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
   }
 
   @Override
-  public synchronized void updateVirtualColumn(List<UpdateLogEntry> logEntryList) {
-    for (UpdateLogEntry logEntry: logEntryList) {
+  public synchronized void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
+    for (UpdateLogEntry logEntry: logEntries) {
       boolean updated = false;
       boolean offsetFound = false;
       Integer docId = _sourceOffsetToDocId.get(logEntry.getOffset());
@@ -126,7 +126,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
   @Override
   public void initVirtualColumn() throws IOException {
     Preconditions.checkState(_numDocsIndexed == 0, "should init virtual column before ingestion");
-    List<UpdateLogEntry> updateLogEntries = UpdateLogStorageProvider.getInstance().getAllMessages(_tableName, _segmentName);
+    UpdateLogEntrySet updateLogEntries = UpdateLogStorageProvider.getInstance().getAllMessages(_tableName, _segmentName);
     LOGGER.info("got {} update log entries for current segment {}", updateLogEntries.size(), _segmentName);
     // some physical data might have been ingested when we init virtual column, we will go through the normal update
     // flow to ensure we wont miss records
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
index 4712625..19bef14 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
@@ -22,6 +22,7 @@ import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.grigio.common.messages.LogEventType;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -29,6 +30,7 @@ import org.testng.annotations.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.mockito.ArgumentMatchers.anyString;
@@ -92,7 +94,13 @@ public class ImmutableUpsertSegmentImplTest {
       updateLogEntries.add(new UpdateLogEntry(minOffset + i, 50, LogEventType.INSERT, i%8));
       updateLogEntries.add(new UpdateLogEntry(minOffset + i, 100, LogEventType.DELETE, i%8));
     }
-    when(_mockProvider.getAllMessages(anyString(), anyString())).thenReturn(updateLogEntries);
+    UpdateLogEntrySet entrySet = new UpdateLogEntrySet(null, 2) {
+      @Override
+      public Iterator<UpdateLogEntry> iterator() {
+        return updateLogEntries.iterator();
+      }
+    };
+    when(_mockProvider.getAllMessages(anyString(), anyString())).thenReturn(entrySet);
     System.out.println("run time for set up: " + (System.currentTimeMillis() - start));
 
     start = System.currentTimeMillis();
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index f55e1a7..12dcd75 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-distribution</artifactId>
   <name>Pinot Distribution</name>
diff --git a/pinot-grigio/pinot-grigio-common/pom.xml b/pinot-grigio/pinot-grigio-common/pom.xml
index 0c98e85..a5d4152 100644
--- a/pinot-grigio/pinot-grigio-common/pom.xml
+++ b/pinot-grigio/pinot-grigio-common/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot-grigio</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/messages/LogEventType.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/messages/LogEventType.java
index fa0db46..b4e9605 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/messages/LogEventType.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/messages/LogEventType.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.grigio.common.messages;
 
-import com.google.common.base.Preconditions;
-
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,9 +42,7 @@ public enum LogEventType {
     return this._uuid;
   }
 
-  public static LogEventType getEventType(long uuid) {
-    int uuidInt = Math.toIntExact(uuid);
-    Preconditions.checkState(UUID_MAP.containsKey(uuidInt));
-    return UUID_MAP.get(uuidInt);
+  public static LogEventType getEventType(int uuid) {
+    return UUID_MAP.get(uuid);
   }
 }
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
index 02f2fba..e7b035f 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
@@ -20,8 +20,6 @@ package org.apache.pinot.grigio.common.storageProvider;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import org.apache.pinot.grigio.common.messages.LogEventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +31,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -57,31 +54,15 @@ public class SegmentUpdateLogStorageProvider {
     _outputStream = new FileOutputStream(_file, true);
   }
 
-  public synchronized List<UpdateLogEntry> readAllMessagesFromFile() throws IOException {
-    long start = System.currentTimeMillis();
-    int insertMessageCount = 0;
-    int deleteMessageCount = 0;
+  public synchronized UpdateLogEntrySet readAllMessagesFromFile() throws IOException {
     int fileLength = (int) _file.length();
     if (fileLength > 0) {
       ByteBuffer buffer = ByteBuffer.allocate(fileLength);
       readFullyFromBeginning(_file, buffer);
       int messageCount = fileLength / UpdateLogEntry.SIZE;
-      List<UpdateLogEntry> logs = new ArrayList<>(messageCount);
-      for (int i = 0; i < messageCount; i++) {
-        UpdateLogEntry logEntry = UpdateLogEntry.fromBytesBuffer(buffer);
-        if ((logEntry.getType() == LogEventType.INSERT)) {
-          insertMessageCount++;
-        } else {
-          deleteMessageCount++;
-        }
-        logs.add(logEntry);
-      }
-      buffer.clear();
-      LOGGER.info("loaded {} message from file, {} insert and {} delete in {} ms", messageCount, insertMessageCount,
-          deleteMessageCount, System.currentTimeMillis() - start);
-      return logs;
+      return new UpdateLogEntrySet(buffer, messageCount);
     } else {
-      return ImmutableList.of();
+      return UpdateLogEntrySet.getEmptySet();
     }
   }
 
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntry.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntry.java
index c46f163..2ecc547 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntry.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntry.java
@@ -74,6 +74,9 @@ public class UpdateLogEntry implements Serializable {
   }
 
   public static UpdateLogEntry fromBytesBuffer(ByteBuffer buffer) {
+    if (buffer == null) {
+      throw new RuntimeException("trying to get update log event from null buffer");
+    }
     return new UpdateLogEntry(buffer.getLong(), buffer.getLong(), LogEventType.getEventType(buffer.getInt()), buffer.getInt());
   }
 
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntrySet.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntrySet.java
new file mode 100644
index 0000000..f756d73
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntrySet.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.grigio.common.storageProvider;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * Holds the update log entries read from a file as a raw {@link ByteBuffer} and decodes them one at a time during iteration.
+ * NOTE: all iterators consume the same underlying buffer, so entries already read are not returned again by later iterators.
+ */
+public class UpdateLogEntrySet implements Iterable<UpdateLogEntry> {
+
+  private final ByteBuffer _buffer;
+  private final int _messageCount;
+  private static final UpdateLogEntrySet EMPTY_LOG_ENTRY_SET = new UpdateLogEntrySet(ByteBuffer.allocate(0),
+      0);
+
+  public UpdateLogEntrySet(ByteBuffer buffer, int messageCount) {
+    _buffer = buffer;
+    _messageCount = messageCount;
+  }
+
+  public int size() {
+    return _messageCount;
+  }
+
+  @Override
+  public Iterator<UpdateLogEntry> iterator() {
+    return new Iterator<UpdateLogEntry>() {
+      @Override
+      public boolean hasNext() {
+        return _buffer != null && _buffer.hasRemaining();
+      }
+
+      @Override
+      public UpdateLogEntry next() {
+        if (!hasNext()) {
+          throw new RuntimeException("no more entries in buffer");
+        }
+        return UpdateLogEntry.fromBytesBuffer(_buffer);
+      }
+    };
+  }
+
+  /**
+   * Returns the shared empty instance (zero-capacity buffer, size 0) for callers that have no update log data to return.
+   * @return an entry set that reports size 0 and whose iterator yields no entries
+   */
+  public static UpdateLogEntrySet getEmptySet() {
+    return EMPTY_LOG_ENTRY_SET;
+  }
+}
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
index 3c9685d..0e38deb 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
@@ -26,7 +26,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 import java.util.Scanner;
 
 /**
@@ -56,10 +55,10 @@ public class UpdateLogStorageExplorer {
     String segmentName = inputSplits[1];
 
     provider.loadTable(tableName);
-    List<UpdateLogEntry> updateLogEntryList = provider.getAllMessages(tableName, segmentName);
+    UpdateLogEntrySet updateLogEntrySet = provider.getAllMessages(tableName, segmentName);
     Multimap<Long, UpdateLogEntry> map = ArrayListMultimap.create();
-    System.out.println("update log size: " + updateLogEntryList.size());
-    updateLogEntryList.forEach(u -> {
+    System.out.println("update log size: " + updateLogEntrySet.size());
+    updateLogEntrySet.forEach(u -> {
       map.put(u.getOffset(), u);
     });
 
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
index d04b1a2..b9802c2 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
@@ -20,7 +20,6 @@ package org.apache.pinot.grigio.common.storageProvider;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
@@ -142,18 +141,18 @@ public class UpdateLogStorageProvider {
     }
   }
 
-  public List<UpdateLogEntry> getAllMessages(String tableName, String segmentName) throws IOException {
+  public UpdateLogEntrySet getAllMessages(String tableName, String segmentName) throws IOException {
     if (_virtualColumnStorage.containsKey(tableName)) {
       SegmentUpdateLogStorageProvider provider = _virtualColumnStorage.get(tableName).get(segmentName);
       if (provider != null) {
         return provider.readAllMessagesFromFile();
       } else {
         LOGGER.warn("don't have data for segment {}", segmentName);
-        return ImmutableList.of();
+        return UpdateLogEntrySet.getEmptySet();
       }
     } else {
       LOGGER.error("don't have data for table {}", tableName);
-      return ImmutableList.of();
+      return UpdateLogEntrySet.getEmptySet();
     }
   }
 
diff --git a/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java b/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
index 64d4d3b..219fbc8 100644
--- a/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
+++ b/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -34,6 +35,7 @@ import java.util.concurrent.Executors;
 
 public class SegmentUpdateLogStorageProviderTest {
 
+  protected volatile UpdateLogEntry entryHolder;
   SegmentUpdateLogStorageProvider provider;
   List<UpdateLogEntry> inputDataList = ImmutableList.of(
       new UpdateLogEntry(1, 2, LogEventType.INSERT, 0),
@@ -49,17 +51,19 @@ public class SegmentUpdateLogStorageProviderTest {
 
   @Test
   public void testWriteAndReadData() throws IOException {
-    List<UpdateLogEntry> logEntryList = provider.readAllMessagesFromFile();
+    UpdateLogEntrySet logEntrySet = provider.readAllMessagesFromFile();
     // new file should have no data
-    Assert.assertEquals(logEntryList.size(), 0);
+    Assert.assertEquals(logEntrySet.size(), 0);
     provider.addData(inputDataList);
 
     SegmentUpdateLogStorageProvider provider1= new SegmentUpdateLogStorageProvider(provider._file);
-    logEntryList = provider1.readAllMessagesFromFile();
-    Assert.assertEquals(logEntryList.size(), inputDataList.size());
-    Assert.assertEquals(logEntryList.get(0), inputDataList.get(0));
-    Assert.assertEquals(logEntryList.get(1), inputDataList.get(1));
-    Assert.assertEquals(logEntryList.get(2), inputDataList.get(2));
+    logEntrySet = provider1.readAllMessagesFromFile();
+    Iterator<UpdateLogEntry> it = logEntrySet.iterator();
+    Assert.assertEquals(logEntrySet.size(), inputDataList.size());
+    Assert.assertEquals(it.next(), inputDataList.get(0));
+    Assert.assertEquals(it.next(), inputDataList.get(1));
+    Assert.assertEquals(it.next(), inputDataList.get(2));
+    Assert.assertFalse(it.hasNext());
     provider.addData(inputDataList);
     Assert.assertEquals(provider.readAllMessagesFromFile().size(), inputDataList.size() * 2);
   }
@@ -72,11 +76,13 @@ public class SegmentUpdateLogStorageProviderTest {
     provider._outputStream.flush();
 
     SegmentUpdateLogStorageProvider provider1 = new SegmentUpdateLogStorageProvider(provider._file);
-    List<UpdateLogEntry> logEntryList = provider1.readAllMessagesFromFile();
-    Assert.assertEquals(logEntryList.size(), inputDataList.size());
-    Assert.assertEquals(logEntryList.get(0), inputDataList.get(0));
-    Assert.assertEquals(logEntryList.get(1), inputDataList.get(1));
-    Assert.assertEquals(logEntryList.get(2), inputDataList.get(2));
+    UpdateLogEntrySet logEntrySet = provider1.readAllMessagesFromFile();
+    Iterator<UpdateLogEntry> it = logEntrySet.iterator();
+    Assert.assertEquals(logEntrySet.size(), inputDataList.size());
+    Assert.assertEquals(it.next(), inputDataList.get(0));
+    Assert.assertEquals(it.next(), inputDataList.get(1));
+    Assert.assertEquals(it.next(), inputDataList.get(2));
+    Assert.assertFalse(it.hasNext());
   }
 
   @Test
@@ -108,18 +114,21 @@ public class SegmentUpdateLogStorageProviderTest {
     });
     service.invokeAll(tasks);
     service.shutdownNow();
-    List<UpdateLogEntry> updateLogEntries = provider.readAllMessagesFromFile();
+    UpdateLogEntrySet updateLogEntries = provider.readAllMessagesFromFile();
     Assert.assertEquals(updateLogEntries.size(), writeIterationCount * inputDataList.size());
+    Iterator<UpdateLogEntry> it = updateLogEntries.iterator();
     for (int i = 0; i < writeIterationCount; i++) {
-      Assert.assertEquals(updateLogEntries.get(i * inputDataList.size()), inputDataList.get(0));
-      Assert.assertEquals(updateLogEntries.get(i * inputDataList.size() + 1), inputDataList.get(1));
-      Assert.assertEquals(updateLogEntries.get(i * inputDataList.size() + 2), inputDataList.get(2));
+      Assert.assertEquals(it.next(), inputDataList.get(0));
+      Assert.assertEquals(it.next(), inputDataList.get(1));
+      Assert.assertEquals(it.next(), inputDataList.get(2));
     }
+    Assert.assertFalse(it.hasNext());
   }
 
   @Test
   public void testReadMesssagePerf() throws IOException {
     int totalMessageCount = 5_000_000;
+    // write a lot of data to file
     List<UpdateLogEntry> inputMessages = new ArrayList<>(totalMessageCount * 2);
     for (int i = 0; i < totalMessageCount; i++) {
       inputMessages.add(new UpdateLogEntry(i, 50, LogEventType.INSERT, i%8));
@@ -128,8 +137,25 @@ public class SegmentUpdateLogStorageProviderTest {
     long start = System.currentTimeMillis();
     provider.addData(inputMessages);
     System.out.println("write data takes ms: " + (System.currentTimeMillis() - start));
+
+    // load data from file into a temp object; we don't assert on this timing as it depends on the disk/computer
+    start = System.currentTimeMillis();
+    UpdateLogEntrySet entrySet = provider.readAllMessagesFromFile();
+    long loadTime = System.currentTimeMillis() - start;
+    System.out.println("load data takes ms: " + loadTime);
+    Assert.assertTrue(entrySet.size() == totalMessageCount * 2);
+
+    // the old implementation, where we held the data in an array list, would take 1000 - 2000 seconds for the data loading
+    // using an iterator (current implementation) should make this code finish within 300 - 600 ms.
+    // test accessing those objects
     start = System.currentTimeMillis();
-    provider.readAllMessagesFromFile();
-    System.out.println("read data takes ms: " + (System.currentTimeMillis() - start));
+    for (UpdateLogEntry entry: entrySet) {
+      // ensure we hold them in a volatile member to force the JVM to allocate the object and
+      // prevent the JIT from optimizing this part of the code away
+      entryHolder = entry;
+    }
+    long readTime = System.currentTimeMillis() - start;
+    Assert.assertTrue(readTime < 1_000L); // this should be relatively fast
+    System.out.println("read data takes ms: " + readTime);
   }
 }
\ No newline at end of file
diff --git a/pinot-grigio/pinot-grigio-coordinator/pom.xml b/pinot-grigio/pinot-grigio-coordinator/pom.xml
index 7ef66ce..8e3c353 100644
--- a/pinot-grigio/pinot-grigio-coordinator/pom.xml
+++ b/pinot-grigio/pinot-grigio-coordinator/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot-grigio</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <artifactId>pinot-grigio-coordinator</artifactId>
diff --git a/pinot-grigio/pom.xml b/pinot-grigio/pom.xml
index 2d3a6a7..145912c 100644
--- a/pinot-grigio/pom.xml
+++ b/pinot-grigio/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <artifactId>pinot-grigio</artifactId>
diff --git a/pinot-hadoop-filesystem/pom.xml b/pinot-hadoop-filesystem/pom.xml
index 0e0f03d..fa64d3a 100644
--- a/pinot-hadoop-filesystem/pom.xml
+++ b/pinot-hadoop-filesystem/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-hadoop-filesystem</artifactId>
   <name>Pinot Hadoop Filesystem</name>
diff --git a/pinot-hadoop/pom.xml b/pinot-hadoop/pom.xml
index 13691d8..967264a 100644
--- a/pinot-hadoop/pom.xml
+++ b/pinot-hadoop/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-hadoop</artifactId>
   <name>Pinot Hadoop</name>
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 8ea6a8d..76af408 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-integration-tests</artifactId>
   <name>Pinot Integration Tests</name>
diff --git a/pinot-minion/pom.xml b/pinot-minion/pom.xml
index 72b1d85..82a34f5 100644
--- a/pinot-minion/pom.xml
+++ b/pinot-minion/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-minion</artifactId>
   <name>Pinot Minion</name>
diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml
index 9508747..fcc5064 100644
--- a/pinot-orc/pom.xml
+++ b/pinot-orc/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/pinot-parquet/pom.xml b/pinot-parquet/pom.xml
index 577e283..956ed02 100644
--- a/pinot-parquet/pom.xml
+++ b/pinot-parquet/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 704f659..0301af5 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-perf</artifactId>
   <name>Pinot Perf</name>
diff --git a/pinot-server/pom.xml b/pinot-server/pom.xml
index 0503db3..41039c0 100644
--- a/pinot-server/pom.xml
+++ b/pinot-server/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-server</artifactId>
   <name>Pinot Server</name>
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 383cc1f..b6c8cda 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-tools</artifactId>
   <name>Pinot Tools</name>
diff --git a/pinot-transport/pom.xml b/pinot-transport/pom.xml
index 5359b71..e38eda3 100644
--- a/pinot-transport/pom.xml
+++ b/pinot-transport/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.57-SNAPSHOT</version>
+    <version>0.2.2.5.58-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-transport</artifactId>
   <name>Pinot Transport</name>
diff --git a/pom.xml b/pom.xml
index 66af82b..9d81542 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
   <groupId>org.apache.pinot</groupId>
   <artifactId>pinot</artifactId>
-  <version>0.2.2.5.57-SNAPSHOT</version>
+  <version>0.2.2.5.58-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>Pinot</name>
   <description>A realtime distributed OLAP datastore</description>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org