You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/12 20:13:10 UTC
[incubator-pinot] 11/12: small improvement to loading data for virtual
columns
This is an automated email from the ASF dual-hosted git repository.
jamesshao pushed a commit to branch upsert
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 51684f7ff2d8779b9afba6080b4e19447fb2751e
Author: james Shao <sj...@uber.com>
AuthorDate: Fri Nov 8 13:30:49 2019 -0800
small improvement to loading data for virtual columns
Summary: reduce data loading time and GC pressure by removing the intermediate ArrayList that stores all update events; instead, use an Iterable that generates new short-lived objects as needed
Reviewers: tingchen, bzzhang, #streaming_pinot
Reviewed By: bzzhang, #streaming_pinot
Maniphest Tasks: T4441685
Differential Revision: https://code.uberinternal.com/D3572409
---
pinot-api/pom.xml | 2 +-
pinot-azure-filesystem/pom.xml | 2 +-
pinot-broker/pom.xml | 2 +-
pinot-common/pom.xml | 2 +-
.../pinot-connector-kafka-0.11/pom.xml | 2 +-
pinot-connectors/pinot-connector-kafka-0.9/pom.xml | 2 +-
pinot-connectors/pom.xml | 2 +-
pinot-controller/pom.xml | 2 +-
pinot-core/pom.xml | 2 +-
.../pinot/core/indexsegment/UpsertSegment.java | 3 +-
.../immutable/ImmutableUpsertSegmentImpl.java | 7 ++-
.../mutable/MutableUpsertSegmentImpl.java | 8 +--
.../immutable/ImmutableUpsertSegmentImplTest.java | 10 +++-
pinot-distribution/pom.xml | 2 +-
pinot-grigio/pinot-grigio-common/pom.xml | 2 +-
.../pinot/grigio/common/messages/LogEventType.java | 8 +--
.../SegmentUpdateLogStorageProvider.java | 25 +-------
.../common/storageProvider/UpdateLogEntry.java | 3 +
.../common/storageProvider/UpdateLogEntrySet.java | 69 ++++++++++++++++++++++
.../storageProvider/UpdateLogStorageExplorer.java | 7 +--
.../storageProvider/UpdateLogStorageProvider.java | 7 +--
.../SegmentUpdateLogStorageProviderTest.java | 62 +++++++++++++------
pinot-grigio/pinot-grigio-coordinator/pom.xml | 2 +-
pinot-grigio/pom.xml | 2 +-
pinot-hadoop-filesystem/pom.xml | 2 +-
pinot-hadoop/pom.xml | 2 +-
pinot-integration-tests/pom.xml | 2 +-
pinot-minion/pom.xml | 2 +-
pinot-orc/pom.xml | 2 +-
pinot-parquet/pom.xml | 2 +-
pinot-perf/pom.xml | 2 +-
pinot-server/pom.xml | 2 +-
pinot-tools/pom.xml | 2 +-
pinot-transport/pom.xml | 2 +-
pom.xml | 2 +-
35 files changed, 169 insertions(+), 88 deletions(-)
diff --git a/pinot-api/pom.xml b/pinot-api/pom.xml
index 41391eb..a1b43aa 100644
--- a/pinot-api/pom.xml
+++ b/pinot-api/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-api</artifactId>
<name>Pinot API</name>
diff --git a/pinot-azure-filesystem/pom.xml b/pinot-azure-filesystem/pom.xml
index ce07b85..4f6b42d 100644
--- a/pinot-azure-filesystem/pom.xml
+++ b/pinot-azure-filesystem/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-azure-filesystem</artifactId>
<name>Pinot Azure Filesystem</name>
diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index 4e92b3f..47402c4 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-broker</artifactId>
<name>Pinot Broker</name>
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 4ce2e32..339905e 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-common</artifactId>
<name>Pinot Common</name>
diff --git a/pinot-connectors/pinot-connector-kafka-0.11/pom.xml b/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
index 5e384d5..aa658ef 100644
--- a/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot-connectors</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index daf74ad..c674d7a 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot-connectors</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index f4e8763..44b5673 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-connectors</artifactId>
diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 259300d..be511e3 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-controller</artifactId>
<name>Pinot Controller</name>
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 06309cc..0dbdd63 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-core</artifactId>
<name>Pinot Core</name>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/UpsertSegment.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/UpsertSegment.java
index 45fe425..125cf06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/UpsertSegment.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/UpsertSegment.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.indexsegment;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
import java.io.IOException;
-import java.util.List;
public interface UpsertSegment {
@@ -29,7 +28,7 @@ public interface UpsertSegment {
* update the upsert-related virtual columns with the new values in this list of update logs
* @param messages list of updates logs to update the virtual columns
*/
- void updateVirtualColumn(List<UpdateLogEntry> messages);
+ void updateVirtualColumn(Iterable<UpdateLogEntry> messages);
/**
* get the upsert related virtual column debug info given an offset
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
index 1537689..5ab42f5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
@@ -34,6 +34,7 @@ import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.core.startree.v2.store.StarTreeIndexContainer;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,8 +150,8 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
}
@Override
- public void updateVirtualColumn(List<UpdateLogEntry> logEntryList) {
- for (UpdateLogEntry logEntry: logEntryList) {
+ public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
+ for (UpdateLogEntry logEntry: logEntries) {
boolean updated = false;
int docId = getDocIdFromSourceOffset(logEntry.getOffset());
for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
@@ -180,7 +181,7 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
@Override
public void initVirtualColumn() throws IOException {
long start = System.currentTimeMillis();
- final List<UpdateLogEntry> updateLogEntries = _updateLogStorageProvider.getAllMessages(_tableNameWithType, _segmentName);
+ final UpdateLogEntrySet updateLogEntries = _updateLogStorageProvider.getAllMessages(_tableNameWithType, _segmentName);
LOGGER.info("load {} update log entry from update log storage provider for segment {} in {} ms",
updateLogEntries.size(), _segmentName, System.currentTimeMillis() - start);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
index b69221d..ba6b6b5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.grigio.common.messages.LogEventType;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,6 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
private final String _kafkaOffsetColumnName;
-
private final List<VirtualColumnLongValueReaderWriter> _mutableSegmentReaderWriters = new ArrayList<>();
// use map for mapping between kafka offset and docId because we at-most have 1 mutable segment per consumer
// will use more memory, but we can update this later
@@ -71,8 +71,8 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
}
@Override
- public synchronized void updateVirtualColumn(List<UpdateLogEntry> logEntryList) {
- for (UpdateLogEntry logEntry: logEntryList) {
+ public synchronized void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
+ for (UpdateLogEntry logEntry: logEntries) {
boolean updated = false;
boolean offsetFound = false;
Integer docId = _sourceOffsetToDocId.get(logEntry.getOffset());
@@ -126,7 +126,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
@Override
public void initVirtualColumn() throws IOException {
Preconditions.checkState(_numDocsIndexed == 0, "should init virtual column before ingestion");
- List<UpdateLogEntry> updateLogEntries = UpdateLogStorageProvider.getInstance().getAllMessages(_tableName, _segmentName);
+ UpdateLogEntrySet updateLogEntries = UpdateLogStorageProvider.getInstance().getAllMessages(_tableName, _segmentName);
LOGGER.info("got {} update log entries for current segment {}", updateLogEntries.size(), _segmentName);
// some physical data might have been ingested when we init virtual column, we will go through the normal update
// flow to ensure we wont miss records
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
index 4712625..19bef14 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
@@ -22,6 +22,7 @@ import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.grigio.common.messages.LogEventType;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -29,6 +30,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import static org.mockito.ArgumentMatchers.anyString;
@@ -92,7 +94,13 @@ public class ImmutableUpsertSegmentImplTest {
updateLogEntries.add(new UpdateLogEntry(minOffset + i, 50, LogEventType.INSERT, i%8));
updateLogEntries.add(new UpdateLogEntry(minOffset + i, 100, LogEventType.DELETE, i%8));
}
- when(_mockProvider.getAllMessages(anyString(), anyString())).thenReturn(updateLogEntries);
+ UpdateLogEntrySet entrySet = new UpdateLogEntrySet(null, 2) {
+ @Override
+ public Iterator<UpdateLogEntry> iterator() {
+ return updateLogEntries.iterator();
+ }
+ };
+ when(_mockProvider.getAllMessages(anyString(), anyString())).thenReturn(entrySet);
System.out.println("run time for set up: " + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index f55e1a7..12dcd75 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -25,7 +25,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-distribution</artifactId>
<name>Pinot Distribution</name>
diff --git a/pinot-grigio/pinot-grigio-common/pom.xml b/pinot-grigio/pinot-grigio-common/pom.xml
index 0c98e85..a5d4152 100644
--- a/pinot-grigio/pinot-grigio-common/pom.xml
+++ b/pinot-grigio/pinot-grigio-common/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot-grigio</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/messages/LogEventType.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/messages/LogEventType.java
index fa0db46..b4e9605 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/messages/LogEventType.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/messages/LogEventType.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.grigio.common.messages;
-import com.google.common.base.Preconditions;
-
import java.util.HashMap;
import java.util.Map;
@@ -44,9 +42,7 @@ public enum LogEventType {
return this._uuid;
}
- public static LogEventType getEventType(long uuid) {
- int uuidInt = Math.toIntExact(uuid);
- Preconditions.checkState(UUID_MAP.containsKey(uuidInt));
- return UUID_MAP.get(uuidInt);
+ public static LogEventType getEventType(int uuid) {
+ return UUID_MAP.get(uuid);
}
}
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
index 02f2fba..e7b035f 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
@@ -20,8 +20,6 @@ package org.apache.pinot.grigio.common.storageProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import org.apache.pinot.grigio.common.messages.LogEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +31,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -57,31 +54,15 @@ public class SegmentUpdateLogStorageProvider {
_outputStream = new FileOutputStream(_file, true);
}
- public synchronized List<UpdateLogEntry> readAllMessagesFromFile() throws IOException {
- long start = System.currentTimeMillis();
- int insertMessageCount = 0;
- int deleteMessageCount = 0;
+ public synchronized UpdateLogEntrySet readAllMessagesFromFile() throws IOException {
int fileLength = (int) _file.length();
if (fileLength > 0) {
ByteBuffer buffer = ByteBuffer.allocate(fileLength);
readFullyFromBeginning(_file, buffer);
int messageCount = fileLength / UpdateLogEntry.SIZE;
- List<UpdateLogEntry> logs = new ArrayList<>(messageCount);
- for (int i = 0; i < messageCount; i++) {
- UpdateLogEntry logEntry = UpdateLogEntry.fromBytesBuffer(buffer);
- if ((logEntry.getType() == LogEventType.INSERT)) {
- insertMessageCount++;
- } else {
- deleteMessageCount++;
- }
- logs.add(logEntry);
- }
- buffer.clear();
- LOGGER.info("loaded {} message from file, {} insert and {} delete in {} ms", messageCount, insertMessageCount,
- deleteMessageCount, System.currentTimeMillis() - start);
- return logs;
+ return new UpdateLogEntrySet(buffer, messageCount);
} else {
- return ImmutableList.of();
+ return UpdateLogEntrySet.getEmptySet();
}
}
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntry.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntry.java
index c46f163..2ecc547 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntry.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntry.java
@@ -74,6 +74,9 @@ public class UpdateLogEntry implements Serializable {
}
public static UpdateLogEntry fromBytesBuffer(ByteBuffer buffer) {
+ if (buffer == null) {
+ throw new RuntimeException("trying to get update log event from null buffer");
+ }
return new UpdateLogEntry(buffer.getLong(), buffer.getLong(), LogEventType.getEventType(buffer.getInt()), buffer.getInt());
}
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntrySet.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntrySet.java
new file mode 100644
index 0000000..f756d73
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogEntrySet.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.grigio.common.storageProvider;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * Holds the update log entries read from a file and decodes them lazily through an
+ * iterator. NOTE: iterators share one buffer position, so entries can be read only once.
+ */
+public class UpdateLogEntrySet implements Iterable<UpdateLogEntry> {
+
+ private final ByteBuffer _buffer;
+ private final int _messageCount;
+ private static final UpdateLogEntrySet EMPTY_LOG_ENTRY_SET = new UpdateLogEntrySet(ByteBuffer.allocate(0),
+ 0);
+
+ public UpdateLogEntrySet(ByteBuffer buffer, int messageCount) {
+ _buffer = buffer;
+ _messageCount = messageCount;
+ }
+
+ public int size() {
+ return _messageCount;
+ }
+
+ @Override
+ public Iterator<UpdateLogEntry> iterator() {
+ return new Iterator<UpdateLogEntry>() {
+ @Override
+ public boolean hasNext() {
+ return _buffer != null && _buffer.hasRemaining();
+ }
+
+ @Override
+ public UpdateLogEntry next() {
+ if (!hasNext()) {
+ throw new RuntimeException("no more entries in buffer");
+ }
+ return UpdateLogEntry.fromBytesBuffer(_buffer);
+ }
+ };
+ }
+
+ /**
+ * helper method to create a default empty set in case of invalid/missing input
+ * @return an empty entry set has no data
+ */
+ public static UpdateLogEntrySet getEmptySet() {
+ return EMPTY_LOG_ENTRY_SET;
+ }
+}
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
index 3c9685d..0e38deb 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
@@ -26,7 +26,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
import java.util.Scanner;
/**
@@ -56,10 +55,10 @@ public class UpdateLogStorageExplorer {
String segmentName = inputSplits[1];
provider.loadTable(tableName);
- List<UpdateLogEntry> updateLogEntryList = provider.getAllMessages(tableName, segmentName);
+ UpdateLogEntrySet updateLogEntrySet = provider.getAllMessages(tableName, segmentName);
Multimap<Long, UpdateLogEntry> map = ArrayListMultimap.create();
- System.out.println("update log size: " + updateLogEntryList.size());
- updateLogEntryList.forEach(u -> {
+ System.out.println("update log size: " + updateLogEntrySet.size());
+ updateLogEntrySet.forEach(u -> {
map.put(u.getOffset(), u);
});
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
index d04b1a2..b9802c2 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
@@ -20,7 +20,6 @@ package org.apache.pinot.grigio.common.storageProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
@@ -142,18 +141,18 @@ public class UpdateLogStorageProvider {
}
}
- public List<UpdateLogEntry> getAllMessages(String tableName, String segmentName) throws IOException {
+ public UpdateLogEntrySet getAllMessages(String tableName, String segmentName) throws IOException {
if (_virtualColumnStorage.containsKey(tableName)) {
SegmentUpdateLogStorageProvider provider = _virtualColumnStorage.get(tableName).get(segmentName);
if (provider != null) {
return provider.readAllMessagesFromFile();
} else {
LOGGER.warn("don't have data for segment {}", segmentName);
- return ImmutableList.of();
+ return UpdateLogEntrySet.getEmptySet();
}
} else {
LOGGER.error("don't have data for table {}", tableName);
- return ImmutableList.of();
+ return UpdateLogEntrySet.getEmptySet();
}
}
diff --git a/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java b/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
index 64d4d3b..219fbc8 100644
--- a/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
+++ b/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -34,6 +35,7 @@ import java.util.concurrent.Executors;
public class SegmentUpdateLogStorageProviderTest {
+ protected volatile UpdateLogEntry entryHolder;
SegmentUpdateLogStorageProvider provider;
List<UpdateLogEntry> inputDataList = ImmutableList.of(
new UpdateLogEntry(1, 2, LogEventType.INSERT, 0),
@@ -49,17 +51,19 @@ public class SegmentUpdateLogStorageProviderTest {
@Test
public void testWriteAndReadData() throws IOException {
- List<UpdateLogEntry> logEntryList = provider.readAllMessagesFromFile();
+ UpdateLogEntrySet logEntrySet = provider.readAllMessagesFromFile();
// new file should have no data
- Assert.assertEquals(logEntryList.size(), 0);
+ Assert.assertEquals(logEntrySet.size(), 0);
provider.addData(inputDataList);
SegmentUpdateLogStorageProvider provider1= new SegmentUpdateLogStorageProvider(provider._file);
- logEntryList = provider1.readAllMessagesFromFile();
- Assert.assertEquals(logEntryList.size(), inputDataList.size());
- Assert.assertEquals(logEntryList.get(0), inputDataList.get(0));
- Assert.assertEquals(logEntryList.get(1), inputDataList.get(1));
- Assert.assertEquals(logEntryList.get(2), inputDataList.get(2));
+ logEntrySet = provider1.readAllMessagesFromFile();
+ Iterator<UpdateLogEntry> it = logEntrySet.iterator();
+ Assert.assertEquals(logEntrySet.size(), inputDataList.size());
+ Assert.assertEquals(it.next(), inputDataList.get(0));
+ Assert.assertEquals(it.next(), inputDataList.get(1));
+ Assert.assertEquals(it.next(), inputDataList.get(2));
+ Assert.assertFalse(it.hasNext());
provider.addData(inputDataList);
Assert.assertEquals(provider.readAllMessagesFromFile().size(), inputDataList.size() * 2);
}
@@ -72,11 +76,13 @@ public class SegmentUpdateLogStorageProviderTest {
provider._outputStream.flush();
SegmentUpdateLogStorageProvider provider1 = new SegmentUpdateLogStorageProvider(provider._file);
- List<UpdateLogEntry> logEntryList = provider1.readAllMessagesFromFile();
- Assert.assertEquals(logEntryList.size(), inputDataList.size());
- Assert.assertEquals(logEntryList.get(0), inputDataList.get(0));
- Assert.assertEquals(logEntryList.get(1), inputDataList.get(1));
- Assert.assertEquals(logEntryList.get(2), inputDataList.get(2));
+ UpdateLogEntrySet logEntrySet = provider1.readAllMessagesFromFile();
+ Iterator<UpdateLogEntry> it = logEntrySet.iterator();
+ Assert.assertEquals(logEntrySet.size(), inputDataList.size());
+ Assert.assertEquals(it.next(), inputDataList.get(0));
+ Assert.assertEquals(it.next(), inputDataList.get(1));
+ Assert.assertEquals(it.next(), inputDataList.get(2));
+ Assert.assertFalse(it.hasNext());
}
@Test
@@ -108,18 +114,21 @@ public class SegmentUpdateLogStorageProviderTest {
});
service.invokeAll(tasks);
service.shutdownNow();
- List<UpdateLogEntry> updateLogEntries = provider.readAllMessagesFromFile();
+ UpdateLogEntrySet updateLogEntries = provider.readAllMessagesFromFile();
Assert.assertEquals(updateLogEntries.size(), writeIterationCount * inputDataList.size());
+ Iterator<UpdateLogEntry> it = updateLogEntries.iterator();
for (int i = 0; i < writeIterationCount; i++) {
- Assert.assertEquals(updateLogEntries.get(i * inputDataList.size()), inputDataList.get(0));
- Assert.assertEquals(updateLogEntries.get(i * inputDataList.size() + 1), inputDataList.get(1));
- Assert.assertEquals(updateLogEntries.get(i * inputDataList.size() + 2), inputDataList.get(2));
+ Assert.assertEquals(it.next(), inputDataList.get(0));
+ Assert.assertEquals(it.next(), inputDataList.get(1));
+ Assert.assertEquals(it.next(), inputDataList.get(2));
}
+ Assert.assertFalse(it.hasNext());
}
@Test
public void testReadMesssagePerf() throws IOException {
int totalMessageCount = 5_000_000;
+ // write a lot of data to file
List<UpdateLogEntry> inputMessages = new ArrayList<>(totalMessageCount * 2);
for (int i = 0; i < totalMessageCount; i++) {
inputMessages.add(new UpdateLogEntry(i, 50, LogEventType.INSERT, i%8));
@@ -128,8 +137,25 @@ public class SegmentUpdateLogStorageProviderTest {
long start = System.currentTimeMillis();
provider.addData(inputMessages);
System.out.println("write data takes ms: " + (System.currentTimeMillis() - start));
+
+ // load data from file to temp object, we don't measure this performance as it depends on disk/computer
+ start = System.currentTimeMillis();
+ UpdateLogEntrySet entrySet = provider.readAllMessagesFromFile();
+ long loadTime = System.currentTimeMillis() - start;
+ System.out.println("load data takes ms: " + loadTime);
+ Assert.assertTrue(entrySet.size() == totalMessageCount * 2);
+
+ // old implementation where we held the data in an array list took 1000 - 2000 ms for the data loading;
+ // using an iterator (current implementation) should let this code finish within 300 - 600 ms.
+ // test accessing those object
start = System.currentTimeMillis();
- provider.readAllMessagesFromFile();
- System.out.println("read data takes ms: " + (System.currentTimeMillis() - start));
+ for (UpdateLogEntry entry: entrySet) {
+ // ensure we hold them in a volatile member to force the JVM to allocate the object and
+ // prevent the JIT from optimizing this part of the code away
+ entryHolder = entry;
+ }
+ long readTime = System.currentTimeMillis() - start;
+ Assert.assertTrue(readTime < 1_000L); // this should be relatively fast
+ System.out.println("read data takes ms: " + readTime);
}
}
\ No newline at end of file
diff --git a/pinot-grigio/pinot-grigio-coordinator/pom.xml b/pinot-grigio/pinot-grigio-coordinator/pom.xml
index 7ef66ce..8e3c353 100644
--- a/pinot-grigio/pinot-grigio-coordinator/pom.xml
+++ b/pinot-grigio/pinot-grigio-coordinator/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot-grigio</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pinot-grigio-coordinator</artifactId>
diff --git a/pinot-grigio/pom.xml b/pinot-grigio/pom.xml
index 2d3a6a7..145912c 100644
--- a/pinot-grigio/pom.xml
+++ b/pinot-grigio/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pinot-grigio</artifactId>
diff --git a/pinot-hadoop-filesystem/pom.xml b/pinot-hadoop-filesystem/pom.xml
index 0e0f03d..fa64d3a 100644
--- a/pinot-hadoop-filesystem/pom.xml
+++ b/pinot-hadoop-filesystem/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-hadoop-filesystem</artifactId>
<name>Pinot Hadoop Filesystem</name>
diff --git a/pinot-hadoop/pom.xml b/pinot-hadoop/pom.xml
index 13691d8..967264a 100644
--- a/pinot-hadoop/pom.xml
+++ b/pinot-hadoop/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-hadoop</artifactId>
<name>Pinot Hadoop</name>
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 8ea6a8d..76af408 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-integration-tests</artifactId>
<name>Pinot Integration Tests</name>
diff --git a/pinot-minion/pom.xml b/pinot-minion/pom.xml
index 72b1d85..82a34f5 100644
--- a/pinot-minion/pom.xml
+++ b/pinot-minion/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-minion</artifactId>
<name>Pinot Minion</name>
diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml
index 9508747..fcc5064 100644
--- a/pinot-orc/pom.xml
+++ b/pinot-orc/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-parquet/pom.xml b/pinot-parquet/pom.xml
index 577e283..956ed02 100644
--- a/pinot-parquet/pom.xml
+++ b/pinot-parquet/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 704f659..0301af5 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-perf</artifactId>
<name>Pinot Perf</name>
diff --git a/pinot-server/pom.xml b/pinot-server/pom.xml
index 0503db3..41039c0 100644
--- a/pinot-server/pom.xml
+++ b/pinot-server/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-server</artifactId>
<name>Pinot Server</name>
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 383cc1f..b6c8cda 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-tools</artifactId>
<name>Pinot Tools</name>
diff --git a/pinot-transport/pom.xml b/pinot-transport/pom.xml
index 5359b71..e38eda3 100644
--- a/pinot-transport/pom.xml
+++ b/pinot-transport/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
</parent>
<artifactId>pinot-transport</artifactId>
<name>Pinot Transport</name>
diff --git a/pom.xml b/pom.xml
index 66af82b..9d81542 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot</artifactId>
- <version>0.2.2.5.57-SNAPSHOT</version>
+ <version>0.2.2.5.58-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Pinot</name>
<description>A realtime distributed OLAP datastore</description>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org