You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/23 19:27:39 UTC
[incubator-pinot] 09/09: update to fix unit tests
This is an automated email from the ASF dual-hosted git repository.
jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 39f5ba8d4770ae0a1b1c23ef02965ff430c7b78b
Author: james Shao <sj...@uber.com>
AuthorDate: Thu Mar 19 14:34:46 2020 -0700
update to fix unit tests
---
.../requesthandler/BaseBrokerRequestHandler.java | 2 +-
.../broker/upsert/DefaultLowWaterMarkService.java | 11 +++---
.../broker/upsert/DefaultUpsertQueryRewriter.java | 6 ++--
.../broker/upsert/LowWaterMarkServiceProvider.java | 18 +++++++---
.../core/data/manager/BaseTableDataManager.java | 1 +
.../core/data/manager/InstanceDataManager.java | 5 ---
.../realtime/LLRealtimeSegmentDataManager.java | 5 ++-
.../manager/realtime/RealtimeTableDataManager.java | 13 ++++----
.../data/manager/upsert/DataManagerCallback.java | 37 +++++++++++++++++++-
.../upsert/DefaultIndexSegmentCallback.java | 8 ++---
.../DefaultTableDataManagerCallbackImpl.java | 14 ++++++--
.../data/manager/upsert/IndexSegmentCallback.java | 31 +++++++++++++++++
.../manager/upsert/TableDataManagerCallback.java | 39 ++++++++++++++++++++--
.../upsert/TableDataManagerCallbackProvider.java | 24 +++++++++----
.../segment/updater/DefaultWaterMarkManager.java | 4 ++-
.../core/segment/updater/LowWaterMarkService.java | 39 +++++++++++++++-------
...UpsertQueryRewriter.java => QueryRewriter.java} | 20 +++++++----
.../segment/updater/UpsertComponentContainer.java | 34 +++++++++++++++++++
.../core/segment/updater/WaterMarkManager.java | 27 +++++++++++++--
.../SegmentGenerationWithNullValueVectorTest.java | 2 ++
.../pinot/query/executor/QueryExecutorTest.java | 2 ++
.../upsert/PollingBasedLowWaterMarkService.java | 6 ++--
.../broker/upsert/UpsertQueryRewriterImpl.java | 6 ++--
.../upsert/UpsertTableDataManagerCallbackImpl.java | 18 ++++++++--
.../segment/updater/UpsertWaterMarkManager.java | 26 ++++++++++++++-
...terImplTest.java => QueryRewriterImplTest.java} | 2 +-
.../starter/helix/HelixInstanceDataManager.java | 2 +-
.../upsert/UpsertComponentContainerProvider.java | 4 ---
.../apache/pinot/server/api/BaseResourceTest.java | 4 +++
29 files changed, 332 insertions(+), 78 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index cb17740..4b02cd4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -300,7 +300,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (shouldEnableLowWaterMarkRewrite(request)) {
// Augment the realtime request with LowWaterMark constraints.
- _lwmService.getQueryRewriter().rewriteQueryForUpsert(realtimeBrokerRequest, rawTableName);
+ _lwmService.getQueryRewriter().maybeRewriteQueryForUpsert(realtimeBrokerRequest, rawTableName);
}
// Calculate routing table for the query
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
index 42e1dcb..eb71890 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
@@ -22,13 +22,16 @@ import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.apache.pinot.core.segment.updater.QueryRewriter;
import java.util.Map;
+/**
+ * default class to handle any low watermark operation on pinot broker, mostly no-op
+ */
public class DefaultLowWaterMarkService implements LowWaterMarkService {
- private UpsertQueryRewriter upsertQueryRewriter = new DefaultUpsertQueryRewriter();
+ private QueryRewriter queryRewriter = new DefaultUpsertQueryRewriter();
@Override
public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval,
@@ -49,7 +52,7 @@ public class DefaultLowWaterMarkService implements LowWaterMarkService {
}
@Override
- public UpsertQueryRewriter getQueryRewriter() {
- return upsertQueryRewriter;
+ public QueryRewriter getQueryRewriter() {
+ return queryRewriter;
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
index d18a56f..97195a3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
@@ -1,7 +1,7 @@
package org.apache.pinot.broker.upsert;
import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.apache.pinot.core.segment.updater.QueryRewriter;
/**
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,10 +21,10 @@ import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
* specific language governing permissions and limitations
* under the License.
*/
-public class DefaultUpsertQueryRewriter implements UpsertQueryRewriter {
+public class DefaultUpsertQueryRewriter implements QueryRewriter {
@Override
- public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
+ public void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
// do nothing
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
index f5c06f3..56b3bff 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.broker.upsert;
-import com.google.common.base.Preconditions;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.helix.HelixDataAccessor;
@@ -29,21 +28,28 @@ import org.slf4j.LoggerFactory;
import static org.apache.pinot.common.utils.CommonConstants.Broker.*;
-
+/**
+ * provider to initialize LowWaterMarkService for pinot broker
+ */
public class LowWaterMarkServiceProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkServiceProvider.class);
private LowWaterMarkService _instance;
+ /**
+ * create a new provider instance
+ * @param brokerConfig config for this provider to create the actual class reference,
+ * refer to {@value CommonConstants.Broker#CONFIG_OF_BROKER_LWMS_CLASS_NAME}
+ * @param dataAccessor helix data access to help low watermark service to find proper server cluster
+ * @param clusterName cluster name for the current pinot cluster
+ */
public LowWaterMarkServiceProvider(Configuration brokerConfig, HelixDataAccessor dataAccessor, String clusterName) {
String className = brokerConfig.getString(CommonConstants.Broker.CONFIG_OF_BROKER_LWMS_CLASS_NAME,
DefaultLowWaterMarkService.class.getName());
LOGGER.info("creating watermark manager with class {}", className);
try {
Class<LowWaterMarkService> comonentContainerClass = (Class<LowWaterMarkService>) Class.forName(className);
- Preconditions.checkState(comonentContainerClass.isAssignableFrom(LowWaterMarkService.class),
- "configured class not assignable from LowWaterMarkService class");
_instance = comonentContainerClass.newInstance();
_instance.init(dataAccessor, clusterName,
brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS,
@@ -57,6 +63,10 @@ public class LowWaterMarkServiceProvider {
}
}
+ /**
+ * fetch the current instance of low watermark service this provider created
+ * @return
+ */
public LowWaterMarkService getInstance() {
return _instance;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index cb278b4..6c35b80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -107,6 +107,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
* <p>The new segment is added with reference count of 1, so that is never removed until a drop command comes through.
*
* @param immutableSegment Immutable segment to add
+ * @param dataManagerCallback callback for performing any other necessary operation for other ingestion models
*/
@Override
public void addSegment(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index eba0689..8f51bd9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager;
import java.io.File;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -132,8 +131,4 @@ public interface InstanceDataManager {
*/
ZkHelixPropertyStore<ZNRecord> getPropertyStore();
-// /**
-// * Return the mappings from partition -> low water marks of all the tables hosted in this server.
-// */
-// Map<String, Map<Integer, Long>> getLowWaterMarks();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4f3ae50..dc2f748 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1056,7 +1056,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics,
- DataManagerCallback dataManagerCallback) {
+ DataManagerCallback dataManagerCallback) throws IOException {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
_tableConfig = tableConfig;
@@ -1219,6 +1219,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _indexLoadingConfig, _protocolHandler);
+ // init virtual columns
+ _dataManagerCallback.initVirtualColumns();
+
segmentLogger
.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
_segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC).toString());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index bc5316d..fd42627 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -39,7 +39,6 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NamedThreadFactory;
@@ -229,11 +228,13 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
// of the index directory and loading segment from it
LoaderUtils.reloadFailureRecovery(indexDir);
+ // tell callback to add segment
+ _tableDataManagerCallback.addSegment(_tableNameWithType, segmentName, tableConfig);
+
if (indexDir.exists() && (realtimeSegmentZKMetadata.getStatus() == Status.DONE)) {
// Segment already exists on disk, and metadata has been committed. Treat it like an offline segment
-
- DataManagerCallback callback = _tableDataManagerCallback
- .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false);
+ final DataManagerCallback callback = _tableDataManagerCallback
+ .getImmutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics);
addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, callback), callback);
} else {
// Either we don't have the segment on disk or we have not committed in ZK. We should be starting the consumer
@@ -267,7 +268,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig,
this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName,
_partitionIdToSemaphoreMap.get(streamPartitionId), _serverMetrics,
- _tableDataManagerCallback.getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, true));
+ _tableDataManagerCallback.getMutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics));
}
_logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
_segmentDataManagerMap.put(segmentName, manager);
@@ -321,7 +322,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
try {
File indexDir = new File(_indexDir, segmentName);
DataManagerCallback dataManagerCallback = _tableDataManagerCallback
- .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false);
+ .getImmutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics);
addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, dataManagerCallback), dataManagerCallback);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
index 466553e..91b08d2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
@@ -18,26 +18,61 @@
*/
package org.apache.pinot.core.data.manager.upsert;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import java.io.IOException;
import java.util.List;
+/**
+ * component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} to handle extra logic for
+ * workflows other than regular append-mode ingestion. The appropriate implementation class is expected to be
+ * provided at run time
+ */
public interface DataManagerCallback {
+ /**
+ * create a callback component for {@link org.apache.pinot.core.indexsegment.IndexSegment} when
+ * {@link org.apache.pinot.core.data.manager.SegmentDataManager} create one.
+ * @return callback associated with the internal index segment this data manager holds
+ */
IndexSegmentCallback getIndexSegmentCallback();
+ /**
+ * process the row after transformation in the ingestion process
+ * @param row the row of newly ingested and transformed data from upstream
+ * @param offset the offset of this particular row
+ */
void processTransformedRow(GenericRow row, long offset);
+ /**
+ * process the row after we have finished indexing the current row
+ * @param row the row we just indexed into the current segment
+ * @param offset the offset associated with the index
+ */
void postIndexProcessing(GenericRow row, long offset);
+ /**
+ * callback for when a realtime segment data manager is done with the current consumption loop for all data associated
+ * with it
+ */
void postConsumeLoop();
+ /**
+ * initialize all virtual columns for the current data manager associated with upsert component (if necessary)
+ * @throws IOException
+ */
void initVirtualColumns() throws IOException;
+ /**
+ * update the data in the virtual columns from segment updater loop if necessary
+ * @param messages list of update log entries for the current datamanager
+ */
void updateVirtualColumns(List<UpdateLogEntry> messages);
+ /**
+ * callback when the associated data manager is destroyed by pinot server in call {@link SegmentDataManager#destroy()}
+ */
void destroy();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
index 0299cb7..a0d9acf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
@@ -27,6 +27,9 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import java.util.Map;
+/**
+ * no-op callback for non-upsert table/pinot server instance
+ */
public class DefaultIndexSegmentCallback implements IndexSegmentCallback {
public static final DefaultIndexSegmentCallback INSTANCE = new DefaultIndexSegmentCallback();
@@ -35,27 +38,22 @@ public class DefaultIndexSegmentCallback implements IndexSegmentCallback {
@Override
public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) {
- // do nothing
}
@Override
public void initOffsetColumn(ColumnIndexContainer offsetColumnContainer) {
- // do noting
}
@Override
public void postProcessRecords(GenericRow row, int docId) {
- // do nothing
}
@Override
public void initVirtualColumn() {
- // do nothing
}
@Override
public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
- // do nothing
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
index 2e56455..a0f4398 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
@@ -22,6 +22,10 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.data.Schema;
+/**
+ * No-op callback for pinot clusters/tables that don't support upsert
+ * We also use this for pinot tables that are not configured to use upsert semantics
+ */
public class DefaultTableDataManagerCallbackImpl implements TableDataManagerCallback {
private static final DefaultDataManagerCallbackImpl DEFAULT_DM_CALLBACK = DefaultDataManagerCallbackImpl.INSTANCE;
@@ -35,8 +39,14 @@ public class DefaultTableDataManagerCallbackImpl implements TableDataManagerCall
}
@Override
- public DataManagerCallback getDataManagerCallback(String tableName, String segmentName,
- Schema schema, ServerMetrics serverMetrics, boolean isMutable) {
+ public DataManagerCallback getMutableDataManagerCallback(String tableName, String segmentName,
+ Schema schema, ServerMetrics serverMetrics) {
+ return DEFAULT_DM_CALLBACK;
+ }
+
+ @Override
+ public DataManagerCallback getImmutableDataManagerCallback(String tableName, String segmentName,
+ Schema schema, ServerMetrics serverMetrics) {
return DEFAULT_DM_CALLBACK;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
index dd198f9..8b3a14d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
@@ -27,17 +27,48 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import java.io.IOException;
import java.util.Map;
+/**
+ * callback for handling any upsert-related operations in subclass of
+ * {@link org.apache.pinot.core.indexsegment.IndexSegment} if necessary
+ */
public interface IndexSegmentCallback {
+ /**
+ * initialize the callback from {@link org.apache.pinot.core.indexsegment.IndexSegment}
+ * @param segmentMetadata the metadata associated with the current segment
+ * @param virtualColumnIndexReader
+ */
void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader);
+ /**
+ * initialize offset column for in-memory access
+ * @param offsetColumnContainer the column that stores the offset data
+ */
void initOffsetColumn(ColumnIndexContainer offsetColumnContainer);
+ /**
+ * perform any operation from the callback for the given row after it has been processed and indexed
+ * @param row the current pinot row we just indexed into the current IndexSegment
+ * @param docId the docId of this record
+ */
void postProcessRecords(GenericRow row, int docId);
+ /**
+ * initialize set of upsert-related virtual columns if necessary
+ * @throws IOException
+ */
void initVirtualColumn() throws IOException;
+ /**
+ * update upsert-related virtual column from segment updater if necessary
+ * @param logEntries
+ */
void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries);
+ /**
+ * retrieve information related to an upsert-enabled segment virtual column for debugging purposes
+ * @param offset the offset of the record we are trying to get the virtual column data for
+ * @return string representation of the virtual column data information
+ */
String getVirtualColumnInfo(long offset);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
index ec6a15f..7501961 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
@@ -20,16 +20,49 @@ package org.apache.pinot.core.data.manager.upsert;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.spi.data.Schema;
+import java.io.File;
+
+/**
+ * component injected into {@link org.apache.pinot.core.data.manager.TableDataManager} to handle extra logic for
+ * workflows other than regular append-mode ingestion. The appropriate implementation class is expected to be
+ * provided at run time
+ */
public interface TableDataManagerCallback {
+ /**
+ * initialize the callback object during {@link org.apache.pinot.core.data.manager.TableDataManager#init}
+ * ensure any internal component for this callback is properly created during the start time
+ */
void init();
- void addSegment(String tableName, String segmentName, TableConfig tableConfig);
+ /**
+ * callback to ensure other components related to the callback are added when
+ * {@link org.apache.pinot.core.data.manager.TableDataManager#addSegment(File, IndexLoadingConfig)}
+ * is executed
+ */
+ void addSegment(String tableNameWithType, String segmentName, TableConfig tableConfig);
+
+ /**
+ * return a callback object for a mutable segment data manager callback component when a table creates a new
+ * mutable {@link org.apache.pinot.core.data.manager.SegmentDataManager}
+ */
+ DataManagerCallback getMutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema,
+ ServerMetrics serverMetrics);
- DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
- ServerMetrics serverMetrics, boolean isMutable);
+ /**
+ * return a callback object for an immutable segment data manager callback component when a table creates a new
+ * immutable {@link org.apache.pinot.core.data.manager.SegmentDataManager}
+ */
+ DataManagerCallback getImmutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema,
+ ServerMetrics serverMetrics);
+ /**
+ * create a no-op default callback for segmentDataManager that don't support upsert
+ * (eg, offline table, HLL consumers etc)
+ * @return a no-op default callback for data manager
+ */
DataManagerCallback getDefaultDataManagerCallback();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
index 2e868a2..1f08a90 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.data.manager.upsert;
-import com.google.common.base.Preconditions;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -27,6 +26,10 @@ import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * class for creating the appropriate {@link TableDataManagerCallback} depending on the config
+ * allows upsert-enabled pinot servers to inject the proper logic while keeping append-only pinot servers unchanged
+ */
public class TableDataManagerCallbackProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(TableDataManagerCallbackProvider.class);
@@ -38,6 +41,12 @@ public class TableDataManagerCallbackProvider {
public static final String DEFAULT_CALLBACK_CLASS_CONFIG_KEY = "append.tableDataManager.callback";
public static final String CALLBACK_CLASS_CONFIG_DEFAULT = DefaultTableDataManagerCallbackImpl.class.getName();
+ /**
+ * initialize table data manager callback provider
+ * the most important config will be {@value UPSERT_CALLBACK_CLASS_CONFIG_KEY} for creating the proper
+ * callback injection for upsert pinot server
+ * @param configuration
+ */
public TableDataManagerCallbackProvider(Configuration configuration) {
String appendClassName = configuration.getString(DEFAULT_CALLBACK_CLASS_CONFIG_KEY, CALLBACK_CLASS_CONFIG_DEFAULT);
String upsertClassName = configuration.getString(UPSERT_CALLBACK_CLASS_CONFIG_KEY);
@@ -47,8 +56,6 @@ public class TableDataManagerCallbackProvider {
LOGGER.error("failed to load table data manager class {}", appendClassName, e);
ExceptionUtils.rethrow(e);
}
- Preconditions.checkState(defaultTableDataManagerCallBackClass.isAssignableFrom(TableDataManagerCallback.class),
- "configured class not assignable from Callback class", defaultTableDataManagerCallBackClass);
if (StringUtils.isNotEmpty(upsertClassName)) {
try {
upsertTableDataManagerCallBackClass = (Class<TableDataManagerCallback>) Class.forName(upsertClassName);
@@ -56,11 +63,13 @@ public class TableDataManagerCallbackProvider {
LOGGER.error("failed to load table data manager class {}", upsertClassName);
ExceptionUtils.rethrow(e);
}
- Preconditions.checkState(upsertTableDataManagerCallBackClass.isAssignableFrom(TableDataManagerCallback.class),
- "configured class not assignable from Callback class");
}
}
+ /**
+ * create a proper callback for the table, depending on whether the table is configured for upsert or not
+ * @param tableDataManagerConfig the config for the table
+ */
public TableDataManagerCallback getTableDataManagerCallback(TableDataManagerConfig tableDataManagerConfig) {
if (tableDataManagerConfig.getUpdateSemantic() == CommonConstants.UpdateSemantic.UPSERT) {
return getUpsertTableDataManagerCallback();
@@ -69,7 +78,7 @@ public class TableDataManagerCallbackProvider {
}
}
- public TableDataManagerCallback getUpsertTableDataManagerCallback() {
+ private TableDataManagerCallback getUpsertTableDataManagerCallback() {
try {
return upsertTableDataManagerCallBackClass.newInstance();
} catch (Exception ex) {
@@ -79,6 +88,9 @@ public class TableDataManagerCallbackProvider {
return null;
}
+ /**
+ * create a table data manager callback for non-upsert-enabled tables, ensuring the original pinot workflow is used
+ */
public TableDataManagerCallback getDefaultTableDataManagerCallback() {
try {
return defaultTableDataManagerCallBackClass.newInstance();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
index b45060c..7c53467 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
@@ -24,13 +24,15 @@ import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
import java.util.Map;
+/**
+ * default no-op watermark manager for pinot
+ */
public class DefaultWaterMarkManager implements WaterMarkManager {
private static final Map<String, Map<Integer, Long>> DEFAULT_MAP = ImmutableMap.of();
@Override
public void init(Configuration config, GrigioMetrics metrics) {
-
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
index 0c6756c..e9cc5f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
@@ -26,19 +26,34 @@ import java.util.Map;
/**
* LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of
* an input table.
+ * It runs on the pinot broker to fetch lwm information from pinot servers periodically
+ * and uses that information to rewrite pinot queries
*/
public interface LowWaterMarkService {
- void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort);
-
- // Return the low water mark mapping from partition id to the corresponding low water mark of a given table.
- Map<Integer, Long> getLowWaterMarks(String tableName);
-
- // Shutdown the service.
- void shutDown();
-
- // start
- void start(BrokerMetrics brokerMetrics);
-
- UpsertQueryRewriter getQueryRewriter();
+ void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort);
+
+ /**
+ * the low water mark mapping from partition id to the corresponding low water mark of a given table.
+ * @param tableNameWithType
+ * @return map of partition to lowWatermark
+ */
+ Map<Integer, Long> getLowWaterMarks(String tableNameWithType);
+
+ /**
+ * shutdown low water mark service and its background threads (if any)
+ */
+ void shutDown();
+
+ /**
+ * start the current low watermark service
+ * @param brokerMetrics pinot broker metrics for lwm service to report its status to
+ */
+ void start(BrokerMetrics brokerMetrics);
+
+ /**
+ * get a query rewriter that can rewrite a query if the target table is an upsert-enabled table
+ * @return the query rewriter associated with this low water mark service
+ */
+ QueryRewriter getQueryRewriter();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java
similarity index 71%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java
index 64a64d0..40abd9a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java
@@ -1,7 +1,3 @@
-package org.apache.pinot.core.segment.updater;
-
-import org.apache.pinot.common.request.BrokerRequest;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -20,8 +16,20 @@ import org.apache.pinot.common.request.BrokerRequest;
* specific language governing permissions and limitations
* under the License.
*/
-public interface UpsertQueryRewriter {
+package org.apache.pinot.core.segment.updater;
+
+import org.apache.pinot.common.request.BrokerRequest;
+
+/**
+ * interface for rewriting pinot broker queries for upsert or other purposes
+ */
+public interface QueryRewriter {
- void rewriteQueryForUpsert(BrokerRequest request, String rawTableName);
+ /**
+ * rewrite the query for pinot upsert table if necessary
+ * @param request the pinot sql request that pinot broker requests
+ * @param rawTableName the raw table name, i.e. the table name without the table type suffix
+ */
+ void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
index eb64285..f1312e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
@@ -22,21 +22,55 @@ import com.yammer.metrics.core.MetricsRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
+/**
+ * contains all components related to upsert in pinot server
+ */
public interface UpsertComponentContainer {
+ /**
+ * register pinot upsert component metrics to the given registry
+ * @param prefix the prefix of all metrics
+ * @param registry the registry we are going to register the metrics to
+ */
void registerMetrics(String prefix, MetricsRegistry registry);
+ /**
+ * initialize the upsert component container with necessary config and information
+ * @param config the configuration for this upsert component
+ * @param helixManager helix manager for the current pinot server helix state
+ * @param clusterName helix cluster name for the current pinot cluster
+ * @param instanceName the name of current pinot instance in this cluster
+ */
void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName);
+ /**
+ * start any necessary background processing for this upsert component
+ */
void startBackgroundThread();
+ /**
+ * stop any necessary background processing for this upsert component
+ */
void stopBackgroundThread();
+ /**
+ * shutdown and clean up any state for this upsert component
+ */
void shutdown();
+ /**
+ * return a segment deletion callback component that should be invoked when pinot server removes a segment
+ * from its internal storage (to DROPPED state)
+ */
SegmentDeletionHandler getSegmentDeletionHandler();
+ /**
+ * return the current watermark manager for this server
+ */
WaterMarkManager getWatermarkManager();
+ /**
+ * check if upsert is enabled for the current pinot server
+ */
boolean isUpsertEnabled();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
index acc8479..63ba5c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
@@ -19,16 +19,37 @@
package org.apache.pinot.core.segment.updater;
import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
import java.util.Map;
+/**
+ * class run on pinot server to keep track of the low-water-mark of each upsert table
+ * organized by partition
+ */
public interface WaterMarkManager {
+ /**
+ * initialize watermark manager
+ * @param config the configuration subset for waterMarkManager
+ * @param metrics the metrics for watermark manager
+ */
void init(Configuration config, GrigioMetrics metrics);
+ /**
+ * the highest epoch for each partition of each table in this pinot server
+ * @return mapping of {pinot_table_name: {partition_id: high_water_mark}}
+ * example as {
+ * "table1_REALTIME" : {
+ * "0" : 1400982,
+ * "1" : 1400982,
+ * "2" : 1400982,
+ * "3" : 1400982
+ * },
+ * "table2_REALTIME" : {
+ * "0" : 1401008,
+ * "1" : 1401008
+ * }
+ */
Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
-
-
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
index 4d1d469..a56f897 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
@@ -40,6 +40,7 @@ import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
@@ -137,6 +138,7 @@ public class SegmentGenerationWithNullValueVectorTest {
when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+ TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class));
@SuppressWarnings("unchecked")
TableDataManager tableDataManager = TableDataManagerProvider
.getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index 203d2f6..2eda11f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
@@ -98,6 +99,7 @@ public class QueryExecutorTest {
when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+ TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class));
@SuppressWarnings("unchecked")
TableDataManager tableDataManager = TableDataManagerProvider
.getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
index 74e4b41..c92d53e 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.apache.pinot.core.segment.updater.QueryRewriter;
import org.glassfish.jersey.client.ClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +67,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
private int _serverPort;
private boolean _shuttingDown;
private BrokerMetrics _brokerMetrics;
- private UpsertQueryRewriter _queryRewriter;
+ private QueryRewriter _queryRewriter;
@Override
public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) {
@@ -104,7 +104,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
}
@Override
- public UpsertQueryRewriter getQueryRewriter() {
+ public QueryRewriter getQueryRewriter() {
return _queryRewriter;
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
index 90fce7c..a9cd1f0 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
@@ -25,7 +25,7 @@ import org.apache.pinot.common.request.FilterQuery;
import org.apache.pinot.common.request.FilterQueryMap;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.apache.pinot.core.segment.updater.QueryRewriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +34,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-public class UpsertQueryRewriterImpl implements UpsertQueryRewriter {
+public class UpsertQueryRewriterImpl implements QueryRewriter {
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertQueryRewriterImpl.class);
protected final LowWaterMarkService _lwmService;
@@ -48,7 +48,7 @@ public class UpsertQueryRewriterImpl implements UpsertQueryRewriter {
}
@Override
- public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
+ public void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
final String realtimeTableName = TableNameBuilder.ensureTableNameWithType(rawTableName,
CommonConstants.Helix.TableType.REALTIME);
Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName);
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
index dc329e4..efbbf00 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.data.manager.upsert;
-
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pinot.common.config.TableConfig;
@@ -30,6 +29,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
+/**
+ * class for handling all upsert-related operations when interacting with segments for a given table at
+ * {@link org.apache.pinot.core.data.manager.TableDataManager}
+ */
public class UpsertTableDataManagerCallbackImpl implements TableDataManagerCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertTableDataManagerCallbackImpl.class);
@@ -51,7 +54,18 @@ public class UpsertTableDataManagerCallbackImpl implements TableDataManagerCallb
}
@Override
- public DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
+ public DataManagerCallback getMutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema,
+ ServerMetrics serverMetrics) {
+ return getDataManagerCallback(tableNameWithType, segmentName, schema, serverMetrics, true);
+ }
+
+ @Override
+ public DataManagerCallback getImmutableDataManagerCallback(String tableNameWithType, String segmentName,
+ Schema schema, ServerMetrics serverMetrics) {
+ return getDataManagerCallback(tableNameWithType, segmentName, schema, serverMetrics, false);
+ }
+
+ private DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
ServerMetrics serverMetrics, boolean isMutable) {
return new UpsertDataManagerCallbackImpl(tableName, segmentName, schema, serverMetrics, isMutable);
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
index 266f25d2..4b3d861 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
@@ -31,6 +31,14 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+/**
+ * watermark manager for the upsert component to collect the low-water-mark information of each table in the current
+ * pinot server
+ * a watermark is defined as the largest version of each partition (segment update event topic partition) for each table
+ * so it stores the data in a map of {table_name: {partition_id: highest_water_mark}}
+ * then {@link LowWaterMarkService} will ingest this information from pinot servers and calculate the lowest of these
+ * watermarks and use it in the queries sent to the servers
+ */
public class UpsertWaterMarkManager implements WaterMarkManager {
private final Map<String, Map<Integer, Long>> _highWaterMarkTablePartitionMap = new ConcurrentHashMap<>();
@@ -55,7 +63,12 @@ public class UpsertWaterMarkManager implements WaterMarkManager {
return _instance;
}
- // TODO(tingchen) Look into the case where Segment Update Messages might arrive before the corresponding physical data.
+ /**
+ * process an event message and update the current watermark information for this manager
+ * @param table
+ * @param segment
+ * @param logEntry the message containing the new watermark information
+ */
public void processMessage(String table, String segment, UpdateLogEntry logEntry) {
if (logEntry == null) {
return;
@@ -65,6 +78,12 @@ public class UpsertWaterMarkManager implements WaterMarkManager {
processVersionUpdate(table, partition, version);
}
+ /**
+ * update the high watermark information associated with the given table/partition
+ * @param table
+ * @param partition
+ * @param version
+ */
public void processVersionUpdate(String table, int partition, long version) {
Preconditions.checkState(partition >= 0, "logEntry has invalid version {} for table {}",
version, table);
@@ -76,6 +95,11 @@ public class UpsertWaterMarkManager implements WaterMarkManager {
}
}
+ /**
+ * return the highest watermark for each partition of the given table
+ * @param tableName
+ * @return
+ */
public Map<Integer, Long> getHighWaterMarkForTable(String tableName) {
return ImmutableMap.copyOf(_highWaterMarkTablePartitionMap.getOrDefault(tableName, ImmutableMap.of()));
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java
similarity index 99%
rename from pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
rename to pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java
index ac6c38d..5e6b703 100644
--- a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java
@@ -33,7 +33,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-public class UpsertQueryRewriterImplTest {
+public class QueryRewriterImplTest {
private LowWaterMarkService _lwms;
private UpsertQueryRewriterImpl rewriter;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 710e22a..b06000f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -221,7 +221,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
final TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
final DataManagerCallback dataManagerCallback = tableDataManager.getTableDataManagerCallback()
- .getDataManagerCallback(tableNameWithType, segmentName, schema, _serverMetrics, false);
+ .getImmutableDataManagerCallback(tableNameWithType, segmentName, schema, _serverMetrics);
// Load from index directory
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir,
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
index 7ae27a6..ab76e36 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
@@ -18,10 +18,8 @@
*/
package org.apache.pinot.server.upsert;
-import com.google.common.base.Preconditions;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pinot.core.segment.updater.UpsertComponentContainer;
-import org.apache.pinot.core.segment.updater.WaterMarkManager;
import org.apache.pinot.server.conf.ServerConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +35,6 @@ public class UpsertComponentContainerProvider {
LOGGER.info("creating watermark manager with class {}", className);
try {
Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className);
- Preconditions.checkState(comonentContainerClass.isAssignableFrom(WaterMarkManager.class),
- "configured class not assignable from Callback class");
_instance = comonentContainerClass.newInstance();
} catch (Exception e) {
LOGGER.error("failed to load watermark manager class", className, e);
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 60702c4..aec1510 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
@@ -41,6 +42,7 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.updater.DefaultWaterMarkManager;
import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.server.starter.helix.AdminApiApplication;
@@ -83,6 +85,7 @@ public abstract class BaseResourceTest {
// Mock the server instance
ServerInstance serverInstance = mock(ServerInstance.class);
+ when(serverInstance.getWatermarkManager()).thenReturn(new DefaultWaterMarkManager());
when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager);
// Add the default table and segment
@@ -133,6 +136,7 @@ public abstract class BaseResourceTest {
when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
when(tableDataManagerConfig.getTableName()).thenReturn(tableName);
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+ TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class));
TableDataManager tableDataManager = TableDataManagerProvider
.getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
mock(ServerMetrics.class));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org