You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/27 20:51:11 UTC
[pinot] branch master updated: Optimize DimensionTableDataManager to abort unnecessary loading (#11192)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2904f1bce1 Optimize DimensionTableDataManager to abort unnecessary loading (#11192)
2904f1bce1 is described below
commit 2904f1bce19f93b4e6f2f6492e741a5adc02dd82
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jul 27 13:51:05 2023 -0700
Optimize DimensionTableDataManager to abort unnecessary loading (#11192)
---
.../manager/offline/DimensionTableDataManager.java | 114 ++++++++++++++-------
.../manager/offline/FastLookupDimensionTable.java | 9 +-
.../offline/MemoryOptimizedDimensionTable.java | 18 ++--
.../segment/readers/PinotSegmentRecordReader.java | 4 +
4 files changed, 93 insertions(+), 52 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 7031c135dc..a9635f2a66 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -20,12 +20,15 @@ package org.apache.pinot.core.data.manager.offline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -77,10 +80,12 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
return INSTANCES.get(tableNameWithType);
}
- private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, DimensionTable> UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class, DimensionTable.class, "_dimensionTable");
+ private final AtomicReference<DimensionTable> _dimensionTable = new AtomicReference<>();
+
+ // Assign a token when loading the lookup table, cancel the loading when token changes because we will load it again
+ // anyway
+ private final AtomicInteger _loadToken = new AtomicInteger();
- private volatile DimensionTable _dimensionTable;
private boolean _disablePreload = false;
@Override
@@ -102,10 +107,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
}
if (_disablePreload) {
- _dimensionTable = new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, Collections.emptyMap(),
- Collections.emptyList(), this);
+ _dimensionTable.set(
+ new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, Collections.emptyMap(), Collections.emptyList(),
+ Collections.emptyList(), this));
} else {
- _dimensionTable = new FastLookupDimensionTable(schema, primaryKeyColumns, new HashMap<>());
+ _dimensionTable.set(new FastLookupDimensionTable(schema, primaryKeyColumns, new HashMap<>()));
}
}
@@ -114,8 +120,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
super.addSegment(immutableSegment);
String segmentName = immutableSegment.getSegmentName();
try {
- loadLookupTable();
- _logger.info("Successfully loaded lookup table: {} after adding segment: {}", _tableNameWithType, segmentName);
+ if (loadLookupTable()) {
+ _logger.info("Successfully loaded lookup table after adding segment: {}", segmentName);
+ } else {
+ _logger.info("Skip loading lookup table after adding segment: {}, another loading in progress", segmentName);
+ }
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while loading lookup table: %s after adding segment: %s", _tableNameWithType,
@@ -127,8 +136,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
public void removeSegment(String segmentName) {
super.removeSegment(segmentName);
try {
- loadLookupTable();
- _logger.info("Successfully loaded lookup table: {} after removing segment: {}", _tableNameWithType, segmentName);
+ if (loadLookupTable()) {
+ _logger.info("Successfully loaded lookup table after removing segment: {}", segmentName);
+ } else {
+ _logger.info("Skip loading lookup table after removing segment: {}, another loading in progress", segmentName);
+ }
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while loading lookup table: %s after removing segment: %s",
@@ -138,39 +150,39 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
@Override
protected void doShutdown() {
- closeDimensionTable(_dimensionTable);
+ closeDimensionTable(_dimensionTable.get());
}
private void closeDimensionTable(DimensionTable dimensionTable) {
try {
dimensionTable.close();
} catch (Exception e) {
- _logger.warn("Cannot close dimension table: {}", _tableNameWithType, e);
+ _logger.error("Caught exception while closing the dimension table", e);
}
}
/**
* `loadLookupTable()` reads contents of the DimensionTable into _lookupTable HashMap for fast lookup.
*/
- private void loadLookupTable() {
- DimensionTable snapshot;
- DimensionTable replacement;
- do {
- snapshot = _dimensionTable;
- if (_disablePreload) {
- replacement = createMemOptimisedDimensionTable();
- } else {
- replacement = createFastLookupDimensionTable();
- }
- } while (!UPDATER.compareAndSet(this, snapshot, replacement));
-
- closeDimensionTable(snapshot);
+ private boolean loadLookupTable() {
+ DimensionTable dimensionTable =
+ _disablePreload ? createMemOptimisedDimensionTable() : createFastLookupDimensionTable();
+ if (dimensionTable != null) {
+ closeDimensionTable(_dimensionTable.getAndSet(dimensionTable));
+ return true;
+ } else {
+ return false;
+ }
}
+ @Nullable
private DimensionTable createFastLookupDimensionTable() {
+ // Acquire a token in the beginning. Abort the loading and return null when the token changes because another
+ // loading is in progress.
+ int token = _loadToken.incrementAndGet();
+
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType);
-
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
"Primary key columns must be configured for dimension table: %s", _tableNameWithType);
@@ -185,6 +197,10 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
recordReader.init(indexSegment);
for (int i = 0; i < numTotalDocs; i++) {
+ if (_loadToken.get() != token) {
+ // Token changed during the loading, abort the loading
+ return null;
+ }
GenericRow row = new GenericRow();
recordReader.getRecord(i, row);
lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
@@ -203,16 +219,22 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
}
}
+ @Nullable
private DimensionTable createMemOptimisedDimensionTable() {
+ // Acquire a token in the beginning. Abort the loading and return null when the token changes because another
+ // loading is in progress.
+ int token = _loadToken.incrementAndGet();
+
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType);
-
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
"Primary key columns must be configured for dimension table: %s", _tableNameWithType);
+ int numPrimaryKeyColumns = primaryKeyColumns.size();
Map<PrimaryKey, LookupRecordLocation> lookupTable = new HashMap<>();
List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
+ List<PinotSegmentRecordReader> recordReaders = new ArrayList<>(segmentDataManagers.size());
for (SegmentDataManager segmentManager : segmentDataManagers) {
IndexSegment indexSegment = segmentManager.getSegment();
int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
@@ -220,10 +242,28 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
try {
PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
recordReader.init(indexSegment);
+ recordReaders.add(recordReader);
for (int i = 0; i < numTotalDocs; i++) {
- GenericRow row = new GenericRow();
- recordReader.getRecord(i, row);
- lookupTable.put(row.getPrimaryKey(primaryKeyColumns), new LookupRecordLocation(recordReader, i));
+ if (_loadToken.get() != token) {
+ // Token changed during the loading, abort the loading
+ for (PinotSegmentRecordReader reader : recordReaders) {
+ try {
+ reader.close();
+ } catch (Exception e) {
+ _logger.error("Caught exception while closing record reader for segment: {}", reader.getSegmentName(),
+ e);
+ }
+ }
+ for (SegmentDataManager dataManager : segmentDataManagers) {
+ releaseSegment(dataManager);
+ }
+ return null;
+ }
+ Object[] values = new Object[numPrimaryKeyColumns];
+ for (int j = 0; j < numPrimaryKeyColumns; j++) {
+ values[j] = recordReader.getValue(i, primaryKeyColumns.get(j));
+ }
+ lookupTable.put(new PrimaryKey(values), new LookupRecordLocation(recordReader, i));
}
} catch (Exception e) {
throw new RuntimeException(
@@ -231,23 +271,23 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
}
}
}
- return new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, lookupTable,
- segmentDataManagers, this);
+ return new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, lookupTable, segmentDataManagers, recordReaders,
+ this);
}
public boolean isPopulated() {
- return !_dimensionTable.isEmpty();
+ return !_dimensionTable.get().isEmpty();
}
public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) {
- return _dimensionTable.get(pk);
+ return _dimensionTable.get().get(pk);
}
public FieldSpec getColumnFieldSpec(String columnName) {
- return _dimensionTable.getFieldSpecFor(columnName);
+ return _dimensionTable.get().getFieldSpecFor(columnName);
}
public List<String> getPrimaryKeyColumns() {
- return _dimensionTable.getPrimaryKeyColumns();
+ return _dimensionTable.get().getPrimaryKeyColumns();
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
index ae6776aba3..81798e4cce 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.data.manager.offline;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.data.FieldSpec;
@@ -27,9 +26,8 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
-class FastLookupDimensionTable implements DimensionTable {
-
- private Map<PrimaryKey, GenericRow> _lookupTable;
+public class FastLookupDimensionTable implements DimensionTable {
+ private final Map<PrimaryKey, GenericRow> _lookupTable;
private final Schema _tableSchema;
private final List<String> _primaryKeyColumns;
@@ -61,7 +59,6 @@ class FastLookupDimensionTable implements DimensionTable {
}
@Override
- public void close()
- throws IOException {
+ public void close() {
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
index ecaa4342b8..96fe847e54 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
@@ -18,11 +18,11 @@
*/
package org.apache.pinot.core.data.manager.offline;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class MemoryOptimizedDimensionTable implements DimensionTable {
+public class MemoryOptimizedDimensionTable implements DimensionTable {
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryOptimizedDimensionTable.class);
private final Map<PrimaryKey, LookupRecordLocation> _lookupTable;
@@ -39,15 +39,17 @@ class MemoryOptimizedDimensionTable implements DimensionTable {
private final List<String> _primaryKeyColumns;
private final ThreadLocal<GenericRow> _reuse = ThreadLocal.withInitial(GenericRow::new);
private final List<SegmentDataManager> _segmentDataManagers;
+ private final List<PinotSegmentRecordReader> _recordReaders;
private final TableDataManager _tableDataManager;
MemoryOptimizedDimensionTable(Schema tableSchema, List<String> primaryKeyColumns,
Map<PrimaryKey, LookupRecordLocation> lookupTable, List<SegmentDataManager> segmentDataManagers,
- TableDataManager tableDataManager) {
+ List<PinotSegmentRecordReader> recordReaders, TableDataManager tableDataManager) {
_tableSchema = tableSchema;
_primaryKeyColumns = primaryKeyColumns;
_lookupTable = lookupTable;
_segmentDataManagers = segmentDataManagers;
+ _recordReaders = recordReaders;
_tableDataManager = tableDataManager;
}
@@ -78,16 +80,14 @@ class MemoryOptimizedDimensionTable implements DimensionTable {
}
@Override
- public void close()
- throws IOException {
- for (LookupRecordLocation lookupRecordLocation : _lookupTable.values()) {
+ public void close() {
+ for (PinotSegmentRecordReader recordReader : _recordReaders) {
try {
- lookupRecordLocation.getPinotSegmentRecordReader().close();
+ recordReader.close();
} catch (Exception e) {
- LOGGER.warn("Cannot close segment record reader", e);
+ LOGGER.error("Caught exception while closing record reader for segment: {}", recordReader.getSegmentName(), e);
}
}
-
for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
_tableDataManager.releaseSegment(segmentDataManager);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
index bdc83791f5..2c7a0f37d5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
@@ -215,6 +215,10 @@ public class PinotSegmentRecordReader implements RecordReader {
return reuse;
}
+ public String getSegmentName() {
+ return _indexSegment.getSegmentName();
+ }
+
public void getRecord(int docId, GenericRow buffer) {
for (Map.Entry<String, PinotSegmentColumnReader> entry : _columnReaderMap.entrySet()) {
String column = entry.getKey();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org