You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/18 22:39:11 UTC

[pinot] branch master updated: Bug fix: TableUpsertMetadataManager is null (#11129)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 43bcd1ea96 Bug fix: TableUpsertMetadataManager is null (#11129)
43bcd1ea96 is described below

commit 43bcd1ea964362e152609268aa7918aaa0f39279
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Jul 19 04:09:05 2023 +0530

    Bug fix: TableUpsertMetadataManager is null (#11129)
---
 .../data/manager/realtime/RealtimeTableDataManager.java    | 14 ++++++--------
 .../local/upsert/TableUpsertMetadataManagerFactory.java    | 11 +----------
 .../mutable/MutableSegmentImplUpsertComparisonColTest.java |  9 +++++----
 .../indexsegment/mutable/MutableSegmentImplUpsertTest.java | 12 ++++++------
 4 files changed, 18 insertions(+), 28 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 653d6bba3f..a7ee82b9eb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -125,7 +125,6 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   private TableDedupMetadataManager _tableDedupMetadataManager;
   private TableUpsertMetadataManager _tableUpsertMetadataManager;
-  private boolean _isUpsertEnabled;
   private BooleanSupplier _isTableReadyToConsumeData;
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
@@ -206,12 +205,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
           _tableUpsertMetadataManager);
       Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
       Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
-      // While creating _tableUpsertMetadataManager object, some methods want to check if upsert is enabled, so track
-      // this status with a boolean, instead of relying on if _tableUpsertMetadataManager is null or not.
-      _isUpsertEnabled = true;
-      _tableUpsertMetadataManager =
-          TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics, _helixManager,
-              _segmentPreloadExecutor);
+      // NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to
+      //       load segments into it
+      _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig);
+      _tableUpsertMetadataManager.init(tableConfig, schema, this, _serverMetrics, _helixManager,
+          _segmentPreloadExecutor);
     }
 
     // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
@@ -358,7 +356,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   }
 
   public boolean isUpsertEnabled() {
-    return _isUpsertEnabled;
+    return _tableUpsertMetadataManager != null;
   }
 
   public boolean isPartialUpsertEnabled() {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 3320c60a43..1750aaefc9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -19,15 +19,9 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.base.Preconditions;
-import java.util.concurrent.ExecutorService;
-import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,9 +32,7 @@ public class TableUpsertMetadataManagerFactory {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
 
-  public static TableUpsertMetadataManager create(TableConfig tableConfig, Schema schema,
-      TableDataManager tableDataManager, ServerMetrics serverMetrics, HelixManager helixManager,
-      @Nullable ExecutorService segmentPreloadExecutor) {
+  public static TableUpsertMetadataManager create(TableConfig tableConfig) {
     String tableNameWithType = tableConfig.getTableName();
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     Preconditions.checkArgument(upsertConfig != null, "Must provide upsert config for table: %s", tableNameWithType);
@@ -63,7 +55,6 @@ public class TableUpsertMetadataManagerFactory {
       metadataManager = new ConcurrentMapTableUpsertMetadataManager();
     }
 
-    metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics, helixManager, segmentPreloadExecutor);
     return metadataManager;
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 52f8590020..aae9b9cec6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -67,10 +68,10 @@ public class MutableSegmentImplUpsertComparisonColTest {
             .build();
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
-    _partitionUpsertMetadataManager =
-        TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class),
-                mock(ServerMetrics.class), mock(HelixManager.class), mock(ExecutorService.class))
-            .getOrCreatePartitionManager(0);
+    TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig);
+    tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class),
+        mock(HelixManager.class), mock(ExecutorService.class));
+    _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, offsetUpsertConfig, "secondsSinceEpoch",
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index b4ed454c2d..4f243c6000 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -78,14 +79,13 @@ public class MutableSegmentImplUpsertTest {
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
-            .setNullHandlingEnabled(true)
-            .build();
+            .setNullHandlingEnabled(true).build();
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
-    _partitionUpsertMetadataManager =
-        TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class),
-                mock(ServerMetrics.class), mock(HelixManager.class), mock(ExecutorService.class))
-            .getOrCreatePartitionManager(0);
+    TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig);
+    tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class),
+        mock(HelixManager.class), mock(ExecutorService.class));
+    _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), false, true, upsertConfigWithHash, "secondsSinceEpoch",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org