Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/12 20:13:09 UTC
[incubator-pinot] 10/12: improve pinot server upsert segment loading time
This is an automated email from the ASF dual-hosted git repository.
jamesshao pushed a commit to branch upsert
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 7c8cc38a0aec511e2afb41d7ef9d1cdf1c3174d6
Author: james Shao <sj...@uber.com>
AuthorDate: Mon Nov 11 13:47:16 2019 -0800
improve pinot server upsert segment loading time
Summary:
Previously, the Pinot server spent too much time loading virtual columns for upsert segments (10+ seconds for a segment with 5M updates). We investigated and found issues in how we load the virtual columns. The major issues boil down to:
1. For each message, we passed an error message built with String.format(...) to Preconditions.checkState(...). This caused Java to construct a new String for every update event, which is very costly.
2. Various synchronization that is no longer necessary.
3. Various minor improvements from picking the correct data types and other optimizations.
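Item 1 comes down to when the failure message is built. The sketch below, in plain Java with no Guava dependency, contrasts an eagerly formatted message with a lazy template. `checkStateLazy`, `checkStateEager`, and `buildMessage` are hypothetical helpers: `checkStateLazy` is modeled on Guava's `Preconditions.checkState(boolean, String, Object...)`, which substitutes `%s` placeholders only when the check fails (SLF4J-style `{}` placeholders are not substituted). Even with a lazy template, every argument is still evaluated on each call, so an argument such as `logEntry.toString()` is still computed once per update.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PreconditionMessageCost {
  // Counts how many times an error message was actually built.
  static final AtomicInteger MESSAGES_BUILT = new AtomicInteger();

  // Eager style: the caller builds the message before the check runs,
  // so a String is allocated on every call, even when the check passes.
  static void checkStateEager(boolean expression, String message) {
    if (!expression) {
      throw new IllegalStateException(message);
    }
  }

  // Lazy style (hypothetical helper modeled on Guava's checkState template
  // overloads): the template is formatted only when the check fails.
  static void checkStateLazy(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalStateException(buildMessage(template, args));
    }
  }

  // Formats the message and records that the formatting cost was paid.
  static String buildMessage(String template, Object... args) {
    MESSAGES_BUILT.incrementAndGet();
    return String.format(template, args);
  }

  public static void main(String[] args) {
    int updates = 1_000;
    for (int partition = 0; partition < updates; partition++) {
      checkStateEager(partition >= 0,
          buildMessage("invalid partition %s for table %s", partition, "testTable"));
    }
    int eager = MESSAGES_BUILT.getAndSet(0);
    for (int partition = 0; partition < updates; partition++) {
      checkStateLazy(partition >= 0, "invalid partition %s for table %s", partition, "testTable");
    }
    int lazy = MESSAGES_BUILT.get();
    System.out.println("eager: " + eager + ", lazy: " + lazy); // prints "eager: 1000, lazy: 0"
  }
}
```

On the hot path, where the check almost always passes, only the lazy form avoids building a String per update.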
After the optimization, initializing the virtual columns should take less than 0.5 seconds for segments with 5M updates. The overall segment loading time should be within 1-2 seconds (including loading the virtual column file from disk and other related work).
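The faster initVirtualColumn replaces the per-offset Multimap with a single pass over the update log. Each entry's offset indexes a dense offset-to-docId array, and only the highest value per partition is forwarded to the watermark manager. The sketch below illustrates that pattern. `LogEntry`, `apply`, and `MISSING_DOC_ID` are hypothetical names standing in for `UpdateLogEntry`, the loop in `initVirtualColumn`, and `DEFAULT_DOC_ID_FOR_MISSING_ENTRY`, and a single `int[]` stands in for the virtual column reader/writers. One deliberate difference: the sketch bounds the lookup by the array length, whereas the commit uses `_totalDoc + _minSourceOffset`. The two bounds coincide when the segment's offsets are contiguous.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VirtualColumnInitSketch {
  static final int MISSING_DOC_ID = -1;

  // Hypothetical stand-in for UpdateLogEntry: source offset, value, partition.
  static final class LogEntry {
    final long offset;
    final int value;
    final int partition;

    LogEntry(long offset, int value, int partition) {
      this.offset = offset;
      this.value = value;
      this.partition = partition;
    }
  }

  // Applies every log entry to the column in one pass. offsetToDocId[i] holds
  // the docId for source offset minOffset + i, or MISSING_DOC_ID if there is none.
  // Returns the highest value seen per partition.
  static Map<Integer, Integer> apply(List<LogEntry> entries, long minOffset,
      int[] offsetToDocId, int[] column) {
    Map<Integer, Integer> highestPerPartition = new HashMap<>();
    long maxOffset = minOffset + offsetToDocId.length; // exclusive upper bound
    for (LogEntry entry : entries) {
      if (entry.offset < minOffset || entry.offset >= maxOffset) {
        continue; // entry belongs to a different segment
      }
      int docId = offsetToDocId[(int) (entry.offset - minOffset)];
      if (docId == MISSING_DOC_ID) {
        continue; // in range, but no document has this offset
      }
      column[docId] = entry.value; // plain overwrite, no equality check or lock
      highestPerPartition.merge(entry.partition, entry.value, Math::max);
    }
    return highestPerPartition;
  }

  public static void main(String[] args) {
    int[] offsetToDocId = {0, MISSING_DOC_ID, 1}; // offsets 100, 101, 102
    int[] column = new int[2];
    Arrays.fill(column, -1);
    List<LogEntry> entries = Arrays.asList(
        new LogEntry(100, 50, 0),
        new LogEntry(101, 60, 0),  // no docId: skipped
        new LogEntry(102, 70, 1),
        new LogEntry(999, 80, 1)); // outside the range: skipped
    Map<Integer, Integer> watermarks = apply(entries, 100, offsetToDocId, column);
    System.out.println(Arrays.toString(column) + " " + watermarks); // prints "[50, 70] {0=50, 1=70}"
  }
}
```

Folding the watermark into a per-partition maximum means the manager is called once per partition rather than once per update.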
Reviewers: bzzhang, tingchen, #streaming_pinot
Reviewed By: bzzhang, #streaming_pinot
Maniphest Tasks: T4441685
Differential Revision: https://code.uberinternal.com/D3571549
---
pinot-api/pom.xml | 2 +-
pinot-azure-filesystem/pom.xml | 2 +-
pinot-broker/pom.xml | 2 +-
pinot-common/pom.xml | 2 +-
.../pinot-connector-kafka-0.11/pom.xml | 2 +-
pinot-connectors/pinot-connector-kafka-0.9/pom.xml | 2 +-
pinot-connectors/pom.xml | 2 +-
pinot-controller/pom.xml | 2 +-
pinot-core/pom.xml | 2 +-
.../immutable/ImmutableUpsertSegmentImpl.java | 119 +++++++++++++--------
.../segment/updater/UpsertWaterMarkManager.java | 13 ++-
.../ValidFromInMemoryVirtualColumnProvider.java | 3 +-
.../ValidUntilInMemoryVirtualColumnProvider.java | 3 +-
.../VirtualColumnLongValueReaderWriter.java | 28 +++--
.../immutable/ImmutableUpsertSegmentImplTest.java | 118 ++++++++++++++++++++
pinot-distribution/pom.xml | 2 +-
pinot-grigio/pinot-grigio-common/pom.xml | 2 +-
.../pinot/grigio/common/metrics/GrigioMeter.java | 1 -
.../SegmentUpdateLogStorageProvider.java | 7 +-
.../storageProvider/UpdateLogStorageProvider.java | 1 -
.../SegmentUpdateLogStorageProviderTest.java | 15 +++
pinot-grigio/pinot-grigio-coordinator/pom.xml | 2 +-
pinot-grigio/pom.xml | 2 +-
pinot-hadoop-filesystem/pom.xml | 2 +-
pinot-hadoop/pom.xml | 2 +-
pinot-integration-tests/pom.xml | 2 +-
pinot-minion/pom.xml | 2 +-
pinot-orc/pom.xml | 2 +-
pinot-parquet/pom.xml | 2 +-
pinot-perf/pom.xml | 2 +-
pinot-server/pom.xml | 2 +-
pinot-tools/pom.xml | 2 +-
pinot-transport/pom.xml | 2 +-
pom.xml | 2 +-
34 files changed, 262 insertions(+), 94 deletions(-)
diff --git a/pinot-api/pom.xml b/pinot-api/pom.xml
index 8763b4b..41391eb 100644
--- a/pinot-api/pom.xml
+++ b/pinot-api/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-api</artifactId>
<name>Pinot API</name>
diff --git a/pinot-azure-filesystem/pom.xml b/pinot-azure-filesystem/pom.xml
index e42b355..ce07b85 100644
--- a/pinot-azure-filesystem/pom.xml
+++ b/pinot-azure-filesystem/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-azure-filesystem</artifactId>
<name>Pinot Azure Filesystem</name>
diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index d625e2d..4e92b3f 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-broker</artifactId>
<name>Pinot Broker</name>
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index cde3078..4ce2e32 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-common</artifactId>
<name>Pinot Common</name>
diff --git a/pinot-connectors/pinot-connector-kafka-0.11/pom.xml b/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
index 0b32fe0..5e384d5 100644
--- a/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot-connectors</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index f409a8f..daf74ad 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot-connectors</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index b2cfd90..f4e8763 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-connectors</artifactId>
diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index cf1e1f3..259300d 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-controller</artifactId>
<name>Pinot Controller</name>
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 141249d..06309cc 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-core</artifactId>
<name>Pinot Core</name>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
index 80723b4..1537689 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
@@ -19,10 +19,9 @@
package org.apache.pinot.core.indexsegment.immutable;
import com.clearspring.analytics.util.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.TableNameBuilder;
-import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.indexsegment.UpsertSegment;
import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
@@ -42,7 +41,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,19 +49,19 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
private static final Logger LOGGER = LoggerFactory.getLogger(ImmutableUpsertSegmentImpl.class);
- private final List<VirtualColumnLongValueReaderWriter> _virtualColumnsReaderWriter = new ArrayList<>();
-
+ private final List<VirtualColumnLongValueReaderWriter> _virtualColumnsReaderWriter;
private final String _tableNameWithType;
private final String _segmentName;
- private final Schema _schema;
private final int _totalDoc;
- private final ColumnIndexContainer _offsetColumnIndexContainer;
- private final UpsertWaterMarkManager _upsertWaterMarkManager;
private long _minSourceOffset;
+ private final UpsertWaterMarkManager _upsertWaterMarkManager;
+ private final UpdateLogStorageProvider _updateLogStorageProvider;
// use array for mapping bewteen offset to docId, where actual offset = min_offset + array_index
// use 4 bytes per record
private int[] _sourceOffsetToDocIdArray;
+ private static final int DEFAULT_DOC_ID_FOR_MISSING_ENTRY = -1;
+
public ImmutableUpsertSegmentImpl(SegmentDirectory segmentDirectory,
SegmentMetadataImpl segmentMetadata,
Map<String, ColumnIndexContainer> columnIndexContainerMap,
@@ -73,23 +71,44 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
_tableNameWithType = TableNameBuilder.ensureTableNameWithType(segmentMetadata.getTableName(),
CommonConstants.Helix.TableType.REALTIME);
_segmentName = segmentMetadata.getName();
- _schema = segmentMetadata.getSchema();
_totalDoc = segmentMetadata.getTotalDocs();
- _offsetColumnIndexContainer = columnIndexContainerMap.get(_schema.getOffsetKey());
_upsertWaterMarkManager = UpsertWaterMarkManager.getInstance();
+ _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
+ _virtualColumnsReaderWriter = new ArrayList<>();
for (Map.Entry<String, ColumnIndexContainer> entry: columnIndexContainerMap.entrySet()) {
String columnName = entry.getKey();
ColumnIndexContainer container = entry.getValue();
- if (_schema.isVirtualColumn(columnName) && (container.getForwardIndex() instanceof VirtualColumnLongValueReaderWriter)) {
+ if (segmentMetadata.getSchema().isVirtualColumn(columnName) && (container.getForwardIndex() instanceof VirtualColumnLongValueReaderWriter)) {
_virtualColumnsReaderWriter.add((VirtualColumnLongValueReaderWriter) container.getForwardIndex());
}
}
- buildOffsetToDocIdMap();
+ buildOffsetToDocIdMap(columnIndexContainerMap.get(segmentMetadata.getSchema().getOffsetKey()));
}
- private void buildOffsetToDocIdMap() {
- final DataFileReader reader = _offsetColumnIndexContainer.getForwardIndex();
- final Dictionary dictionary = _offsetColumnIndexContainer.getDictionary();
+ /** constructor used for creating instance in test cases
+ * should not be used for creating regular segment
+ */
+ @VisibleForTesting
+ protected ImmutableUpsertSegmentImpl(List<VirtualColumnLongValueReaderWriter> readerWriters,
+ int totalDoc, UpsertWaterMarkManager manager,
+ UpdateLogStorageProvider updateLogStorageProvider,
+ long minSourceOffset, int[] offsetToDocId) {
+ super(null, null, null, null);
+ _tableNameWithType = "testTable";
+ _segmentName = "testSegment";
+ _virtualColumnsReaderWriter = readerWriters;
+ _totalDoc = totalDoc;
+ _upsertWaterMarkManager = manager;
+ _updateLogStorageProvider = updateLogStorageProvider;
+ _minSourceOffset = minSourceOffset;
+ _sourceOffsetToDocIdArray = offsetToDocId;
+
+ }
+
+ private void buildOffsetToDocIdMap(ColumnIndexContainer offsetColumnIndexContainer) {
+ long start = System.currentTimeMillis();
+ final DataFileReader reader = offsetColumnIndexContainer.getForwardIndex();
+ final Dictionary dictionary = offsetColumnIndexContainer.getDictionary();
Map<Long, Integer> kafkaOffsetToDocIdMap = new HashMap<>();
long minOffset = Long.MAX_VALUE;
long maxOffset = 0;
@@ -114,15 +133,13 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
int size = Math.toIntExact(maxOffset - minOffset + 1);
_sourceOffsetToDocIdArray = new int[size];
for (int i = 0; i < size; i++) {
- if (kafkaOffsetToDocIdMap.containsKey(i + minOffset)) {
- _sourceOffsetToDocIdArray[i] = kafkaOffsetToDocIdMap.get(i + minOffset);
- } else {
- _sourceOffsetToDocIdArray[i] = -1;
- }
+ _sourceOffsetToDocIdArray[i] = kafkaOffsetToDocIdMap.getOrDefault(i + minOffset,
+ DEFAULT_DOC_ID_FOR_MISSING_ENTRY);
}
} else {
throw new RuntimeException("unexpected forward reader type for kafka offset column " + reader.getClass());
}
+ LOGGER.info("built offset to DocId map for segment {} with {} documents in {} ms", _segmentName, _totalDoc, System.currentTimeMillis() - start);
}
public static ImmutableUpsertSegmentImpl copyOf(ImmutableSegmentImpl immutableSegment) {
@@ -163,32 +180,48 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
@Override
public void initVirtualColumn() throws IOException {
long start = System.currentTimeMillis();
- List<UpdateLogEntry> updateLogEntries = UpdateLogStorageProvider.getInstance().getAllMessages(_tableNameWithType, _segmentName);
- Multimap<Long, UpdateLogEntry> updateLogEntryMap = ArrayListMultimap.create();
- for (UpdateLogEntry logEntry: updateLogEntries) {
- updateLogEntryMap.put(logEntry.getOffset(), logEntry);
- }
- for (int i = 0; i < _sourceOffsetToDocIdArray.length; i++) {
- if (_sourceOffsetToDocIdArray[i] != -1) {
- final long offset = i + _minSourceOffset;
- final int docId = _sourceOffsetToDocIdArray[i];
- if (updateLogEntryMap.containsKey(offset)) {
- boolean updated = false;
- Collection<UpdateLogEntry> entries = updateLogEntryMap.get(offset);
- UpdateLogEntry lastEntry = null;
- for (UpdateLogEntry entry : entries) {
- lastEntry = entry;
- for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
- updated = readerWriter.update(docId, entry.getValue(), entry.getType()) || updated;
+ final List<UpdateLogEntry> updateLogEntries = _updateLogStorageProvider.getAllMessages(_tableNameWithType, _segmentName);
+ LOGGER.info("load {} update log entry from update log storage provider for segment {} in {} ms",
+ updateLogEntries.size(), _segmentName, System.currentTimeMillis() - start);
+
+ start = System.currentTimeMillis();
+ final long maxOffset = _totalDoc + _minSourceOffset;
+ int unmatchedLogEntryCount = 0;
+ try {
+ Map<Integer, Long> partitionToHighestWatermark = new HashMap<>();
+ int readerWriteCount = _virtualColumnsReaderWriter.size();
+ for (UpdateLogEntry logEntry: updateLogEntries) {
+ final int partition = logEntry.getPartition();
+ final long offset = logEntry.getOffset();
+ if (offset >= _minSourceOffset && offset < maxOffset) {
+ final int docId = _sourceOffsetToDocIdArray[Math.toIntExact(offset - _minSourceOffset)];
+ if (docId != DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
+ // use traditional for loop over int instead of foreach to give hints for JIT to do loop unroll in byte code
+ for (int i = 0; i < readerWriteCount; i++) {
+ _virtualColumnsReaderWriter.get(i).update(docId, logEntry.getValue(), logEntry.getType());
}
+ if (logEntry.getValue() > partitionToHighestWatermark.getOrDefault(partition, -1L)) {
+ partitionToHighestWatermark.put(partition, logEntry.getValue());
+ }
+ } else {
+ LOGGER.error("segment {} got in-range update log at offset {} but no matching docId", _segmentName, offset);
}
- if (updated) {
- _upsertWaterMarkManager.processMessage(_tableNameWithType, _segmentName, lastEntry);
- }
+ } else {
+ unmatchedLogEntryCount++;
}
}
+ if (unmatchedLogEntryCount > 0) {
+ LOGGER.info("segment {} encountered {} update logs that are outside of its range", _segmentName,
+ unmatchedLogEntryCount);
+ }
+ partitionToHighestWatermark.forEach((partition, value) ->
+ _upsertWaterMarkManager.processVersionUpdate(_tableNameWithType, partition, value));
+
+ } catch (Exception e) {
+ LOGGER.error("failed to load the offset with thread pool");
+ Utils.rethrowException(e);
}
- LOGGER.info("loaded {} update log entries for current immutable segment {} in {} ms", updateLogEntries.size(),
+ LOGGER.info("populated all log entries to virtual columns for current immutable segment {} in {} ms",
_segmentName, System.currentTimeMillis() - start);
}
@@ -205,7 +238,7 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
throw new RuntimeException("offset outside range");
} else {
int position = Math.toIntExact(offset - _minSourceOffset);
- if (_sourceOffsetToDocIdArray[position] == -1) {
+ if (_sourceOffsetToDocIdArray[position] == DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
LOGGER.error("no docId associated with offset {} for segment {}", offset, _segmentName);
throw new RuntimeException("docId not found");
} else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
index 6d37322..bbea037 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
@@ -22,7 +22,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.pinot.grigio.common.metrics.GrigioGauge;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
import org.slf4j.Logger;
@@ -62,15 +61,15 @@ public class UpsertWaterMarkManager {
}
long version = logEntry.getValue();
int partition = logEntry.getPartition();
- Preconditions.checkState(partition >= 0, "logEntry has no partition info {} for table ", logEntry.toString(), table);
+ processVersionUpdate(table, partition, version);
+ }
+ public void processVersionUpdate(String table, int partition, long version) {
+ Preconditions.checkState(partition >= 0, "logEntry has invalid version {} for table {}",
+ version, table);
Map<Integer, Long> partitionToHighWaterMark = _highWaterMarkTablePartitionMap.computeIfAbsent(table, t -> new ConcurrentHashMap<>());
long currentVersion = partitionToHighWaterMark.getOrDefault(partition, -1L); // assumes that valid version is non-negative
- if (version < currentVersion) {
- // We expect the version number to increase monotonically unless we are reprocessing previous seen messages.
- _metrics.addMeteredGlobalValue(GrigioMeter.VERSION_LOWER_THAN_CURRENT, 1);
- LOGGER.debug("The latest record {} has lower version than the current one for the table {} ", logEntry, table);
- } else if (version > currentVersion) {
+ if (version > currentVersion) {
partitionToHighWaterMark.put(partition, version);
_metrics.setValueOfTableGauge(String.valueOf(partition), GrigioGauge.SERVER_VERSION_CONSUMED, version);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidFromInMemoryVirtualColumnProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidFromInMemoryVirtualColumnProvider.java
index 2dedc8e..ebb45e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidFromInMemoryVirtualColumnProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidFromInMemoryVirtualColumnProvider.java
@@ -30,7 +30,8 @@ public class ValidFromInMemoryVirtualColumnProvider extends BaseLongVirtualColum
@Override
public boolean update(int docId, int value, LogEventType eventType) {
if (eventType == LogEventType.INSERT) {
- return updateValue(docId, value);
+ updateValue(docId, value);
+ return true;
}
return false;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidUntilInMemoryVirtualColumnProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidUntilInMemoryVirtualColumnProvider.java
index 1a0856e..9aef94c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidUntilInMemoryVirtualColumnProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidUntilInMemoryVirtualColumnProvider.java
@@ -30,7 +30,8 @@ public class ValidUntilInMemoryVirtualColumnProvider extends BaseLongVirtualColu
@Override
public boolean update(int docId, int offset, LogEventType eventType) {
if (eventType == LogEventType.DELETE) {
- return this.updateValue(docId, offset);
+ this.updateValue(docId, offset);
+ return true;
}
return false;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
index 75d1b74..78f3bda 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
@@ -31,17 +31,19 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu
private static final Logger LOGGER = LoggerFactory.getLogger(VirtualColumnLongValueReaderWriter.class);
- private final VirtualColumnContext _context;
private int _totalDocSize;
private int _currentMaxDocId;
private final int [] _values;
private final int DEFAULT_NEW_VALUE = -1;
public VirtualColumnLongValueReaderWriter(VirtualColumnContext context) {
- _context = context;
- _totalDocSize = context.getTotalDocCount();
+ this(context.getTotalDocCount(), context.isMutableSegment());
+ }
+
+ public VirtualColumnLongValueReaderWriter(int totalDocSize, boolean isMutableSegment) {
+ _totalDocSize = totalDocSize;
_values = new int[_totalDocSize];
- if (!_context.isMutableSegment()) {
+ if (!isMutableSegment) {
Arrays.fill(_values, -1);
}
}
@@ -80,10 +82,11 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu
}
/**
- * method to update the internal data to value at a given location
- * synchronized to ensure the we will not modify the internal array at the same time from multiple threads
+ * update the internal data to value at a given location
+ * assume the update will be idempotent and we won't check if the existing data have been set
+ *
*/
- protected synchronized boolean updateValue(int docId, int value) {
+ protected void updateValue(int docId, int value) {
if (docId >= _totalDocSize) {
throw new RuntimeException(String.format("new record docId %s is larger than capacity %s", docId, _totalDocSize));
}
@@ -91,17 +94,12 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu
throw new RuntimeException(String.format("failed to update virtual column: with value %s:%s we are trying to " +
"update a value that has not been ingested yet, max doc id %s", docId, value, _currentMaxDocId));
}
- if (_values[docId] == value) {
- return false;
- } else {
- _values[docId] = value;
- return true;
- }
+ _values[docId] = value;
}
// ensure backward compatibility
- protected boolean updateValue(int docId, long value) {
- return updateValue(docId, Math.toIntExact(value));
+ protected void updateValue(int docId, long value) {
+ updateValue(docId, Math.toIntExact(value));
}
public abstract boolean update(int docId, long value, LogEventType eventType);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
new file mode 100644
index 0000000..4712625
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.immutable;
+
+import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
+import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
+import org.apache.pinot.grigio.common.messages.LogEventType;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ImmutableUpsertSegmentImplTest {
+
+ UpdateLogStorageProvider _mockProvider;
+ UpsertWaterMarkManager _mockUpsertWaterMarkManager;
+ List<VirtualColumnLongValueReaderWriter> _readerWriters = new ArrayList<>();
+
+ @BeforeMethod
+ public void init() {
+ _mockProvider = mock(UpdateLogStorageProvider.class);
+ _mockUpsertWaterMarkManager = mock(UpsertWaterMarkManager.class);
+ }
+
+ @Test
+ public void testInitVirtualColumn() throws IOException {
+ long start = System.currentTimeMillis();
+ long minOffset = 5000_000l;
+ int totalDocs = 5_000_000;
+ _readerWriters.add(new VirtualColumnLongValueReaderWriter(totalDocs, false) {
+ @Override
+ public boolean update(int docId, long value, LogEventType eventType) {
+ if (eventType == LogEventType.INSERT) {
+ updateValue(docId, value);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean update(int docId, int value, LogEventType eventType) {
+ return update(docId, value, eventType);
+ }
+ });
+ _readerWriters.add(new VirtualColumnLongValueReaderWriter(totalDocs, false) {
+ @Override
+ public boolean update(int docId, long value, LogEventType eventType) {
+ if (eventType == LogEventType.DELETE) {
+ updateValue(docId, value);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean update(int docId, int value, LogEventType eventType) {
+ return update(docId, value, eventType);
+ }
+ });
+ int[] offsetToDocId = new int[totalDocs];
+ for (int i = 0; i < totalDocs; i++) {
+ offsetToDocId[i] = i;
+ }
+ List<UpdateLogEntry> updateLogEntries = new ArrayList<>(totalDocs * 2);
+ for (int i = 0; i < totalDocs; i++) {
+ updateLogEntries.add(new UpdateLogEntry(minOffset + i, 50, LogEventType.INSERT, i%8));
+ updateLogEntries.add(new UpdateLogEntry(minOffset + i, 100, LogEventType.DELETE, i%8));
+ }
+ when(_mockProvider.getAllMessages(anyString(), anyString())).thenReturn(updateLogEntries);
+ System.out.println("run time for set up: " + (System.currentTimeMillis() - start));
+
+ start = System.currentTimeMillis();
+
+ ImmutableUpsertSegmentImpl immutableUpsertSegment = new ImmutableUpsertSegmentImpl(_readerWriters, totalDocs,
+ _mockUpsertWaterMarkManager, _mockProvider, minOffset, offsetToDocId);
+
+ immutableUpsertSegment.initVirtualColumn();
+ long runtime = System.currentTimeMillis() - start;
+ System.out.println("run time is " + runtime);
+ Assert.assertTrue(runtime < 1_000L, "run time should be less than 1 second");
+
+ VirtualColumnLongValueReaderWriter insertReaderWrite = _readerWriters.get(0);
+ VirtualColumnLongValueReaderWriter deleteReaderWrite = _readerWriters.get(1);
+ for (int i = 0; i < totalDocs; i++) {
+ if (insertReaderWrite.getLong(i) != 50 || deleteReaderWrite.getLong(i) != 100) {
+ System.out.println(String.format("position %d has value %d/%d", i, insertReaderWrite.getLong(i),
+ deleteReaderWrite.getLong(i)));
+ Assert.fail("no correct value");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index ebf0146..f55e1a7 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -25,7 +25,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-distribution</artifactId>
<name>Pinot Distribution</name>
diff --git a/pinot-grigio/pinot-grigio-common/pom.xml b/pinot-grigio/pinot-grigio-common/pom.xml
index 42e07c0..0c98e85 100644
--- a/pinot-grigio/pinot-grigio-common/pom.xml
+++ b/pinot-grigio/pinot-grigio-common/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot-grigio</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/metrics/GrigioMeter.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/metrics/GrigioMeter.java
index ab04b6e..8a738da 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/metrics/GrigioMeter.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/metrics/GrigioMeter.java
@@ -29,7 +29,6 @@ public enum GrigioMeter implements AbstractMetrics.Meter {
// segment updater metrics
MESSAGE_FETCH_PER_ROUND("messages", MetricsType.SERVER_ONLY),
- VERSION_LOWER_THAN_CURRENT("messages", MetricsType.SERVER_ONLY),
// key coordinator related metrics
MESSAGE_PRODUCE_FAILED_COUNT("message", MetricsType.KC_ONLY),
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
index 2972473..02f2fba 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
@@ -58,6 +58,7 @@ public class SegmentUpdateLogStorageProvider {
}
public synchronized List<UpdateLogEntry> readAllMessagesFromFile() throws IOException {
+ long start = System.currentTimeMillis();
int insertMessageCount = 0;
int deleteMessageCount = 0;
int fileLength = (int) _file.length();
@@ -76,7 +77,8 @@ public class SegmentUpdateLogStorageProvider {
logs.add(logEntry);
}
buffer.clear();
- LOGGER.info("loaded {} message from file, {} insert and {} delete", messageCount, insertMessageCount, deleteMessageCount);
+ LOGGER.info("loaded {} message from file, {} insert and {} delete in {} ms", messageCount, insertMessageCount,
+ deleteMessageCount, System.currentTimeMillis() - start);
return logs;
} else {
return ImmutableList.of();
@@ -123,6 +125,7 @@ public class SegmentUpdateLogStorageProvider {
}
private synchronized void readFullyFromBeginning(File segmentUpdateFile, ByteBuffer buffer) throws IOException {
+ long start = System.currentTimeMillis();
FileChannel channel = new RandomAccessFile(segmentUpdateFile, "r").getChannel();
channel.position(0);
long position = 0;
@@ -132,6 +135,8 @@ public class SegmentUpdateLogStorageProvider {
position += byteRead;
} while (byteRead != -1 && buffer.hasRemaining());
buffer.flip();
+ LOGGER.info("read all data from segment update file {} to buffer in {} ms", segmentUpdateFile.getName(),
+ System.currentTimeMillis() - start);
}
/**
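The `readFullyFromBeginning` change above adds timing around a loop that fills a `ByteBuffer` from a `FileChannel` until end-of-stream or a full buffer, then calls `flip()`. The following is a minimal self-contained sketch of the same read-until-full-then-flip pattern. It is not the Pinot implementation, and the class `FullFileReader` and method `readFully` are hypothetical. It makes three assumptions. The whole file must fit in one heap buffer, so the sketch guards the `int` cast that `readAllMessagesFromFile` also performs on the file length. The channel is closed with try-with-resources, because the lines shown in the hunk never close the `RandomAccessFile` channel. Elapsed time is measured with the monotonic `System.nanoTime()` rather than `System.currentTimeMillis()`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the read-until-full-then-flip pattern (not Pinot code).
final class FullFileReader {
  private FullFileReader() {}

  /** Reads the whole file into a heap buffer that is flipped and ready for reading. */
  static ByteBuffer readFully(Path file) throws IOException {
    long start = System.nanoTime();
    // try-with-resources guarantees the channel (and its file descriptor) is released.
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      long size = channel.size();
      if (size > Integer.MAX_VALUE) {
        throw new IOException("file too large for a single ByteBuffer: " + size);
      }
      ByteBuffer buffer = ByteBuffer.allocate((int) size);
      // read() may return fewer bytes than requested, so keep reading until the
      // buffer is full or the channel reports end-of-stream (-1).
      while (buffer.hasRemaining() && channel.read(buffer) != -1) {
        // no-op: progress is tracked by the buffer's position
      }
      buffer.flip(); // limit = bytes read, position = 0
      long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      System.out.println("read " + buffer.remaining() + " bytes from " + file.getFileName()
          + " in " + elapsedMs + " ms");
      return buffer;
    }
  }
}
```

A freshly opened channel starts at position 0, which plays the role of the explicit `channel.position(0)` call in the diff.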
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
index 73ba01f..d04b1a2 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
@@ -143,7 +143,6 @@ public class UpdateLogStorageProvider {
}
public List<UpdateLogEntry> getAllMessages(String tableName, String segmentName) throws IOException {
- LOGGER.info("loading all message for table {} segment {}", tableName, segmentName);
if (_virtualColumnStorage.containsKey(tableName)) {
SegmentUpdateLogStorageProvider provider = _virtualColumnStorage.get(tableName).get(segmentName);
if (provider != null) {
diff --git a/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java b/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
index 69d30bc..64d4d3b 100644
--- a/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
+++ b/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
@@ -115,6 +115,21 @@ public class SegmentUpdateLogStorageProviderTest {
Assert.assertEquals(updateLogEntries.get(i * inputDataList.size() + 1), inputDataList.get(1));
Assert.assertEquals(updateLogEntries.get(i * inputDataList.size() + 2), inputDataList.get(2));
}
+ }
+ @Test
+ public void testReadMesssagePerf() throws IOException {
+ int totalMessageCount = 5_000_000;
+ List<UpdateLogEntry> inputMessages = new ArrayList<>(totalMessageCount * 2);
+ for (int i = 0; i < totalMessageCount; i++) {
+ inputMessages.add(new UpdateLogEntry(i, 50, LogEventType.INSERT, i%8));
+ inputMessages.add(new UpdateLogEntry(i, 100, LogEventType.DELETE, i%8));
+ }
+ long start = System.currentTimeMillis();
+ provider.addData(inputMessages);
+ System.out.println("write data takes ms: " + (System.currentTimeMillis() - start));
+ start = System.currentTimeMillis();
+ provider.readAllMessagesFromFile();
+ System.out.println("read data takes ms: " + (System.currentTimeMillis() - start));
}
}
\ No newline at end of file
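The new `testReadMesssagePerf` test and the added log lines measure durations by subtracting two `System.currentTimeMillis()` readings. The following sketch shows a small reusable timer built on `System.nanoTime()`, which the JDK documents as the clock for measuring elapsed time and which is not affected by wall-clock adjustments. The names `ElapsedTimer` and `Timed` are hypothetical and not part of Pinot. A single timed run inside a unit test also includes JIT warm-up and GC pauses, so numbers like "write data takes ms" are rough indications. A harness such as JMH is the usual tool for steadier measurements.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical timing helper (not Pinot code).
final class ElapsedTimer {
  private ElapsedTimer() {}

  /** Value returned by a timed task plus the elapsed time in nanoseconds. */
  static final class Timed<T> {
    final T value;
    final long elapsedNanos;

    Timed(T value, long elapsedNanos) {
      this.value = value;
      this.elapsedNanos = elapsedNanos;
    }

    long elapsedMillis() {
      return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }
  }

  /** Runs the task once and measures it with the monotonic System.nanoTime() clock. */
  static <T> Timed<T> time(Callable<T> task) throws Exception {
    long start = System.nanoTime();
    T value = task.call(); // Callable lets tasks throw checked exceptions such as IOException
    return new Timed<>(value, System.nanoTime() - start);
  }
}
```

In a test like the one above, `ElapsedTimer.time(() -> provider.readAllMessagesFromFile()).elapsedMillis()` would replace the manual `start` bookkeeping. Here `provider` is the test's existing field.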
diff --git a/pinot-grigio/pinot-grigio-coordinator/pom.xml b/pinot-grigio/pinot-grigio-coordinator/pom.xml
index f5b9527..7ef66ce 100644
--- a/pinot-grigio/pinot-grigio-coordinator/pom.xml
+++ b/pinot-grigio/pinot-grigio-coordinator/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot-grigio</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pinot-grigio-coordinator</artifactId>
diff --git a/pinot-grigio/pom.xml b/pinot-grigio/pom.xml
index 3cdad0c..2d3a6a7 100644
--- a/pinot-grigio/pom.xml
+++ b/pinot-grigio/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pinot-grigio</artifactId>
diff --git a/pinot-hadoop-filesystem/pom.xml b/pinot-hadoop-filesystem/pom.xml
index 49b0a8a..0e0f03d 100644
--- a/pinot-hadoop-filesystem/pom.xml
+++ b/pinot-hadoop-filesystem/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-hadoop-filesystem</artifactId>
<name>Pinot Hadoop Filesystem</name>
diff --git a/pinot-hadoop/pom.xml b/pinot-hadoop/pom.xml
index 928a11b..13691d8 100644
--- a/pinot-hadoop/pom.xml
+++ b/pinot-hadoop/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-hadoop</artifactId>
<name>Pinot Hadoop</name>
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 9d7cb12..8ea6a8d 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-integration-tests</artifactId>
<name>Pinot Integration Tests</name>
diff --git a/pinot-minion/pom.xml b/pinot-minion/pom.xml
index c057094..72b1d85 100644
--- a/pinot-minion/pom.xml
+++ b/pinot-minion/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-minion</artifactId>
<name>Pinot Minion</name>
diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml
index 34a8740..9508747 100644
--- a/pinot-orc/pom.xml
+++ b/pinot-orc/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-parquet/pom.xml b/pinot-parquet/pom.xml
index 2d7dbb0..577e283 100644
--- a/pinot-parquet/pom.xml
+++ b/pinot-parquet/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 060ebaf..704f659 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-perf</artifactId>
<name>Pinot Perf</name>
diff --git a/pinot-server/pom.xml b/pinot-server/pom.xml
index ac77f20..0503db3 100644
--- a/pinot-server/pom.xml
+++ b/pinot-server/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-server</artifactId>
<name>Pinot Server</name>
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 4b4e686..383cc1f 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-tools</artifactId>
<name>Pinot Tools</name>
diff --git a/pinot-transport/pom.xml b/pinot-transport/pom.xml
index b977921..5359b71 100644
--- a/pinot-transport/pom.xml
+++ b/pinot-transport/pom.xml
@@ -24,7 +24,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
</parent>
<artifactId>pinot-transport</artifactId>
<name>Pinot Transport</name>
diff --git a/pom.xml b/pom.xml
index 77e8647..66af82b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot</artifactId>
- <version>0.2.2.5.49-SNAPSHOT</version>
+ <version>0.2.2.5.57-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Pinot</name>
<description>A realtime distributed OLAP datastore</description>