You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/07/18 22:39:11 UTC
[pinot] branch master updated: Bug fix: TableUpsertMetadataManager is null (#11129)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 43bcd1ea96 Bug fix: TableUpsertMetadataManager is null (#11129)
43bcd1ea96 is described below
commit 43bcd1ea964362e152609268aa7918aaa0f39279
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Jul 19 04:09:05 2023 +0530
Bug fix: TableUpsertMetadataManager is null (#11129)
---
.../data/manager/realtime/RealtimeTableDataManager.java | 14 ++++++--------
.../local/upsert/TableUpsertMetadataManagerFactory.java | 11 +----------
.../mutable/MutableSegmentImplUpsertComparisonColTest.java | 9 +++++----
.../indexsegment/mutable/MutableSegmentImplUpsertTest.java | 12 ++++++------
4 files changed, 18 insertions(+), 28 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 653d6bba3f..a7ee82b9eb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -125,7 +125,6 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
private TableDedupMetadataManager _tableDedupMetadataManager;
private TableUpsertMetadataManager _tableUpsertMetadataManager;
- private boolean _isUpsertEnabled;
private BooleanSupplier _isTableReadyToConsumeData;
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
@@ -206,12 +205,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
_tableUpsertMetadataManager);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
- // While creating _tableUpsertMetadataManager object, some methods want to check if upsert is enabled, so track
- // this status with a boolean, instead of relying on if _tableUpsertMetadataManager is null or not.
- _isUpsertEnabled = true;
- _tableUpsertMetadataManager =
- TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics, _helixManager,
- _segmentPreloadExecutor);
+ // NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to
+ // load segments into it
+ _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig);
+ _tableUpsertMetadataManager.init(tableConfig, schema, this, _serverMetrics, _helixManager,
+ _segmentPreloadExecutor);
}
// For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
@@ -358,7 +356,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
}
public boolean isUpsertEnabled() {
- return _isUpsertEnabled;
+ return _tableUpsertMetadataManager != null;
}
public boolean isPartialUpsertEnabled() {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 3320c60a43..1750aaefc9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -19,15 +19,9 @@
package org.apache.pinot.segment.local.upsert;
import com.google.common.base.Preconditions;
-import java.util.concurrent.ExecutorService;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
-import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +32,7 @@ public class TableUpsertMetadataManagerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
- public static TableUpsertMetadataManager create(TableConfig tableConfig, Schema schema,
- TableDataManager tableDataManager, ServerMetrics serverMetrics, HelixManager helixManager,
- @Nullable ExecutorService segmentPreloadExecutor) {
+ public static TableUpsertMetadataManager create(TableConfig tableConfig) {
String tableNameWithType = tableConfig.getTableName();
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
Preconditions.checkArgument(upsertConfig != null, "Must provide upsert config for table: %s", tableNameWithType);
@@ -63,7 +55,6 @@ public class TableUpsertMetadataManagerFactory {
metadataManager = new ConcurrentMapTableUpsertMetadataManager();
}
- metadataManager.init(tableConfig, schema, tableDataManager, serverMetrics, helixManager, segmentPreloadExecutor);
return metadataManager;
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 52f8590020..aae9b9cec6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -67,10 +68,10 @@ public class MutableSegmentImplUpsertComparisonColTest {
.build();
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
- _partitionUpsertMetadataManager =
- TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class),
- mock(ServerMetrics.class), mock(HelixManager.class), mock(ExecutorService.class))
- .getOrCreatePartitionManager(0);
+ TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig);
+ tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class),
+ mock(HelixManager.class), mock(ExecutorService.class));
+ _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, offsetUpsertConfig, "secondsSinceEpoch",
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index b4ed454c2d..4f243c6000 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
+import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -78,14 +79,13 @@ public class MutableSegmentImplUpsertTest {
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
- .setNullHandlingEnabled(true)
- .build();
+ .setNullHandlingEnabled(true).build();
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
- _partitionUpsertMetadataManager =
- TableUpsertMetadataManagerFactory.create(_tableConfig, _schema, mock(TableDataManager.class),
- mock(ServerMetrics.class), mock(HelixManager.class), mock(ExecutorService.class))
- .getOrCreatePartitionManager(0);
+ TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig);
+ tableUpsertMetadataManager.init(_tableConfig, _schema, mock(TableDataManager.class), mock(ServerMetrics.class),
+ mock(HelixManager.class), mock(ExecutorService.class));
+ _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), false, true, upsertConfigWithHash, "secondsSinceEpoch",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org