You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/23 19:27:39 UTC

[incubator-pinot] 09/09: update to fix unit tests

This is an automated email from the ASF dual-hosted git repository.

jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 39f5ba8d4770ae0a1b1c23ef02965ff430c7b78b
Author: james Shao <sj...@uber.com>
AuthorDate: Thu Mar 19 14:34:46 2020 -0700

    update to fix unit tests
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  2 +-
 .../broker/upsert/DefaultLowWaterMarkService.java  | 11 +++---
 .../broker/upsert/DefaultUpsertQueryRewriter.java  |  6 ++--
 .../broker/upsert/LowWaterMarkServiceProvider.java | 18 +++++++---
 .../core/data/manager/BaseTableDataManager.java    |  1 +
 .../core/data/manager/InstanceDataManager.java     |  5 ---
 .../realtime/LLRealtimeSegmentDataManager.java     |  5 ++-
 .../manager/realtime/RealtimeTableDataManager.java | 13 ++++----
 .../data/manager/upsert/DataManagerCallback.java   | 37 +++++++++++++++++++-
 .../upsert/DefaultIndexSegmentCallback.java        |  8 ++---
 .../DefaultTableDataManagerCallbackImpl.java       | 14 ++++++--
 .../data/manager/upsert/IndexSegmentCallback.java  | 31 +++++++++++++++++
 .../manager/upsert/TableDataManagerCallback.java   | 39 ++++++++++++++++++++--
 .../upsert/TableDataManagerCallbackProvider.java   | 24 +++++++++----
 .../segment/updater/DefaultWaterMarkManager.java   |  4 ++-
 .../core/segment/updater/LowWaterMarkService.java  | 39 +++++++++++++++-------
 ...UpsertQueryRewriter.java => QueryRewriter.java} | 20 +++++++----
 .../segment/updater/UpsertComponentContainer.java  | 34 +++++++++++++++++++
 .../core/segment/updater/WaterMarkManager.java     | 27 +++++++++++++--
 .../SegmentGenerationWithNullValueVectorTest.java  |  2 ++
 .../pinot/query/executor/QueryExecutorTest.java    |  2 ++
 .../upsert/PollingBasedLowWaterMarkService.java    |  6 ++--
 .../broker/upsert/UpsertQueryRewriterImpl.java     |  6 ++--
 .../upsert/UpsertTableDataManagerCallbackImpl.java | 18 ++++++++--
 .../segment/updater/UpsertWaterMarkManager.java    | 26 ++++++++++++++-
 ...terImplTest.java => QueryRewriterImplTest.java} |  2 +-
 .../starter/helix/HelixInstanceDataManager.java    |  2 +-
 .../upsert/UpsertComponentContainerProvider.java   |  4 ---
 .../apache/pinot/server/api/BaseResourceTest.java  |  4 +++
 29 files changed, 332 insertions(+), 78 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index cb17740..4b02cd4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -300,7 +300,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     if (shouldEnableLowWaterMarkRewrite(request)) {
       // Augment the realtime request with LowWaterMark constraints.
-      _lwmService.getQueryRewriter().rewriteQueryForUpsert(realtimeBrokerRequest, rawTableName);
+      _lwmService.getQueryRewriter().maybeRewriteQueryForUpsert(realtimeBrokerRequest, rawTableName);
     }
 
     // Calculate routing table for the query
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
index 42e1dcb..eb71890 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
@@ -22,13 +22,16 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.apache.pinot.core.segment.updater.QueryRewriter;
 
 import java.util.Map;
 
+/**
+ * default class to handle any low watermark operation on pinot broker, mostly no-op
+ */
 public class DefaultLowWaterMarkService implements LowWaterMarkService {
 
-  private UpsertQueryRewriter upsertQueryRewriter = new DefaultUpsertQueryRewriter();
+  private QueryRewriter queryRewriter = new DefaultUpsertQueryRewriter();
 
   @Override
   public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval,
@@ -49,7 +52,7 @@ public class DefaultLowWaterMarkService implements LowWaterMarkService {
   }
 
   @Override
-  public UpsertQueryRewriter getQueryRewriter() {
-    return upsertQueryRewriter;
+  public QueryRewriter getQueryRewriter() {
+    return queryRewriter;
   }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
index d18a56f..97195a3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
@@ -1,7 +1,7 @@
 package org.apache.pinot.broker.upsert;
 
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.apache.pinot.core.segment.updater.QueryRewriter;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,10 +21,10 @@ import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
  * specific language governing permissions and limitations
  * under the License.
  */
-public class DefaultUpsertQueryRewriter implements UpsertQueryRewriter {
+public class DefaultUpsertQueryRewriter implements QueryRewriter {
 
   @Override
-  public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
+  public void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
     // do nothing
   }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
index f5c06f3..56b3bff 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.broker.upsert;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.helix.HelixDataAccessor;
@@ -29,21 +28,28 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.pinot.common.utils.CommonConstants.Broker.*;
 
-
+/**
+ * provider to initialize the LowWaterMarkService for the pinot broker
+ */
 public class LowWaterMarkServiceProvider {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkServiceProvider.class);
 
   private LowWaterMarkService _instance;
 
+  /**
+   * create a new provider instance
+   * @param brokerConfig config for this provider to create the actual class reference,
+   *                     refer to {@value CommonConstants.Broker#CONFIG_OF_BROKER_LWMS_CLASS_NAME}
+   * @param dataAccessor helix data access to help low watermark service to find proper server cluster
+   * @param clusterName cluster name for the current pinot cluster
+   */
   public LowWaterMarkServiceProvider(Configuration brokerConfig, HelixDataAccessor dataAccessor, String clusterName) {
     String className = brokerConfig.getString(CommonConstants.Broker.CONFIG_OF_BROKER_LWMS_CLASS_NAME,
         DefaultLowWaterMarkService.class.getName());
     LOGGER.info("creating watermark manager with class {}", className);
     try {
       Class<LowWaterMarkService> comonentContainerClass = (Class<LowWaterMarkService>) Class.forName(className);
-      Preconditions.checkState(comonentContainerClass.isAssignableFrom(LowWaterMarkService.class),
-          "configured class not assignable from LowWaterMarkService class");
       _instance = comonentContainerClass.newInstance();
       _instance.init(dataAccessor, clusterName,
           brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS,
@@ -57,6 +63,10 @@ public class LowWaterMarkServiceProvider {
     }
   }
 
+  /**
+   * fetch the current instance of low watermark service this provider created
+   * @return
+   */
   public LowWaterMarkService getInstance() {
     return _instance;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index cb278b4..6c35b80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -107,6 +107,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
    * <p>The new segment is added with reference count of 1, so that is never removed until a drop command comes through.
    *
    * @param immutableSegment Immutable segment to add
+   * @param dataManagerCallback callback for performing any other necessary operation for other ingestion models
    */
   @Override
   public void addSegment(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index eba0689..8f51bd9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager;
 
 import java.io.File;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -132,8 +131,4 @@ public interface InstanceDataManager {
    */
   ZkHelixPropertyStore<ZNRecord> getPropertyStore();
 
-//  /**
-//   * Return the mappings from partition -> low water marks of all the tables hosted in this server.
-//   */
-//  Map<String, Map<Integer, Long>> getLowWaterMarks();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4f3ae50..dc2f748 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1056,7 +1056,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
       RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
       Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics,
-      DataManagerCallback dataManagerCallback) {
+      DataManagerCallback dataManagerCallback) throws IOException {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
     _tableConfig = tableConfig;
@@ -1219,6 +1219,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _indexLoadingConfig, _protocolHandler);
 
+    // init virtual columns
+    _dataManagerCallback.initVirtualColumns();
+
     segmentLogger
         .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
             _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC).toString());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index bc5316d..fd42627 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -39,7 +39,6 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.NamedThreadFactory;
@@ -229,11 +228,13 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     // of the index directory and loading segment from it
     LoaderUtils.reloadFailureRecovery(indexDir);
 
+    // tell callback to add segment
+    _tableDataManagerCallback.addSegment(_tableNameWithType, segmentName, tableConfig);
+
     if (indexDir.exists() && (realtimeSegmentZKMetadata.getStatus() == Status.DONE)) {
       // Segment already exists on disk, and metadata has been committed. Treat it like an offline segment
-
-      DataManagerCallback callback = _tableDataManagerCallback
-          .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false);
+      final DataManagerCallback callback = _tableDataManagerCallback
+          .getImmutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics);
       addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, callback), callback);
     } else {
       // Either we don't have the segment on disk or we have not committed in ZK. We should be starting the consumer
@@ -267,7 +268,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
         manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig,
             this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName,
             _partitionIdToSemaphoreMap.get(streamPartitionId), _serverMetrics,
-            _tableDataManagerCallback.getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, true));
+            _tableDataManagerCallback.getMutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics));
       }
       _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
       _segmentDataManagerMap.put(segmentName, manager);
@@ -321,7 +322,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     try {
       File indexDir = new File(_indexDir, segmentName);
       DataManagerCallback dataManagerCallback = _tableDataManagerCallback
-          .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false);
+          .getImmutableDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics);
       addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, dataManagerCallback), dataManagerCallback);
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
index 466553e..91b08d2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
@@ -18,26 +18,61 @@
  */
 package org.apache.pinot.core.data.manager.upsert;
 
+import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 import java.io.IOException;
 import java.util.List;
 
+/**
+ * component injected into {@link org.apache.pinot.core.data.manager.SegmentDataManager} to handle extra logic for
+ * workflows other than regular append-mode ingestion. The appropriate implementation class is expected to be
+ * linked at run time
+ */
 public interface DataManagerCallback {
 
+  /**
+   * create a callback component for {@link org.apache.pinot.core.indexsegment.IndexSegment} when
+   * {@link org.apache.pinot.core.data.manager.SegmentDataManager} create one.
+   * @return callback associated with the internal index segment this data manager holds
+   */
   IndexSegmentCallback getIndexSegmentCallback();
 
+  /**
+   * process the row after transformation in the ingestion process
+   * @param row the row of newly ingested and transformed data from upstream
+   * @param offset the offset of this particular row
+   */
   void processTransformedRow(GenericRow row, long offset);
 
+  /**
+   * process the row after we have finished indexing the current row
+   * @param row the row we just indexed into the current segment
+   * @param offset the offset associated with the index
+   */
   void postIndexProcessing(GenericRow row, long offset);
 
+  /**
+   * callback for when a realtime segment data manager is done with the current consumption loop for all data
+   * associated with it
+   */
   void postConsumeLoop();
 
+  /**
+   * initialize all virtual columns for the current data manager associated with upsert component (if necessary)
+   * @throws IOException
+   */
   void initVirtualColumns() throws IOException;
 
+  /**
+   * update the data in the virtual columns from segment updater loop if necessary
+   * @param messages list of update log entries for the current datamanager
+   */
   void updateVirtualColumns(List<UpdateLogEntry> messages);
 
+  /**
+   * callback when the associated data manager is destroyed by pinot server in call {@link SegmentDataManager#destroy()}
+   */
   void destroy();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
index 0299cb7..a0d9acf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
@@ -27,6 +27,9 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 
 import java.util.Map;
 
+/**
+ * no-op callback for non-upsert table/pinot server instance
+ */
 public class DefaultIndexSegmentCallback implements IndexSegmentCallback {
 
   public static final DefaultIndexSegmentCallback INSTANCE = new DefaultIndexSegmentCallback();
@@ -35,27 +38,22 @@ public class DefaultIndexSegmentCallback implements IndexSegmentCallback {
 
   @Override
   public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) {
-    // do nothing
   }
 
   @Override
   public void initOffsetColumn(ColumnIndexContainer offsetColumnContainer) {
-    // do noting
   }
 
   @Override
   public void postProcessRecords(GenericRow row, int docId) {
-    // do nothing
   }
 
   @Override
   public void initVirtualColumn() {
-    // do nothing
   }
 
   @Override
   public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
-    // do nothing
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
index 2e56455..a0f4398 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
@@ -22,6 +22,10 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.spi.data.Schema;
 
+/**
+ * Class for no-op callback for pinot cluster/table that don't support upsert
+ * We will also use this for pinot tables that are not configured to use upsert semantics
+ */
 public class DefaultTableDataManagerCallbackImpl implements TableDataManagerCallback {
 
   private static final DefaultDataManagerCallbackImpl DEFAULT_DM_CALLBACK = DefaultDataManagerCallbackImpl.INSTANCE;
@@ -35,8 +39,14 @@ public class DefaultTableDataManagerCallbackImpl implements TableDataManagerCall
   }
 
   @Override
-  public DataManagerCallback getDataManagerCallback(String tableName, String segmentName,
-      Schema schema, ServerMetrics serverMetrics, boolean isMutable) {
+  public DataManagerCallback getMutableDataManagerCallback(String tableName, String segmentName,
+      Schema schema, ServerMetrics serverMetrics) {
+    return DEFAULT_DM_CALLBACK;
+  }
+
+  @Override
+  public DataManagerCallback getImmutableDataManagerCallback(String tableName, String segmentName,
+      Schema schema, ServerMetrics serverMetrics) {
     return DEFAULT_DM_CALLBACK;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
index dd198f9..8b3a14d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
@@ -27,17 +27,48 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import java.io.IOException;
 import java.util.Map;
 
+/**
+ * callback for handling any upsert-related operations in subclass of
+ * {@link org.apache.pinot.core.indexsegment.IndexSegment} if necessary
+ */
 public interface IndexSegmentCallback {
 
+  /**
+   * initialize the callback from {@link org.apache.pinot.core.indexsegment.IndexSegment}
+   * @param segmentMetadata the metadata associated with the current segment
+   * @param virtualColumnIndexReader
+   */
   void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader);
 
+  /**
+   * initialize offset column for in-memory access
+   * @param offsetColumnContainer the column that stores the offset data
+   */
   void initOffsetColumn(ColumnIndexContainer offsetColumnContainer);
 
+  /**
+   * perform any operation from the callback for the given row after it has been processed and indexed
+   * @param row the current pinot row we just indexed into the current IndexSegment
+   * @param docId the docId of this record
+   */
   void postProcessRecords(GenericRow row, int docId);
 
+  /**
+   * initialize set of upsert-related virtual columns if necessary
+   * @throws IOException
+   */
   void initVirtualColumn() throws IOException;
 
+  /**
+   * update upsert-related virtual column from segment updater if necessary
+   * @param logEntries
+   */
   void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries);
 
+  /**
+   * retrieve information related to an upsert-enabled segment virtual column for debugging purposes
+   * @param offset the offset of the record we are trying to get the virtual column data for
+   * @return string representation of the virtual column data information
+   */
   String getVirtualColumnInfo(long offset);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
index ec6a15f..7501961 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
@@ -20,16 +20,49 @@ package org.apache.pinot.core.data.manager.upsert;
 
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.spi.data.Schema;
 
+import java.io.File;
+
+/**
+ * component injected into {@link org.apache.pinot.core.data.manager.TableDataManager} to handle extra logic for
+ * workflows other than regular append-mode ingestion. The appropriate implementation class is expected to be
+ * linked at run time
+ */
 public interface TableDataManagerCallback {
 
+  /**
+   * initialize the callback object during {@link org.apache.pinot.core.data.manager.TableDataManager#init}
+   * ensure any internal component for this callback is properly created during the start time
+   */
   void init();
 
-  void addSegment(String tableName, String segmentName, TableConfig tableConfig);
+  /**
+   * callback to ensure other components related to the callback are added when
+   * {@link org.apache.pinot.core.data.manager.TableDataManager#addSegment(File, IndexLoadingConfig)}
+   * is executed
+   */
+  void addSegment(String tableNameWithType, String segmentName, TableConfig tableConfig);
+
+  /**
+   * return a callback object for a mutable segment data manager callback component when a table creates a new
+   * mutable {@link org.apache.pinot.core.data.manager.SegmentDataManager}
+   */
+  DataManagerCallback getMutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema,
+      ServerMetrics serverMetrics);
 
-  DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
-      ServerMetrics serverMetrics, boolean isMutable);
+  /**
+   * return a callback object for an immutable segment data manager callback component when a table creates a new
+   * immutable {@link org.apache.pinot.core.data.manager.SegmentDataManager}
+   */
+  DataManagerCallback getImmutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema,
+      ServerMetrics serverMetrics);
 
+  /**
+   * create a no-op default callback for segmentDataManager that don't support upsert
+   * (eg, offline table, HLL consumers etc)
+   * @return a no-op default callback for data manager
+   */
   DataManagerCallback getDefaultDataManagerCallback();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
index 2e868a2..1f08a90 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.data.manager.upsert;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -27,6 +26,10 @@ import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * class for creating the appropriate {@link TableDataManagerCallback} depending on the config;
+ * allows an upsert-enabled pinot server to inject proper logic while keeping an append-only pinot server unchanged
+ */
 public class TableDataManagerCallbackProvider {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableDataManagerCallbackProvider.class);
@@ -38,6 +41,12 @@ public class TableDataManagerCallbackProvider {
   public static final String DEFAULT_CALLBACK_CLASS_CONFIG_KEY = "append.tableDataManager.callback";
   public static final String CALLBACK_CLASS_CONFIG_DEFAULT = DefaultTableDataManagerCallbackImpl.class.getName();
 
+  /**
+   * initialize table data manager callback provider
+   * the most important config will be {@value UPSERT_CALLBACK_CLASS_CONFIG_KEY} for creating the proper
+   * callback injection for upsert pinot server
+   * @param configuration
+   */
   public TableDataManagerCallbackProvider(Configuration configuration) {
     String appendClassName = configuration.getString(DEFAULT_CALLBACK_CLASS_CONFIG_KEY, CALLBACK_CLASS_CONFIG_DEFAULT);
     String upsertClassName = configuration.getString(UPSERT_CALLBACK_CLASS_CONFIG_KEY);
@@ -47,8 +56,6 @@ public class TableDataManagerCallbackProvider {
       LOGGER.error("failed to load table data manager class {}", appendClassName, e);
       ExceptionUtils.rethrow(e);
     }
-    Preconditions.checkState(defaultTableDataManagerCallBackClass.isAssignableFrom(TableDataManagerCallback.class),
-        "configured class not assignable from Callback class", defaultTableDataManagerCallBackClass);
     if (StringUtils.isNotEmpty(upsertClassName)) {
       try {
         upsertTableDataManagerCallBackClass = (Class<TableDataManagerCallback>) Class.forName(upsertClassName);
@@ -56,11 +63,13 @@ public class TableDataManagerCallbackProvider {
         LOGGER.error("failed to load table data manager class {}", upsertClassName);
         ExceptionUtils.rethrow(e);
       }
-      Preconditions.checkState(upsertTableDataManagerCallBackClass.isAssignableFrom(TableDataManagerCallback.class),
-          "configured class not assignable from Callback class");
     }
   }
 
+  /**
+   * create a proper callback for the table, depending on whether the table is configured for upsert or not
+   * @param tableDataManagerConfig the config for the table
+   */
   public TableDataManagerCallback getTableDataManagerCallback(TableDataManagerConfig tableDataManagerConfig) {
     if (tableDataManagerConfig.getUpdateSemantic() == CommonConstants.UpdateSemantic.UPSERT) {
       return getUpsertTableDataManagerCallback();
@@ -69,7 +78,7 @@ public class TableDataManagerCallbackProvider {
     }
   }
 
-  public TableDataManagerCallback getUpsertTableDataManagerCallback() {
+  private TableDataManagerCallback getUpsertTableDataManagerCallback() {
     try {
       return upsertTableDataManagerCallBackClass.newInstance();
     } catch (Exception ex) {
@@ -79,6 +88,9 @@ public class TableDataManagerCallbackProvider {
     return null;
   }
 
+  /**
+   * create a table data manager callback for non-upsert-enabled tables, ensuring the original pinot workflow is used
+   */
   public TableDataManagerCallback getDefaultTableDataManagerCallback() {
     try {
       return defaultTableDataManagerCallBackClass.newInstance();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
index b45060c..7c53467 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
@@ -24,13 +24,15 @@ import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
 
 import java.util.Map;
 
+/**
+ * default no-op watermark manager for pinot
+ */
 public class DefaultWaterMarkManager implements WaterMarkManager {
 
   private static final Map<String, Map<Integer, Long>> DEFAULT_MAP = ImmutableMap.of();
 
   @Override
   public void init(Configuration config, GrigioMetrics metrics) {
-
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
index 0c6756c..e9cc5f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
@@ -26,19 +26,34 @@ import java.util.Map;
 /**
  * LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of
  * an input table.
+ * It runs on the pinot broker, periodically fetching lwm information from pinot servers
+ * and using it to rewrite pinot queries
  */
 public interface LowWaterMarkService {
 
-    void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort);
-
-    // Return the low water mark mapping from partition id to the corresponding low water mark of a given table.
-    Map<Integer, Long> getLowWaterMarks(String tableName);
-
-    // Shutdown the service.
-    void shutDown();
-
-    // start
-    void start(BrokerMetrics brokerMetrics);
-
-    UpsertQueryRewriter getQueryRewriter();
+  void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort);
+
+  /**
+   * the low water mark mapping from partition id to the corresponding low water mark of a given table.
+   * @param tableNameWithType
+   * @return map of partition to lowWatermark
+   */
+  Map<Integer, Long> getLowWaterMarks(String tableNameWithType);
+
+  /**
+   * shutdown low water mark service and its background threads (if any)
+   */
+  void shutDown();
+
+  /**
+   * start the current low watermark service
+   * @param brokerMetrics pinot broker metrics for lwm service to report its status to
+   */
+  void start(BrokerMetrics brokerMetrics);
+
+  /**
+   * get a {@link QueryRewriter} so that a query can be rewritten if the target table is an upsert-enabled table
+   * @return
+   */
+  QueryRewriter getQueryRewriter();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java
similarity index 71%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java
index 64a64d0..40abd9a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/QueryRewriter.java
@@ -1,7 +1,3 @@
-package org.apache.pinot.core.segment.updater;
-
-import org.apache.pinot.common.request.BrokerRequest;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,8 +16,20 @@ import org.apache.pinot.common.request.BrokerRequest;
  * specific language governing permissions and limitations
  * under the License.
  */
-public interface UpsertQueryRewriter {
+package org.apache.pinot.core.segment.updater;
+
+import org.apache.pinot.common.request.BrokerRequest;
+
+/**
+ * interface for rewriting pinot broker sql for upsert or other purposes
+ */
+public interface QueryRewriter {
 
-  void rewriteQueryForUpsert(BrokerRequest request, String rawTableName);
+  /**
+   * rewrite the query for pinot upsert table if necessary
+   * @param request the pinot sql request that pinot broker requests
+   * @param rawTableName the raw table name (the table name without the table type suffix)
+   */
+  void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName);
 
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
index eb64285..f1312e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
@@ -22,21 +22,55 @@ import com.yammer.metrics.core.MetricsRegistry;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.HelixManager;
 
+/**
+ * contains all components related to upsert in pinot server
+ */
 public interface UpsertComponentContainer {
 
+  /**
+   * register pinot upsert component metrics to the given registry
+   * @param prefix the prefix of all metrics
+   * @param registry the registry we are going to register the metrics to
+   */
   void registerMetrics(String prefix, MetricsRegistry registry);
 
+  /**
+   * initialize the upsert component container with the necessary config and information
+   * @param config the configuration for this upsert component
+   * @param helixManager helix manager for the current pinot server helix state
+   * @param clusterName helix cluster name for the current pinot cluster
+   * @param instanceName the name of current pinot instance in this cluster
+   */
   void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName);
 
+  /**
+   * start any necessary background processing for this upsert component
+   */
   void startBackgroundThread();
 
+  /**
+   * stop any necessary background processing for this upsert component
+   */
   void stopBackgroundThread();
 
+  /**
+   * shutdown and clean up any state for this upsert component
+   */
   void shutdown();
 
+  /**
+   * return a segment deletion callback component that should be invoked when pinot server removes a segment
+   * from its internal storage (to DROPPED state)
+   */
   SegmentDeletionHandler getSegmentDeletionHandler();
 
+  /**
+   * return the current watermark manager for this server
+   */
   WaterMarkManager getWatermarkManager();
 
+  /**
+   * check if upsert is enabled for the current pinot server
+   */
   boolean isUpsertEnabled();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
index acc8479..63ba5c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
@@ -19,16 +19,37 @@
 package org.apache.pinot.core.segment.updater;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
 import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
 
 import java.util.Map;
 
+/**
+ * class run on pinot server to keep track of the low-water-mark of each upsert table
+ * organized by partition
+ */
 public interface WaterMarkManager {
 
+  /**
+   * initialize watermark manager
+   * @param config the configuration subset for waterMarkManager
+   * @param metrics the metrics for watermark manager
+   */
   void init(Configuration config, GrigioMetrics metrics);
 
+  /**
+   * the highest epoch for each partition of each table in this pinot server
+   * @return mapping of {pinot_table_name: {partition_id: high_water_mark}}
+   * example as {
+   *     "table1_REALTIME" : {
+   *       "0" : 1400982,
+   *       "1" : 1400982,
+   *       "2" : 1400982,
+   *       "3" : 1400982
+   *     },
+   *     "table2_REALTIME" : {
+   *       "0" : 1401008,
+   *       "1" : 1401008
+   *     }
+   */
   Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
-
-
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
index 4d1d469..a56f897 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
@@ -40,6 +40,7 @@ import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
@@ -137,6 +138,7 @@ public class SegmentGenerationWithNullValueVectorTest {
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
     when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class));
     @SuppressWarnings("unchecked")
     TableDataManager tableDataManager = TableDataManagerProvider
         .getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index 203d2f6..2eda11f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
@@ -98,6 +99,7 @@ public class QueryExecutorTest {
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
     when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class));
     @SuppressWarnings("unchecked")
     TableDataManager tableDataManager = TableDataManagerProvider
         .getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
index 74e4b41..c92d53e 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
 import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.apache.pinot.core.segment.updater.QueryRewriter;
 import org.glassfish.jersey.client.ClientProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +67,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
   private int _serverPort;
   private boolean _shuttingDown;
   private BrokerMetrics _brokerMetrics;
-  private UpsertQueryRewriter _queryRewriter;
+  private QueryRewriter _queryRewriter;
 
   @Override
   public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) {
@@ -104,7 +104,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
   }
 
   @Override
-  public UpsertQueryRewriter getQueryRewriter() {
+  public QueryRewriter getQueryRewriter() {
     return _queryRewriter;
   }
 
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
index 90fce7c..a9cd1f0 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
@@ -25,7 +25,7 @@ import org.apache.pinot.common.request.FilterQuery;
 import org.apache.pinot.common.request.FilterQueryMap;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.apache.pinot.core.segment.updater.QueryRewriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +34,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public class UpsertQueryRewriterImpl implements UpsertQueryRewriter {
+public class UpsertQueryRewriterImpl implements QueryRewriter {
   private static final Logger LOGGER = LoggerFactory.getLogger(UpsertQueryRewriterImpl.class);
 
   protected final LowWaterMarkService _lwmService;
@@ -48,7 +48,7 @@ public class UpsertQueryRewriterImpl implements UpsertQueryRewriter {
   }
 
   @Override
-  public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
+  public void maybeRewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
     final String realtimeTableName = TableNameBuilder.ensureTableNameWithType(rawTableName,
         CommonConstants.Helix.TableType.REALTIME);
     Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName);
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
index dc329e4..efbbf00 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.data.manager.upsert;
 
-
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pinot.common.config.TableConfig;
@@ -30,6 +29,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
+/**
+ * class for handling all upsert-related operations that interact with segments of a given table at
+ * {@link org.apache.pinot.core.data.manager.TableDataManager}
+ */
 public class UpsertTableDataManagerCallbackImpl implements TableDataManagerCallback {
   private static final Logger LOGGER = LoggerFactory.getLogger(UpsertTableDataManagerCallbackImpl.class);
 
@@ -51,7 +54,18 @@ public class UpsertTableDataManagerCallbackImpl implements TableDataManagerCallb
   }
 
   @Override
-  public DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
+  public DataManagerCallback getMutableDataManagerCallback(String tableNameWithType, String segmentName, Schema schema,
+      ServerMetrics serverMetrics) {
+    return getDataManagerCallback(tableNameWithType, segmentName, schema, serverMetrics, true);
+  }
+
+  @Override
+  public DataManagerCallback getImmutableDataManagerCallback(String tableNameWithType, String segmentName,
+      Schema schema, ServerMetrics serverMetrics) {
+    return getDataManagerCallback(tableNameWithType, segmentName, schema, serverMetrics, false);
+  }
+
+  private DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
       ServerMetrics serverMetrics, boolean isMutable) {
     return new UpsertDataManagerCallbackImpl(tableName, segmentName, schema, serverMetrics, isMutable);
   }
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
index 266f25d2..4b3d861 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
@@ -31,6 +31,14 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+/**
+ * watermark manager for the upsert component to collect the low-water-mark information of each table in the current
+ * pinot server
+ * a watermark is defined as the largest version of each partition (segment update event topic partition) for each table
+ * so it stores the data in a map of {table_name: {partition_id: highest_water_mark}}
+ * then {@link LowWaterMarkService} will ingest that information from pinot server, calculate the lowest of these
+ * watermarks and use it in the queries sent to the server
+ */
 public class UpsertWaterMarkManager implements WaterMarkManager {
 
   private final Map<String, Map<Integer, Long>> _highWaterMarkTablePartitionMap = new ConcurrentHashMap<>();
@@ -55,7 +63,12 @@ public class UpsertWaterMarkManager implements WaterMarkManager {
     return _instance;
   }
 
-  // TODO(tingchen) Look into the case where Segment Update Messages might arrive before the corresponding physical data.
+  /**
+   * process an event message and update the current watermark information for this manager
+   * @param table
+   * @param segment
+   * @param logEntry the message containing the new watermark information
+   */
   public void processMessage(String table, String segment, UpdateLogEntry logEntry) {
     if (logEntry == null) {
       return;
@@ -65,6 +78,12 @@ public class UpsertWaterMarkManager implements WaterMarkManager {
     processVersionUpdate(table, partition, version);
   }
 
+  /**
+   * update the high watermark information associated with the given table/partition
+   * @param table
+   * @param partition
+   * @param version
+   */
   public void processVersionUpdate(String table, int partition, long version) {
     Preconditions.checkState(partition >= 0, "logEntry has invalid version {} for table {}",
         version, table);
@@ -76,6 +95,11 @@ public class UpsertWaterMarkManager implements WaterMarkManager {
     }
   }
 
+  /**
+   * return the highest watermark for each partition of the given table
+   * @param tableName
+   * @return
+   */
   public Map<Integer, Long> getHighWaterMarkForTable(String tableName) {
     return ImmutableMap.copyOf(_highWaterMarkTablePartitionMap.getOrDefault(tableName, ImmutableMap.of()));
   }
diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java
similarity index 99%
rename from pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
rename to pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java
index ac6c38d..5e6b703 100644
--- a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/QueryRewriterImplTest.java
@@ -33,7 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-public class UpsertQueryRewriterImplTest {
+public class QueryRewriterImplTest {
 
     private LowWaterMarkService _lwms;
     private UpsertQueryRewriterImpl rewriter;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 710e22a..b06000f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -221,7 +221,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
 
       final TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
       final DataManagerCallback dataManagerCallback = tableDataManager.getTableDataManagerCallback()
-          .getDataManagerCallback(tableNameWithType, segmentName, schema, _serverMetrics, false);
+          .getImmutableDataManagerCallback(tableNameWithType, segmentName, schema, _serverMetrics);
 
       // Load from index directory
       ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir,
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
index 7ae27a6..ab76e36 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pinot.server.upsert;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pinot.core.segment.updater.UpsertComponentContainer;
-import org.apache.pinot.core.segment.updater.WaterMarkManager;
 import org.apache.pinot.server.conf.ServerConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +35,6 @@ public class UpsertComponentContainerProvider {
     LOGGER.info("creating watermark manager with class {}", className);
     try {
       Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className);
-      Preconditions.checkState(comonentContainerClass.isAssignableFrom(WaterMarkManager.class),
-          "configured class not assignable from Callback class");
       _instance = comonentContainerClass.newInstance();
     } catch (Exception e) {
       LOGGER.error("failed to load watermark manager class", className, e);
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 60702c4..aec1510 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
@@ -41,6 +42,7 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.updater.DefaultWaterMarkManager;
 import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.server.starter.helix.AdminApiApplication;
@@ -83,6 +85,7 @@ public abstract class BaseResourceTest {
 
     // Mock the server instance
     ServerInstance serverInstance = mock(ServerInstance.class);
+    when(serverInstance.getWatermarkManager()).thenReturn(new DefaultWaterMarkManager());
     when(serverInstance.getInstanceDataManager()).thenReturn(instanceDataManager);
 
     // Add the default table and segment
@@ -133,6 +136,7 @@ public abstract class BaseResourceTest {
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     when(tableDataManagerConfig.getTableName()).thenReturn(tableName);
     when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    TableDataManagerProvider.init(mock(InstanceDataManagerConfig.class));
     TableDataManager tableDataManager = TableDataManagerProvider
         .getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
             mock(ServerMetrics.class));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org