You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/27 20:51:11 UTC

[pinot] branch master updated: Optimize DimensionTableDataManager to abort unnecessary loading (#11192)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2904f1bce1 Optimize DimensionTableDataManager to abort unnecessary loading (#11192)
2904f1bce1 is described below

commit 2904f1bce19f93b4e6f2f6492e741a5adc02dd82
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Jul 27 13:51:05 2023 -0700

    Optimize DimensionTableDataManager to abort unnecessary loading (#11192)
---
 .../manager/offline/DimensionTableDataManager.java | 114 ++++++++++++++-------
 .../manager/offline/FastLookupDimensionTable.java  |   9 +-
 .../offline/MemoryOptimizedDimensionTable.java     |  18 ++--
 .../segment/readers/PinotSegmentRecordReader.java  |   4 +
 4 files changed, 93 insertions(+), 52 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 7031c135dc..a9635f2a66 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -20,12 +20,15 @@ package org.apache.pinot.core.data.manager.offline;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -77,10 +80,12 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
     return INSTANCES.get(tableNameWithType);
   }
 
-  private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, DimensionTable> UPDATER =
-      AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class, DimensionTable.class, "_dimensionTable");
+  private final AtomicReference<DimensionTable> _dimensionTable = new AtomicReference<>();
+
+  // Assign a token when loading the lookup table, cancel the loading when token changes because we will load it again
+  // anyway
+  private final AtomicInteger _loadToken = new AtomicInteger();
 
-  private volatile DimensionTable _dimensionTable;
   private boolean _disablePreload = false;
 
   @Override
@@ -102,10 +107,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
     }
 
     if (_disablePreload) {
-      _dimensionTable = new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, Collections.emptyMap(),
-          Collections.emptyList(), this);
+      _dimensionTable.set(
+          new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, Collections.emptyMap(), Collections.emptyList(),
+              Collections.emptyList(), this));
     } else {
-      _dimensionTable = new FastLookupDimensionTable(schema, primaryKeyColumns, new HashMap<>());
+      _dimensionTable.set(new FastLookupDimensionTable(schema, primaryKeyColumns, new HashMap<>()));
     }
   }
 
@@ -114,8 +120,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
     super.addSegment(immutableSegment);
     String segmentName = immutableSegment.getSegmentName();
     try {
-      loadLookupTable();
-      _logger.info("Successfully loaded lookup table: {} after adding segment: {}", _tableNameWithType, segmentName);
+      if (loadLookupTable()) {
+        _logger.info("Successfully loaded lookup table after adding segment: {}", segmentName);
+      } else {
+        _logger.info("Skip loading lookup table after adding segment: {}, another loading in progress", segmentName);
+      }
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while loading lookup table: %s after adding segment: %s", _tableNameWithType,
@@ -127,8 +136,11 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
   public void removeSegment(String segmentName) {
     super.removeSegment(segmentName);
     try {
-      loadLookupTable();
-      _logger.info("Successfully loaded lookup table: {} after removing segment: {}", _tableNameWithType, segmentName);
+      if (loadLookupTable()) {
+        _logger.info("Successfully loaded lookup table after removing segment: {}", segmentName);
+      } else {
+        _logger.info("Skip loading lookup table after removing segment: {}, another loading in progress", segmentName);
+      }
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while loading lookup table: %s after removing segment: %s",
@@ -138,39 +150,39 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
 
   @Override
   protected void doShutdown() {
-    closeDimensionTable(_dimensionTable);
+    closeDimensionTable(_dimensionTable.get());
   }
 
   private void closeDimensionTable(DimensionTable dimensionTable) {
     try {
       dimensionTable.close();
     } catch (Exception e) {
-      _logger.warn("Cannot close dimension table: {}", _tableNameWithType, e);
+      _logger.error("Caught exception while closing the dimension table", e);
     }
   }
 
   /**
    * `loadLookupTable()` reads contents of the DimensionTable into _lookupTable HashMap for fast lookup.
    */
-  private void loadLookupTable() {
-    DimensionTable snapshot;
-    DimensionTable replacement;
-    do {
-      snapshot = _dimensionTable;
-      if (_disablePreload) {
-        replacement = createMemOptimisedDimensionTable();
-      } else {
-        replacement = createFastLookupDimensionTable();
-      }
-    } while (!UPDATER.compareAndSet(this, snapshot, replacement));
-
-    closeDimensionTable(snapshot);
+  private boolean loadLookupTable() {
+    DimensionTable dimensionTable =
+        _disablePreload ? createMemOptimisedDimensionTable() : createFastLookupDimensionTable();
+    if (dimensionTable != null) {
+      closeDimensionTable(_dimensionTable.getAndSet(dimensionTable));
+      return true;
+    } else {
+      return false;
+    }
   }
 
+  @Nullable
   private DimensionTable createFastLookupDimensionTable() {
+    // Acquire a token in the beginning. Abort the loading and return null when the token changes because another
+    // loading is in progress.
+    int token = _loadToken.incrementAndGet();
+
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
     Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType);
-
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", _tableNameWithType);
@@ -185,6 +197,10 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
           try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
             recordReader.init(indexSegment);
             for (int i = 0; i < numTotalDocs; i++) {
+              if (_loadToken.get() != token) {
+                // Token changed during the loading, abort the loading
+                return null;
+              }
               GenericRow row = new GenericRow();
               recordReader.getRecord(i, row);
               lookupTable.put(row.getPrimaryKey(primaryKeyColumns), row);
@@ -203,16 +219,22 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
     }
   }
 
+  @Nullable
   private DimensionTable createMemOptimisedDimensionTable() {
+    // Acquire a token in the beginning. Abort the loading and return null when the token changes because another
+    // loading is in progress.
+    int token = _loadToken.incrementAndGet();
+
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
     Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType);
-
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", _tableNameWithType);
+    int numPrimaryKeyColumns = primaryKeyColumns.size();
 
     Map<PrimaryKey, LookupRecordLocation> lookupTable = new HashMap<>();
     List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
+    List<PinotSegmentRecordReader> recordReaders = new ArrayList<>(segmentDataManagers.size());
     for (SegmentDataManager segmentManager : segmentDataManagers) {
       IndexSegment indexSegment = segmentManager.getSegment();
       int numTotalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
@@ -220,10 +242,28 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
         try {
           PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
           recordReader.init(indexSegment);
+          recordReaders.add(recordReader);
           for (int i = 0; i < numTotalDocs; i++) {
-            GenericRow row = new GenericRow();
-            recordReader.getRecord(i, row);
-            lookupTable.put(row.getPrimaryKey(primaryKeyColumns), new LookupRecordLocation(recordReader, i));
+            if (_loadToken.get() != token) {
+              // Token changed during the loading, abort the loading
+              for (PinotSegmentRecordReader reader : recordReaders) {
+                try {
+                  reader.close();
+                } catch (Exception e) {
+                  _logger.error("Caught exception while closing record reader for segment: {}", reader.getSegmentName(),
+                      e);
+                }
+              }
+              for (SegmentDataManager dataManager : segmentDataManagers) {
+                releaseSegment(dataManager);
+              }
+              return null;
+            }
+            Object[] values = new Object[numPrimaryKeyColumns];
+            for (int j = 0; j < numPrimaryKeyColumns; j++) {
+              values[j] = recordReader.getValue(i, primaryKeyColumns.get(j));
+            }
+            lookupTable.put(new PrimaryKey(values), new LookupRecordLocation(recordReader, i));
           }
         } catch (Exception e) {
           throw new RuntimeException(
@@ -231,23 +271,23 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
         }
       }
     }
-    return new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, lookupTable,
-        segmentDataManagers, this);
+    return new MemoryOptimizedDimensionTable(schema, primaryKeyColumns, lookupTable, segmentDataManagers, recordReaders,
+        this);
   }
 
   public boolean isPopulated() {
-    return !_dimensionTable.isEmpty();
+    return !_dimensionTable.get().isEmpty();
   }
 
   public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) {
-    return _dimensionTable.get(pk);
+    return _dimensionTable.get().get(pk);
   }
 
   public FieldSpec getColumnFieldSpec(String columnName) {
-    return _dimensionTable.getFieldSpecFor(columnName);
+    return _dimensionTable.get().getFieldSpecFor(columnName);
   }
 
   public List<String> getPrimaryKeyColumns() {
-    return _dimensionTable.getPrimaryKeyColumns();
+    return _dimensionTable.get().getPrimaryKeyColumns();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
index ae6776aba3..81798e4cce 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/FastLookupDimensionTable.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -27,9 +26,8 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 
 
-class FastLookupDimensionTable implements DimensionTable {
-
-  private Map<PrimaryKey, GenericRow> _lookupTable;
+public class FastLookupDimensionTable implements DimensionTable {
+  private final Map<PrimaryKey, GenericRow> _lookupTable;
   private final Schema _tableSchema;
   private final List<String> _primaryKeyColumns;
 
@@ -61,7 +59,6 @@ class FastLookupDimensionTable implements DimensionTable {
   }
 
   @Override
-  public void close()
-      throws IOException {
+  public void close() {
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
index ecaa4342b8..96fe847e54 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/MemoryOptimizedDimensionTable.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -31,7 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-class MemoryOptimizedDimensionTable implements DimensionTable {
+public class MemoryOptimizedDimensionTable implements DimensionTable {
   private static final Logger LOGGER = LoggerFactory.getLogger(MemoryOptimizedDimensionTable.class);
 
   private final Map<PrimaryKey, LookupRecordLocation> _lookupTable;
@@ -39,15 +39,17 @@ class MemoryOptimizedDimensionTable implements DimensionTable {
   private final List<String> _primaryKeyColumns;
   private final ThreadLocal<GenericRow> _reuse = ThreadLocal.withInitial(GenericRow::new);
   private final List<SegmentDataManager> _segmentDataManagers;
+  private final List<PinotSegmentRecordReader> _recordReaders;
   private final TableDataManager _tableDataManager;
 
   MemoryOptimizedDimensionTable(Schema tableSchema, List<String> primaryKeyColumns,
       Map<PrimaryKey, LookupRecordLocation> lookupTable, List<SegmentDataManager> segmentDataManagers,
-      TableDataManager tableDataManager) {
+      List<PinotSegmentRecordReader> recordReaders, TableDataManager tableDataManager) {
     _tableSchema = tableSchema;
     _primaryKeyColumns = primaryKeyColumns;
     _lookupTable = lookupTable;
     _segmentDataManagers = segmentDataManagers;
+    _recordReaders = recordReaders;
     _tableDataManager = tableDataManager;
   }
 
@@ -78,16 +80,14 @@ class MemoryOptimizedDimensionTable implements DimensionTable {
   }
 
   @Override
-  public void close()
-      throws IOException {
-    for (LookupRecordLocation lookupRecordLocation : _lookupTable.values()) {
+  public void close() {
+    for (PinotSegmentRecordReader recordReader : _recordReaders) {
       try {
-        lookupRecordLocation.getPinotSegmentRecordReader().close();
+        recordReader.close();
       } catch (Exception e) {
-        LOGGER.warn("Cannot close segment record reader", e);
+        LOGGER.error("Caught exception while closing record reader for segment: {}", recordReader.getSegmentName(), e);
       }
     }
-
     for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
       _tableDataManager.releaseSegment(segmentDataManager);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
index bdc83791f5..2c7a0f37d5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
@@ -215,6 +215,10 @@ public class PinotSegmentRecordReader implements RecordReader {
     return reuse;
   }
 
+  public String getSegmentName() {
+    return _indexSegment.getSegmentName();
+  }
+
   public void getRecord(int docId, GenericRow buffer) {
     for (Map.Entry<String, PinotSegmentColumnReader> entry : _columnReaderMap.entrySet()) {
       String column = entry.getKey();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org