You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/18 18:04:18 UTC

[incubator-pinot] 03/05: temp update for adding bunch of stuff

This is an automated email from the ASF dual-hosted git repository.

jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 92bba12ca1bf14207e8eec2b46ee1c7cc50946a4
Author: james Shao <sj...@uber.com>
AuthorDate: Mon Mar 16 11:58:46 2020 -0700

    temp update for adding bunch of stuff
---
 .../core/data/manager/BaseTableDataManager.java    |  10 +-
 .../core/data/manager/InstanceDataManager.java     |   8 +-
 .../pinot/core/data/manager/TableDataManager.java  |  10 +-
 .../offline/ImmutableSegmentDataManager.java       |  17 +++
 .../manager/offline/OfflineTableDataManager.java   |  18 ++-
 .../manager/offline/TableDataManagerProvider.java  |  26 +++--
 .../offline/UpsertImmutableSegmentDataManager.java |  67 -----------
 .../AppendLLRealtimeSegmentDataManager.java        |  58 ---------
 .../realtime/AppendRealtimeTableDataManager.java   |  55 ---------
 .../realtime/HLRealtimeSegmentDataManager.java     |   5 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  26 +++--
 .../manager/realtime/RealtimeTableDataManager.java |  48 +++++---
 .../realtime/UpsertRealtimeTableDataManager.java   |  82 -------------
 .../manager/upsert/DataManagerCallback.java}       |  32 ++---
 .../upsert/DefaultDataManagerCallbackImpl.java     |  71 +++++++++++
 .../upsert/DefaultIndexSegmentCallback.java        |  65 +++++++++++
 .../DefaultTableDataManagerCallbackImpl.java}      |  36 +++---
 .../manager/upsert/IndexSegmentCallback.java}      |  32 ++---
 .../manager/upsert/TableDataManagerCallback.java}  |  20 +++-
 .../upsert/TableDataManagerCallbackProvider.java   |  91 +++++++++++++++
 .../immutable/ImmutableSegmentImpl.java            |  32 +++--
 .../immutable/ImmutableSegmentLoader.java          |  26 ++---
 .../indexsegment/mutable/MutableSegmentImpl.java   |  24 ++--
 ...entUpdaterConfig.java => WatermarkManager.java} |  16 ++-
 .../data/manager/BaseTableDataManagerTest.java     |  20 ++--
 .../realtime/LLRealtimeSegmentDataManagerTest.java |   5 +-
 .../mutable/MutableSegmentImplTestUtils.java       |   3 +-
 .../SegmentGenerationWithNullValueVectorTest.java  |   3 +-
 .../core/segment/index/loader/LoaderTest.java      |  10 +-
 .../pinot/queries/SerializedBytesQueriesTest.java  |   2 +
 .../pinot/query/executor/QueryExecutorTest.java    |   3 +-
 pinot-grigio/{ => pinot-grigio-provided}/pom.xml   |  49 +++++---
 .../upsert/UpsertDataManagerCallbackImpl.java      | 106 +++++++++--------
 .../UpsertImmutableIndexSegmentCallback.java       | 130 ++++++++++-----------
 .../upsert/UpsertMutableIndexSegmentCallback.java  |  55 +++++----
 .../upsert/UpsertTableDataManagerCallbackImpl.java |  63 ++++++++++
 .../pinot/core/segment/updater/SegmentUpdater.java |  28 ++---
 .../core/segment/updater/SegmentUpdaterConfig.java |   0
 .../segment/updater/UpsertWatermarkManager.java    |  20 ++--
 .../UpsertImmutableIndexSegmentCallbackTest.java   |  17 ++-
 pinot-grigio/pom.xml                               |   1 +
 .../starter/helix/HelixInstanceDataManager.java    |  16 ++-
 .../server/starter/helix/HelixServerStarter.java   |   2 +
 .../SegmentOnlineOfflineStateModelFactory.java     |   9 ++
 .../upsert/DefaultUpsertComponentContainer.java    |  34 ++++--
 .../server/upsert/SegmentDeletionHandler.java      |  55 +++++++++
 .../server/upsert/UpsertComponentContainer.java    |  20 +++-
 .../upsert/UpsertComponentContainerProvider.java   |  58 +++++++++
 .../apache/pinot/server/api/BaseResourceTest.java  |   3 +-
 .../realtime/provisioning/MemoryEstimator.java     |   9 +-
 50 files changed, 961 insertions(+), 635 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index a39bdfc..cb278b4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.slf4j.Logger;
@@ -108,14 +109,14 @@ public abstract class BaseTableDataManager implements TableDataManager {
    * @param immutableSegment Immutable segment to add
    */
   @Override
-  public void addSegment(ImmutableSegment immutableSegment) {
+  public void addSegment(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback) {
     String segmentName = immutableSegment.getSegmentName();
     _logger.info("Adding immutable segment: {} to table: {}", segmentName, _tableNameWithType);
     _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT,
         immutableSegment.getSegmentMetadata().getTotalRawDocs());
     _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
 
-    ImmutableSegmentDataManager newSegmentManager = getImmutableSegmentDataManager(immutableSegment);
+    ImmutableSegmentDataManager newSegmentManager = getImmutableSegmentDataManager(immutableSegment, dataManagerCallback);
     SegmentDataManager oldSegmentManager = _segmentDataManagerMap.put(segmentName, newSegmentManager);
     if (oldSegmentManager == null) {
       _logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType);
@@ -212,7 +213,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
     return _tableNameWithType;
   }
 
-  protected ImmutableSegmentDataManager getImmutableSegmentDataManager(ImmutableSegment immutableSegment) {
-    return new ImmutableSegmentDataManager(immutableSegment);
+  protected ImmutableSegmentDataManager getImmutableSegmentDataManager(ImmutableSegment immutableSegment,
+      DataManagerCallback callback) {
+    return new ImmutableSegmentDataManager(immutableSegment, callback);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 175f5a7..eba0689 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -132,8 +132,8 @@ public interface InstanceDataManager {
    */
   ZkHelixPropertyStore<ZNRecord> getPropertyStore();
 
-  /**
-   * Return the mappings from partition -> low water marks of all the tables hosted in this server.
-   */
-  Map<String, Map<Integer, Long>> getLowWaterMarks();
+//  /**
+//   * Return the mappings from partition -> low water marks of all the tables hosted in this server.
+//   */
+//  Map<String, Map<Integer, Long>> getLowWaterMarks();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
index 7b1c79c..46aa745 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
@@ -27,6 +27,8 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.apache.pinot.core.data.manager.upsert.TableDataManagerCallback;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 
@@ -58,7 +60,7 @@ public interface TableDataManager {
   /**
    * Adds a loaded immutable segment into the table.
    */
-  void addSegment(ImmutableSegment immutableSegment);
+  void addSegment(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback);
 
   /**
    * Adds a segment from local disk into the OFFLINE table.
@@ -116,4 +118,10 @@ public interface TableDataManager {
    * Returns the table name managed by this instance.
    */
   String getTableName();
+
+  /**
+   * Returns the callback object for this table: a no-op callback for a regular table, or the upsert callback
+   * for an upsert-enabled table.
+   */
+  TableDataManagerCallback getTableDataManagerCallback();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
index 0401f61..949affe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 
+import java.io.IOException;
+
 
 /**
  * Segment data manager for immutable segment.
@@ -28,9 +33,20 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 public class ImmutableSegmentDataManager extends SegmentDataManager {
 
   protected final ImmutableSegment _immutableSegment;
+  protected final DataManagerCallback _dataManagerCallback;
 
   public ImmutableSegmentDataManager(ImmutableSegment immutableSegment) {
+    this(immutableSegment, DefaultDataManagerCallbackImpl.INSTANCE);
+  }
+
+  public ImmutableSegmentDataManager(ImmutableSegment immutableSegment, DataManagerCallback dataManagerCallback) {
     _immutableSegment = immutableSegment;
+    _dataManagerCallback = dataManagerCallback;
+    try {
+      _dataManagerCallback.initVirtualColumns();
+    } catch (IOException ex) {
+      ExceptionUtils.rethrow(ex);
+    }
   }
 
   @Override
@@ -45,6 +61,7 @@ public class ImmutableSegmentDataManager extends SegmentDataManager {
 
   @Override
   public void destroy() {
+    _dataManagerCallback.destroy();
     _immutableSegment.destroy();
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
index c2c9351..61230a4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java
@@ -20,6 +20,9 @@ package org.apache.pinot.core.data.manager.offline;
 
 import java.io.File;
 import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.apache.pinot.core.data.manager.upsert.TableDataManagerCallback;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.core.data.manager.BaseTableDataManager;
@@ -33,6 +36,12 @@ import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 @ThreadSafe
 public class OfflineTableDataManager extends BaseTableDataManager {
 
+  private TableDataManagerCallback _tableDataManagerCallback;
+
+  public OfflineTableDataManager(TableDataManagerCallback tableDataManagerCallback) {
+    _tableDataManagerCallback = tableDataManagerCallback;
+  }
+
   @Override
   protected void doInit() {
   }
@@ -49,6 +58,13 @@ public class OfflineTableDataManager extends BaseTableDataManager {
   public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
       throws Exception {
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+    DataManagerCallback callback = _tableDataManagerCallback.getDefaultDataManagerCallback();
+    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
+        callback, schema), callback);
+  }
+
+  @Override
+  public TableDataManagerCallback getTableDataManagerCallback() {
+    return _tableDataManagerCallback;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 3ec1bbd..f5b009f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -25,8 +27,8 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
-import org.apache.pinot.core.data.manager.realtime.AppendRealtimeTableDataManager;
-import org.apache.pinot.core.data.manager.realtime.UpsertRealtimeTableDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.core.data.manager.upsert.TableDataManagerCallbackProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,11 +43,18 @@ public class TableDataManagerProvider {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableDataManagerProvider.class);
 
   private static Semaphore _segmentBuildSemaphore;
+  private static TableDataManagerCallbackProvider _tableDataManagerCallbackProvider;
 
   private TableDataManagerProvider() {
   }
 
   public static void init(InstanceDataManagerConfig instanceDataManagerConfig) {
+    init(instanceDataManagerConfig, new TableDataManagerCallbackProvider(new PropertiesConfiguration()));
+  }
+
+  public static void init(InstanceDataManagerConfig instanceDataManagerConfig,
+                          TableDataManagerCallbackProvider callbackProvider) {
+    _tableDataManagerCallbackProvider = callbackProvider;
     int maxParallelBuilds = instanceDataManagerConfig.getMaxParallelSegmentBuilds();
     if (maxParallelBuilds > 0) {
       _segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
@@ -55,17 +64,16 @@ public class TableDataManagerProvider {
   public static TableDataManager getTableDataManager(@Nonnull TableDataManagerConfig tableDataManagerConfig,
       @Nonnull String instanceId, @Nonnull ZkHelixPropertyStore<ZNRecord> propertyStore,
       @Nonnull ServerMetrics serverMetrics) {
-    TableDataManager tableDataManager;
+    Preconditions.checkNotNull(_tableDataManagerCallbackProvider, "callback provider is not init");
+    final TableDataManager tableDataManager;
     switch (CommonConstants.Helix.TableType.valueOf(tableDataManagerConfig.getTableDataManagerType())) {
       case OFFLINE:
-        tableDataManager = new OfflineTableDataManager();
+        tableDataManager = new OfflineTableDataManager(
+            _tableDataManagerCallbackProvider.getDefaultTableDataManagerCallback());
         break;
       case REALTIME:
-        if (tableDataManagerConfig.getUpdateSemantic() == CommonConstants.UpdateSemantic.UPSERT) {
-          tableDataManager = new UpsertRealtimeTableDataManager(_segmentBuildSemaphore);
-        } else {
-          tableDataManager = new AppendRealtimeTableDataManager(_segmentBuildSemaphore);
-        }
+        tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore,
+            _tableDataManagerCallbackProvider.getTableDataManagerCallback(tableDataManagerConfig));
         break;
       default:
         throw new IllegalStateException();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/UpsertImmutableSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/UpsertImmutableSegmentDataManager.java
deleted file mode 100644
index 790e912..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/UpsertImmutableSegmentDataManager.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.offline;
-
-import org.apache.pinot.common.config.TableNameBuilder;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
-import org.apache.pinot.core.indexsegment.UpsertSegment;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
-import org.apache.pinot.core.segment.updater.SegmentUpdater;
-import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
-
-import java.io.IOException;
-import java.util.List;
-
-public class UpsertImmutableSegmentDataManager extends ImmutableSegmentDataManager implements UpsertSegmentDataManager {
-
-  private String _tableNameWithType;
-
-  public UpsertImmutableSegmentDataManager(ImmutableSegment immutableSegment) throws IOException {
-    super(immutableSegment);
-    _tableNameWithType = TableNameBuilder.ensureTableNameWithType(immutableSegment.getSegmentMetadata().getTableName(),
-        CommonConstants.Helix.TableType.REALTIME);
-    initVirtualColumns();
-  }
-
-  @Override
-  public void updateVirtualColumns(List<UpdateLogEntry> messages) {
-    ((UpsertSegment) _immutableSegment).updateVirtualColumn(messages);
-  }
-
-  @Override
-  public String getVirtualColumnInfo(long offset) {
-    return ((UpsertSegment) _immutableSegment).getVirtualColumnInfo(offset);
-  }
-
-  @Override
-  public void destroy() {
-    SegmentUpdater.getInstance().removeSegmentDataManager(_tableNameWithType, getSegmentName(), this);
-    super.destroy();
-  }
-
-  private void initVirtualColumns() throws IOException {
-    // 1. add listener for update events
-    // 2. load all existing messages
-    // ensure the above orders so we can ensure all events are received by this data manager
-    SegmentUpdater.getInstance().addSegmentDataManager(_tableNameWithType, new LLCSegmentName(getSegmentName()), this);
-    ((UpsertSegment) _immutableSegment).initVirtualColumn();
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendLLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendLLRealtimeSegmentDataManager.java
deleted file mode 100644
index f725cc8..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendLLRealtimeSegmentDataManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.indexsegment.mutable.MutableAppendSegmentImpl;
-import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-
-import java.util.concurrent.Semaphore;
-
-public class AppendLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
-
-  public AppendLLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
-      RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
-      Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics) {
-    super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, indexLoadingConfig, schema,
-            llcSegmentName, partitionConsumerSemaphore, serverMetrics);
-  }
-
-  @Override
-  protected MutableSegmentImpl createMutableSegment(RealtimeSegmentConfig config) {
-    return new MutableAppendSegmentImpl(config);
-  }
-
-  @Override
-  protected void processTransformedRow(GenericRow row, long offset) {
-    // do nothing
-  }
-
-  @Override
-  protected void postIndexProcessing(GenericRow row, long offset) {
-    // do nothing
-  }
-
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendRealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendRealtimeTableDataManager.java
deleted file mode 100644
index 741fb08..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/AppendRealtimeTableDataManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.spi.data.Schema;
-
-import java.io.File;
-import java.util.concurrent.Semaphore;
-
-public class AppendRealtimeTableDataManager extends RealtimeTableDataManager {
-
-  public AppendRealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
-    super(segmentBuildSemaphore);
-  }
-
-  @Override
-  protected LLRealtimeSegmentDataManager getLLRealtimeSegmentDataManager(
-          RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, TableConfig tableConfig,
-          RealtimeTableDataManager realtimeTableDataManager, String indexDirPath, IndexLoadingConfig indexLoadingConfig,
-          Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics)
-      throws Exception {
-    return new AppendLLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, realtimeTableDataManager,
-            indexDirPath, indexLoadingConfig, schema, llcSegmentName, partitionConsumerSemaphore, serverMetrics);
-  }
-
-  @Override
-  protected ImmutableSegment loadImmutableSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema)
-      throws Exception {
-    return ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema);
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 086812e..d767e98 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.core.indexsegment.mutable.MutableAppendSegmentImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -195,7 +195,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                 indexLoadingConfig.isDirectRealtimeOffheapAllocation(), serverMetrics))
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
             .setNullHandlingEnabled(indexingConfig.isNullHandlingEnabled()).build();
-    realtimeSegment = new MutableAppendSegmentImpl(realtimeSegmentConfig);
+    realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig,
+        DefaultDataManagerCallbackImpl.INSTANCE.getIndexSegmentCallback());
 
 
     notifier = realtimeTableDataManager;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 7765e9e..4f3ae50 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -50,6 +50,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.CompletionMode;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
 import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
@@ -83,7 +84,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Segment data manager for low level consumer realtime segments, which manages consumption and segment completion.
  */
-public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
+public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected enum State {
     // The state machine starts off with this state. While in this state we consume stream events
     // and index them in memory. We continue to be in this state until the end criteria is satisfied
@@ -272,6 +273,9 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
   private final boolean _nullHandlingEnabled;
   private final SegmentCommitterFactory _segmentCommitterFactory;
 
+  // upsert/append related components
+  private final DataManagerCallback _dataManagerCallback;
+
   // TODO each time this method is called, we print reason for stop. Good to print only once.
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
@@ -416,6 +420,7 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
       _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
       _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
     }
+    _dataManagerCallback.postConsumeLoop();
     return true;
   }
 
@@ -467,14 +472,14 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
           GenericRow transformedRow = _recordTransformer.transform(decodedRow);
 
           if (transformedRow != null) {
-            processTransformedRow(transformedRow, _currentOffset);
+            _dataManagerCallback.processTransformedRow(transformedRow, _currentOffset);
             realtimeRowsConsumedMeter =
                 _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
                     realtimeRowsConsumedMeter);
             indexedMessageCount++;
 
             canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
-            postIndexProcessing(transformedRow, _currentOffset);
+            _dataManagerCallback.postIndexProcessing(transformedRow, _currentOffset);
           } else {
             realtimeRowsDroppedMeter = _serverMetrics
                 .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
@@ -505,11 +510,6 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
     }
   }
 
-  protected abstract void processTransformedRow(GenericRow row, long offset);
-
-
-  protected abstract void postIndexProcessing(GenericRow row, long offset);
-
   public class PartitionConsumer implements Runnable {
     public void run() {
       long initialConsumptionEnd = 0L;
@@ -1017,6 +1017,7 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
   }
 
   public void destroy() {
+    _dataManagerCallback.destroy();
     try {
       stop();
     } catch (InterruptedException e) {
@@ -1054,7 +1055,8 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
   // If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here.
   public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
       RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
-      Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics) {
+      Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics,
+      DataManagerCallback dataManagerCallback) {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
     _tableConfig = tableConfig;
@@ -1064,6 +1066,7 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
     _indexLoadingConfig = indexLoadingConfig;
     _schema = schema;
     _serverMetrics = serverMetrics;
+    _dataManagerCallback = dataManagerCallback;
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
@@ -1185,7 +1188,8 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
       }
     }
 
-    _realtimeSegment = createMutableSegment(realtimeSegmentConfigBuilder.build());
+    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(),
+        _dataManagerCallback.getIndexSegmentCallback());
     _startOffset = _segmentZKMetadata.getStartOffset();
     _currentOffset = _startOffset;
     _resourceTmpDir = new File(resourceDataDir, "_tmp");
@@ -1221,8 +1225,6 @@ public abstract class LLRealtimeSegmentDataManager extends RealtimeSegmentDataMa
     start();
   }
 
-  protected abstract MutableSegmentImpl createMutableSegment(RealtimeSegmentConfig config);
-
   /**
    * Creates a new stream consumer
    * @param reason
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 2be3ca1..bc5316d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -48,6 +48,8 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.BaseTableDataManager;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.apache.pinot.core.data.manager.upsert.TableDataManagerCallback;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
@@ -59,12 +61,13 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 
 
 @ThreadSafe
-public abstract class RealtimeTableDataManager extends BaseTableDataManager {
+public class RealtimeTableDataManager extends BaseTableDataManager {
   private final ExecutorService _segmentAsyncExecutorService =
       Executors.newSingleThreadExecutor(new NamedThreadFactory("SegmentAsyncExecutorService"));
   private SegmentBuildTimeLeaseExtender _leaseExtender;
   private RealtimeSegmentStatsHistory _statsHistory;
   private final Semaphore _segmentBuildSemaphore;
+  private final TableDataManagerCallback _tableDataManagerCallback;
   // Maintains a map of partitionIds to semaphores.
   // The semaphore ensures that exactly one PartitionConsumer instance consumes from any stream partition.
   // In some streams, it's possible that having multiple consumers (with the same consumer name on the same host) consuming from the same stream partition can lead to bugs.
@@ -90,12 +93,14 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
   // likely that we get fresh data each time instead of multiple copies of roughly same data.
   private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
 
-  public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
+  public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, TableDataManagerCallback tableDataManagerCallback) {
     _segmentBuildSemaphore = segmentBuildSemaphore;
+    _tableDataManagerCallback = tableDataManagerCallback;
   }
 
   @Override
   protected void doInit() {
+    _tableDataManagerCallback.init();
     _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType);
 
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
@@ -227,7 +232,9 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
     if (indexDir.exists() && (realtimeSegmentZKMetadata.getStatus() == Status.DONE)) {
       // Segment already exists on disk, and metadata has been committed. Treat it like an offline segment
 
-      addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema));
+      DataManagerCallback callback = _tableDataManagerCallback
+          .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false);
+      addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, callback), callback);
     } else {
       // Either we don't have the segment on disk or we have not committed in ZK. We should be starting the consumer
       // for realtime segment here. If we wrote it on disk but could not get to commit to zk yet, we should replace the
@@ -257,15 +264,21 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
         LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
         int streamPartitionId = llcSegmentName.getPartitionId();
         _partitionIdToSemaphoreMap.putIfAbsent(streamPartitionId, new Semaphore(1));
-        manager = getLLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this,
-                _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName,
-                _partitionIdToSemaphoreMap.get(streamPartitionId), _serverMetrics);
+        manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig,
+            this, _indexDir.getAbsolutePath(), indexLoadingConfig, schema, llcSegmentName,
+            _partitionIdToSemaphoreMap.get(streamPartitionId), _serverMetrics,
+            _tableDataManagerCallback.getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, true));
       }
       _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
       _segmentDataManagerMap.put(segmentName, manager);
     }
   }
 
+  @Override
+  public TableDataManagerCallback getTableDataManagerCallback() {
+    return _tableDataManagerCallback;
+  }
+
   public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMetadata llcSegmentMetadata,
       IndexLoadingConfig indexLoadingConfig, Schema schema) {
     final String uri = llcSegmentMetadata.getDownloadUrl();
@@ -297,7 +310,8 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
     ZKMetadataProvider.setRealtimeSegmentZKMetadata(_propertyStore, _tableNameWithType, segmentZKMetadata);
     File indexDir = new File(_indexDir, segmentZKMetadata.getSegmentName());
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, schema));
+    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig),
+        _tableDataManagerCallback.getDefaultDataManagerCallback());
   }
 
   /**
@@ -306,7 +320,9 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
   public void replaceLLSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, Schema schema) {
     try {
       File indexDir = new File(_indexDir, segmentName);
-      addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema));
+      DataManagerCallback dataManagerCallback = _tableDataManagerCallback
+          .getDataManagerCallback(_tableNameWithType, segmentName, schema, _serverMetrics, false);
+      addSegment(loadImmutableSegment(indexDir, indexLoadingConfig, schema, dataManagerCallback), dataManagerCallback);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -316,17 +332,11 @@ public abstract class RealtimeTableDataManager extends BaseTableDataManager {
     return _instanceId;
   }
 
-  /**
-   *  allow {@link UpsertRealtimeTableDataManager} to override this method
-   */
-  protected abstract LLRealtimeSegmentDataManager getLLRealtimeSegmentDataManager(
-          RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, TableConfig tableConfig,
-          RealtimeTableDataManager realtimeTableDataManager, String indexDirPath, IndexLoadingConfig indexLoadingConfig,
-          Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics)
-      throws Exception;
-
-  protected abstract ImmutableSegment loadImmutableSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema)
-      throws Exception;
+  protected ImmutableSegment loadImmutableSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema,
+                                                  DataManagerCallback dataManagerCallback)
+      throws Exception {
+    return ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, dataManagerCallback, schema);
+  }
 
   /**
    * Validate a schema against the table config for real-time record consumption.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertRealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertRealtimeTableDataManager.java
deleted file mode 100644
index 12569db..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertRealtimeTableDataManager.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.manager.realtime;
-
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.manager.offline.UpsertImmutableSegmentDataManager;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
-import org.apache.pinot.spi.data.Schema;
-
-import javax.annotation.Nonnull;
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Semaphore;
-
-public class UpsertRealtimeTableDataManager extends RealtimeTableDataManager {
-  private UpdateLogStorageProvider _updateLogStorageProvider;
-
-  public UpsertRealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
-    super(segmentBuildSemaphore);
-    _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
-  }
-
-  @Override
-  public void addSegment(@Nonnull String segmentName, @Nonnull TableConfig tableConfig,
-                         @Nonnull IndexLoadingConfig indexLoadingConfig) throws Exception {
-    _updateLogStorageProvider.addSegment(_tableNameWithType, segmentName);
-    super.addSegment(segmentName, tableConfig, indexLoadingConfig);
-  }
-
-  @Override
-  protected LLRealtimeSegmentDataManager getLLRealtimeSegmentDataManager(
-          RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, TableConfig tableConfig,
-          RealtimeTableDataManager realtimeTableDataManager, String indexDirPath, IndexLoadingConfig indexLoadingConfig,
-          Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics)
-      throws Exception {
-    return new UpsertLLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, realtimeTableDataManager,
-            indexDirPath, indexLoadingConfig, schema, llcSegmentName, partitionConsumerSemaphore, serverMetrics);
-  }
-
-  @Override
-  protected ImmutableSegmentDataManager getImmutableSegmentDataManager(ImmutableSegment immutableSegment) {
-    try {
-      return new UpsertImmutableSegmentDataManager(immutableSegment);
-    } catch (IOException e) {
-      throw new RuntimeException("failed to init the upsert immutable segment", e);
-    }
-  }
-
-  @Override
-  protected ImmutableSegment loadImmutableSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema) {
-    try {
-      return ImmutableSegmentLoader.loadUpsertSegment(indexDir, indexLoadingConfig, schema);
-    } catch (Exception e) {
-      throw new RuntimeException("failed to load immutable segment", e);
-    }
-  }
-
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
similarity index 60%
copy from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
copy to pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
index 1c9effc..466553e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DataManagerCallback.java
@@ -16,22 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.indexsegment.mutable;
+package org.apache.pinot.core.data.manager.upsert;
 
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class MutableAppendSegmentImpl extends MutableSegmentImpl {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MutableUpsertSegmentImpl.class);
+import java.io.IOException;
+import java.util.List;
 
-  public MutableAppendSegmentImpl(RealtimeSegmentConfig config) {
-    super(config);
-  }
+public interface DataManagerCallback {
 
-  @Override
-  protected void postProcessRecords(GenericRow row, int docId) {
-    // nothing
-  }
+  IndexSegmentCallback getIndexSegmentCallback();
+
+  void processTransformedRow(GenericRow row, long offset);
+
+  void postIndexProcessing(GenericRow row, long offset);
+
+  void postConsumeLoop();
+
+  void initVirtualColumns() throws IOException;
+
+  void updateVirtualColumns(List<UpdateLogEntry> messages);
+
+  void destroy();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultDataManagerCallbackImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultDataManagerCallbackImpl.java
new file mode 100644
index 0000000..347c541
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultDataManagerCallbackImpl.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.upsert;
+
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+import java.io.IOException;
+import java.util.List;
+
+/** No-op {@link DataManagerCallback} whose index segment callback is {@link DefaultIndexSegmentCallback#INSTANCE}. */
+public class DefaultDataManagerCallbackImpl implements DataManagerCallback {
+  // Shared instance: the class has no mutable state, so append-mode tables can reuse a single object.
+  public static final DefaultDataManagerCallbackImpl INSTANCE = new DefaultDataManagerCallbackImpl();
+
+  private final IndexSegmentCallback _indexSegmentCallback = DefaultIndexSegmentCallback.INSTANCE;
+
+  private DefaultDataManagerCallbackImpl() {
+  }
+
+  @Override
+  public IndexSegmentCallback getIndexSegmentCallback() {
+    return _indexSegmentCallback;
+  }
+
+  @Override
+  public void processTransformedRow(GenericRow row, long offset) {
+    // do nothing
+  }
+
+  @Override
+  public void postIndexProcessing(GenericRow row, long offset) {
+    // do nothing
+  }
+
+  @Override
+  public void postConsumeLoop() {
+    // do nothing
+  }
+
+  @Override
+  public void initVirtualColumns() throws IOException {
+    // do nothing
+  }
+
+  @Override
+  public void updateVirtualColumns(List<UpdateLogEntry> messages) {
+    // do nothing
+  }
+
+  @Override
+  public void destroy() {
+    // do nothing
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
new file mode 100644
index 0000000..0299cb7
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultIndexSegmentCallback.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.upsert;
+
+import joptsimple.internal.Strings;
+import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+import java.util.Map;
+
+/** No-op {@link IndexSegmentCallback}; it has no fields, so the single {@link #INSTANCE} is shared. */
+public class DefaultIndexSegmentCallback implements IndexSegmentCallback {
+  public static final DefaultIndexSegmentCallback INSTANCE = new DefaultIndexSegmentCallback();
+
+  private DefaultIndexSegmentCallback(){} // not instantiable from outside; use INSTANCE
+
+  @Override
+  public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) {
+    // do nothing
+  }
+
+  @Override
+  public void initOffsetColumn(ColumnIndexContainer offsetColumnContainer) {
+    // do nothing
+  }
+
+  @Override
+  public void postProcessRecords(GenericRow row, int docId) {
+    // do nothing
+  }
+
+  @Override
+  public void initVirtualColumn() {
+    // do nothing
+  }
+
+  @Override
+  public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
+    // do nothing
+  }
+
+  @Override
+  public String getVirtualColumnInfo(long offset) {
+    return Strings.EMPTY; // NOTE(review): Strings is joptsimple.internal; a plain "" would avoid the internal API
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
similarity index 52%
copy from pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
copy to pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
index 0401f61..2e56455 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/ImmutableSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/DefaultTableDataManagerCallbackImpl.java
@@ -16,40 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.data.manager.offline;
+package org.apache.pinot.core.data.manager.upsert;
 
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.data.Schema;
 
+public class DefaultTableDataManagerCallbackImpl implements TableDataManagerCallback {
 
-/**
- * Segment data manager for immutable segment.
- */
-public class ImmutableSegmentDataManager extends SegmentDataManager {
-
-  protected final ImmutableSegment _immutableSegment;
-
-  public ImmutableSegmentDataManager(ImmutableSegment immutableSegment) {
-    _immutableSegment = immutableSegment;
-  }
+  private static final DefaultDataManagerCallbackImpl DEFAULT_DM_CALLBACK = DefaultDataManagerCallbackImpl.INSTANCE;
 
   @Override
-  public String getSegmentName() {
-    return _immutableSegment.getSegmentName();
+  public void init() {
   }
 
   @Override
-  public ImmutableSegment getSegment() {
-    return _immutableSegment;
+  public void addSegment(String tableName, String segmentName, TableConfig tableConfig) {
   }
 
   @Override
-  public void destroy() {
-    _immutableSegment.destroy();
+  public DataManagerCallback getDataManagerCallback(String tableName, String segmentName,
+      Schema schema, ServerMetrics serverMetrics, boolean isMutable) {
+    return DEFAULT_DM_CALLBACK;
   }
 
   @Override
-  public String toString() {
-    return "ImmutableSegmentDataManager(" + _immutableSegment.getSegmentName() + ")";
+  public DataManagerCallback getDefaultDataManagerCallback() {
+    return DEFAULT_DM_CALLBACK;
   }
+
+
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
similarity index 52%
copy from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
copy to pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
index 1c9effc..dd198f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/IndexSegmentCallback.java
@@ -16,22 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.indexsegment.mutable;
+package org.apache.pinot.core.data.manager.upsert;
 
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class MutableAppendSegmentImpl extends MutableSegmentImpl {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MutableUpsertSegmentImpl.class);
+import java.io.IOException;
+import java.util.Map;
 
-  public MutableAppendSegmentImpl(RealtimeSegmentConfig config) {
-    super(config);
-  }
+public interface IndexSegmentCallback {
 
-  @Override
-  protected void postProcessRecords(GenericRow row, int docId) {
-    // nothing
-  }
+  void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader);
+
+  void initOffsetColumn(ColumnIndexContainer offsetColumnContainer);
+
+  void postProcessRecords(GenericRow row, int docId);
+
+  void initVirtualColumn() throws IOException;
+
+  void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries);
+
+  String getVirtualColumnInfo(long offset);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
similarity index 59%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
copy to pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
index 97d48f3..ec6a15f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallback.java
@@ -16,10 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.segment.updater;
+package org.apache.pinot.core.data.manager.upsert;
 
-public class SegmentUpdaterConfig {
-  public static final String SEGMENT_UDPATE_SLEEP_MS = "sleep.ms";
-  public static final String INPUT_TOPIC_PREFIX  = "input.topic.prefix";
-  public static final int SEGMENT_UDPATE_SLEEP_MS_DEFAULT = 100;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.data.Schema;
+
+public interface TableDataManagerCallback {
+
+  void init();
+
+  void addSegment(String tableName, String segmentName, TableConfig tableConfig);
+
+  DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
+      ServerMetrics serverMetrics, boolean isMutable);
+
+  DataManagerCallback getDefaultDataManagerCallback();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
new file mode 100644
index 0000000..2e868a2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/upsert/TableDataManagerCallbackProvider.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.upsert;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Loads the configured {@link TableDataManagerCallback} classes and creates callback instances from them. */
+public class TableDataManagerCallbackProvider {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableDataManagerCallbackProvider.class);
+
+  public static final String UPSERT_CALLBACK_CLASS_CONFIG_KEY = "upsert.tableDataManager.callback";
+  public static final String DEFAULT_CALLBACK_CLASS_CONFIG_KEY = "append.tableDataManager.callback";
+  public static final String CALLBACK_CLASS_CONFIG_DEFAULT = DefaultTableDataManagerCallbackImpl.class.getName();
+
+  private final Class<? extends TableDataManagerCallback> defaultTableDataManagerCallBackClass;
+  private final Class<? extends TableDataManagerCallback> upsertTableDataManagerCallBackClass; // null if unset
+
+  /** Resolves the append (default) and the optional upsert callback classes from {@code configuration}. */
+  public TableDataManagerCallbackProvider(Configuration configuration) {
+    String appendClassName = configuration.getString(DEFAULT_CALLBACK_CLASS_CONFIG_KEY, CALLBACK_CLASS_CONFIG_DEFAULT);
+    String upsertClassName = configuration.getString(UPSERT_CALLBACK_CLASS_CONFIG_KEY);
+    defaultTableDataManagerCallBackClass = loadCallbackClass(appendClassName);
+    upsertTableDataManagerCallBackClass =
+        StringUtils.isNotEmpty(upsertClassName) ? loadCallbackClass(upsertClassName) : null;
+  }
+
+  /** Returns a new upsert callback for UPSERT tables and a new default callback for every other table. */
+  public TableDataManagerCallback getTableDataManagerCallback(TableDataManagerConfig tableDataManagerConfig) {
+    return tableDataManagerConfig.getUpdateSemantic() == CommonConstants.UpdateSemantic.UPSERT
+        ? getUpsertTableDataManagerCallback() : getDefaultTableDataManagerCallback();
+  }
+
+  /** Creates a new upsert callback; throws IllegalStateException if no upsert callback class is configured. */
+  public TableDataManagerCallback getUpsertTableDataManagerCallback() {
+    Preconditions.checkState(upsertTableDataManagerCallBackClass != null,
+        "no upsert callback class configured under %s", UPSERT_CALLBACK_CLASS_CONFIG_KEY);
+    return newCallback(upsertTableDataManagerCallBackClass);
+  }
+
+  /** Creates a new callback of the configured append (default) class. */
+  public TableDataManagerCallback getDefaultTableDataManagerCallback() {
+    return newCallback(defaultTableDataManagerCallBackClass);
+  }
+
+  /** Loads {@code className} and verifies that it implements {@link TableDataManagerCallback}. */
+  private static Class<? extends TableDataManagerCallback> loadCallbackClass(String className) {
+    try {
+      Class<?> loadedClass = Class.forName(className);
+      // The interface must be assignable from the configured class; the reverse check rejects every implementation.
+      Preconditions.checkState(TableDataManagerCallback.class.isAssignableFrom(loadedClass),
+          "configured class %s does not implement TableDataManagerCallback", loadedClass);
+      return loadedClass.asSubclass(TableDataManagerCallback.class);
+    } catch (ClassNotFoundException e) {
+      LOGGER.error("failed to load table data manager class {}", className, e);
+      return ExceptionUtils.rethrow(e);
+    }
+  }
+
+  /** Instantiates {@code callbackClass} through its no-arg constructor, logging and rethrowing any failure. */
+  private static TableDataManagerCallback newCallback(Class<? extends TableDataManagerCallback> callbackClass) {
+    try {
+      return callbackClass.newInstance();
+    } catch (Exception ex) {
+      LOGGER.error("failed to initialize new table data manager callback {}", callbackClass.getName(), ex);
+      return ExceptionUtils.rethrow(ex);
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
index 21f056c..5e070ba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -19,10 +19,11 @@
 package org.apache.pinot.core.indexsegment.immutable;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import javax.annotation.Nullable;
+
+import org.apache.pinot.core.data.manager.upsert.IndexSegmentCallback;
+import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -43,18 +44,33 @@ import org.slf4j.LoggerFactory;
 public class ImmutableSegmentImpl implements ImmutableSegment {
   private static final Logger LOGGER = LoggerFactory.getLogger(ImmutableSegmentImpl.class);
 
-  protected final SegmentDirectory _segmentDirectory;
-  protected final SegmentMetadataImpl _segmentMetadata;
-  protected final Map<String, ColumnIndexContainer> _indexContainerMap;
-  protected final StarTreeIndexContainer _starTreeIndexContainer;
+  private final SegmentDirectory _segmentDirectory;
+  private final SegmentMetadataImpl _segmentMetadata;
+  private final Map<String, ColumnIndexContainer> _indexContainerMap;
+  private final StarTreeIndexContainer _starTreeIndexContainer;
+  private final IndexSegmentCallback _segmentCallback;
+
 
   public ImmutableSegmentImpl(SegmentDirectory segmentDirectory, SegmentMetadataImpl segmentMetadata,
       Map<String, ColumnIndexContainer> columnIndexContainerMap,
-      @Nullable StarTreeIndexContainer starTreeIndexContainer) {
+      @Nullable StarTreeIndexContainer starTreeIndexContainer, IndexSegmentCallback segmentCallback) {
     _segmentDirectory = segmentDirectory;
     _segmentMetadata = segmentMetadata;
     _indexContainerMap = columnIndexContainerMap;
     _starTreeIndexContainer = starTreeIndexContainer;
+    _segmentCallback = segmentCallback;
+
+    // create virtualColumnReaderWriter
+    Map<String, DataFileReader> virtualColumnsReaderWriter = new HashMap<>();
+    for (Map.Entry<String, ColumnIndexContainer> entry: columnIndexContainerMap.entrySet()) {
+      String columnName = entry.getKey();
+      ColumnIndexContainer container = entry.getValue();
+      if (segmentMetadata.getSchema().isVirtualColumn(columnName) && (container.getForwardIndex() instanceof VirtualColumnLongValueReaderWriter)) {
+        virtualColumnsReaderWriter.put(columnName, container.getForwardIndex());
+      }
+    }
+    _segmentCallback.init(segmentMetadata, virtualColumnsReaderWriter);
+    _segmentCallback.initOffsetColumn(columnIndexContainerMap.get(segmentMetadata.getSchema().getOffsetKey()));
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index 49d7729..198a164 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.segment.index.ColumnMetadata;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -44,8 +46,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 
 public class ImmutableSegmentLoader {
   private ImmutableSegmentLoader() {
@@ -60,7 +60,7 @@ public class ImmutableSegmentLoader {
       throws Exception {
     IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig();
     defaultIndexLoadingConfig.setReadMode(readMode);
-    return load(indexDir, defaultIndexLoadingConfig, null);
+    return load(indexDir, defaultIndexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, null);
   }
 
   /**
@@ -68,25 +68,18 @@ public class ImmutableSegmentLoader {
    */
   public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadingConfig)
       throws Exception {
-    return load(indexDir, indexLoadingConfig, null);
+    return load(indexDir, indexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, null);
   }
 
-  public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadingConfig, @Nullable Schema schema)
+  public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadingConfig,
+                                      DataManagerCallback dataManagerCallback, @Nullable Schema schema)
           throws Exception {
-    return loadHelper(indexDir, indexLoadingConfig, schema);
+    return loadHelper(indexDir, indexLoadingConfig, dataManagerCallback, schema);
   }
 
-  /**
-   * to load upsert related segment
-   */
-  public static ImmutableUpsertSegmentImpl loadUpsertSegment(File indexDir, IndexLoadingConfig indexLoadingConfig,
-                                                             Schema schema) throws Exception {
-    ImmutableSegmentImpl segment = loadHelper(indexDir, indexLoadingConfig, schema);
-    return ImmutableUpsertSegmentImpl.copyOf(segment);
-  }
 
   private static ImmutableSegmentImpl loadHelper(File indexDir, IndexLoadingConfig indexLoadingConfig,
-                                                 @Nullable Schema schema) throws Exception {
+                                                 DataManagerCallback dataManagerCallback, @Nullable Schema schema) throws Exception {
     Preconditions.checkArgument(indexDir.isDirectory(), "Index directory: {} does not exist or is not a directory",
         indexDir);
 
@@ -155,7 +148,8 @@ public class ImmutableSegmentLoader {
               indexContainerMap, readMode);
     }
 
-    return new ImmutableSegmentImpl(segmentDirectory, segmentMetadata, indexContainerMap, starTreeIndexContainer);
+    return new ImmutableSegmentImpl(segmentDirectory, segmentMetadata, indexContainerMap, starTreeIndexContainer,
+        dataManagerCallback.getIndexSegmentCallback());
   }
 
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index b80ac64..12fcd94 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.core.data.manager.upsert.IndexSegmentCallback;
 import org.apache.pinot.core.indexsegment.IndexSegmentUtils;
 import org.apache.pinot.core.io.reader.DataFileReader;
 import org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
@@ -76,7 +77,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public abstract class MutableSegmentImpl implements MutableSegment {
+public class MutableSegmentImpl implements MutableSegment {
   // For multi-valued column, forward-index.
   // Maximum number of multi-values per row. We assert on this.
   private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
@@ -124,9 +125,9 @@ public abstract class MutableSegmentImpl implements MutableSegment {
   private final Collection<MetricFieldSpec> _physicalMetricFieldSpecs;
 
   // Cache some virtual columns
-  protected final Map<String, VirtualColumnProvider> _virtualColumnProviderMap = new HashMap<>();
-  protected final Map<String, Dictionary> _virtualColumnDictionary = new HashMap<>();
-  protected final Map<String, DataFileReader> _virtualColumnIndexReader = new HashMap<>();
+  private final Map<String, VirtualColumnProvider> _virtualColumnProviderMap = new HashMap<>();
+  private final Map<String, Dictionary> _virtualColumnDictionary = new HashMap<>();
+  private final Map<String, DataFileReader> _virtualColumnIndexReader = new HashMap<>();
 
   // default message metadata
   private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
@@ -134,7 +135,10 @@ public abstract class MutableSegmentImpl implements MutableSegment {
 
   private RealtimeLuceneReaders _realtimeLuceneReaders;
 
-  public MutableSegmentImpl(RealtimeSegmentConfig config) {
+  // upsert/append related components
+  private final IndexSegmentCallback _indexSegmentCallback;
+
+  public MutableSegmentImpl(RealtimeSegmentConfig config, IndexSegmentCallback indexSegmentCallback) {
     _tableName = config.getTableName();
     _segmentName = config.getSegmentName();
     _schema = config.getSchema();
@@ -167,6 +171,7 @@ public abstract class MutableSegmentImpl implements MutableSegment {
     _statsHistory = config.getStatsHistory();
     _segmentPartitionConfig = config.getSegmentPartitionConfig();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
+    _indexSegmentCallback = indexSegmentCallback;
 
     Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
     List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
@@ -302,7 +307,7 @@ public abstract class MutableSegmentImpl implements MutableSegment {
       }
     }
 
-    for (FieldSpec fieldSpec : _physicalFieldSpecs) {
+    for (FieldSpec fieldSpec : virtualFieldSpecs) {
       String column = fieldSpec.getName();
       VirtualColumnContext virtualColumnContext = new VirtualColumnContext(fieldSpec, _capacity, true);
       final VirtualColumnProvider provider =
@@ -317,6 +322,8 @@ public abstract class MutableSegmentImpl implements MutableSegment {
       _virtualColumnDictionary.put(column, dictionary);
     }
 
+    _indexSegmentCallback.init(_segmentMetadata, _virtualColumnIndexReader);
+
     if (_realtimeLuceneReaders != null) {
       // add the realtime lucene index readers to the global queue for refresh task to pick up
       RealtimeLuceneIndexRefreshState realtimeLuceneIndexRefreshState = RealtimeLuceneIndexRefreshState.getInstance();
@@ -377,7 +384,7 @@ public abstract class MutableSegmentImpl implements MutableSegment {
 
       // Update number of document indexed at last to make the latest record queryable
       canTakeMore = _numDocsIndexed++ < _capacity;
-      postProcessRecords(row, docId);
+      _indexSegmentCallback.postProcessRecords(row, docId);
     } else {
       Preconditions
           .checkState(_aggregateMetrics, "Invalid document-id during indexing: " + docId + " expected: " + numDocs);
@@ -522,8 +529,6 @@ public abstract class MutableSegmentImpl implements MutableSegment {
     }
   }
 
-  protected abstract void postProcessRecords(GenericRow row, int docId);
-
   private boolean aggregateMetrics(GenericRow row, int docId) {
     for (MetricFieldSpec metricFieldSpec : _physicalMetricFieldSpecs) {
       String column = metricFieldSpec.getName();
@@ -589,7 +594,6 @@ public abstract class MutableSegmentImpl implements MutableSegment {
   public ColumnDataSource getDataSource(String columnName) {
     FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
     if (fieldSpec.isVirtualColumn()) {
-      // FIXME
       VirtualColumnContext virtualColumnContext = new VirtualColumnContext(fieldSpec, _numDocsIndexed, true);
       VirtualColumnProvider virtualColumnProvider = _virtualColumnProviderMap.get(columnName);
       return new ColumnDataSource(virtualColumnProvider.buildColumnIndexContainer(virtualColumnContext),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
similarity index 70%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
index 97d48f3..01579e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
@@ -18,8 +18,16 @@
  */
 package org.apache.pinot.core.segment.updater;
 
-public class SegmentUpdaterConfig {
-  public static final String SEGMENT_UDPATE_SLEEP_MS = "sleep.ms";
-  public static final String INPUT_TOPIC_PREFIX  = "input.topic.prefix";
-  public static final int SEGMENT_UDPATE_SLEEP_MS_DEFAULT = 100;
+import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.grigio.common.metrics.GrigioMeter;
+import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+
+import java.util.Map;
+
+public interface WatermarkManager {
+  // NOTE(review): presumably called once before any other method, with the component's config and metrics - confirm.
+  void init(Configuration config, GrigioMetrics metrics);
+  // NOTE(review): presumably table name -> partition id -> high watermark - confirm against implementations.
+  Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
+
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 9b28f5a..9626be2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -36,6 +36,9 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultIndexSegmentCallback;
+import org.apache.pinot.core.data.manager.upsert.DefaultTableDataManagerCallbackImpl;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
@@ -100,7 +103,7 @@ public class BaseTableDataManagerTest {
 
   private TableDataManager makeTestableManager()
       throws Exception {
-    TableDataManager tableDataManager = new OfflineTableDataManager();
+    TableDataManager tableDataManager = new OfflineTableDataManager(new DefaultTableDataManagerCallbackImpl());
     TableDataManagerConfig config;
     {
       config = mock(TableDataManagerConfig.class);
@@ -139,7 +142,7 @@ public class BaseTableDataManagerTest {
     // Add the segment, get it for use, remove the segment, and then return it.
     // Make sure that the segment is not destroyed before return.
     ImmutableSegment immutableSegment = makeImmutableSegment(segmentName, totalDocs);
-    tableDataManager.addSegment(immutableSegment);
+    tableDataManager.addSegment(immutableSegment, DefaultDataManagerCallbackImpl.INSTANCE);
     SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
     Assert.assertEquals(segmentDataManager.getReferenceCount(), 2);
     tableDataManager.removeSegment(segmentName);
@@ -161,7 +164,7 @@ public class BaseTableDataManagerTest {
     // Add a new segment and remove it in order this time.
     final String anotherSeg = "AnotherSegment";
     ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
-    tableDataManager.addSegment(ix1);
+    tableDataManager.addSegment(ix1, DefaultDataManagerCallbackImpl.INSTANCE);
     SegmentDataManager sdm1 = tableDataManager.acquireSegment(anotherSeg);
     Assert.assertNotNull(sdm1);
     Assert.assertEquals(sdm1.getReferenceCount(), 2);
@@ -178,7 +181,7 @@ public class BaseTableDataManagerTest {
     Assert.assertEquals(sdm1.getReferenceCount(), 1);
     // Now replace the segment with another one.
     ImmutableSegment ix2 = makeImmutableSegment(anotherSeg, totalDocs + 1);
-    tableDataManager.addSegment(ix2);
+    tableDataManager.addSegment(ix2, DefaultDataManagerCallbackImpl.INSTANCE);
     // Now the previous one should have been destroyed, and
     Assert.assertEquals(sdm1.getReferenceCount(), 0);
     verify(ix1, times(1)).destroy();
@@ -222,7 +225,8 @@ public class BaseTableDataManagerTest {
 
     for (int i = _lo; i <= _hi; i++) {
       final String segName = SEGMENT_PREFIX + i;
-      tableDataManager.addSegment(makeImmutableSegment(segName, random.nextInt()));
+      tableDataManager.addSegment(makeImmutableSegment(segName, random.nextInt()),
+          DefaultDataManagerCallbackImpl.INSTANCE);
       _allSegManagers.add(_internalSegMap.get(segName));
     }
 
@@ -409,7 +413,8 @@ public class BaseTableDataManagerTest {
     private void addSegment() {
       final int segmentToAdd = _hi + 1;
       final String segName = SEGMENT_PREFIX + segmentToAdd;
-      _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
+      _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()),
+          DefaultDataManagerCallbackImpl.INSTANCE);
       _allSegManagers.add(_internalSegMap.get(segName));
       _hi = segmentToAdd;
     }
@@ -418,7 +423,8 @@ public class BaseTableDataManagerTest {
     private void replaceSegment() {
       int segToReplace = _random.nextInt(_hi - _lo + 1) + _lo;
       final String segName = SEGMENT_PREFIX + segToReplace;
-      _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()));
+      _tableDataManager.addSegment(makeImmutableSegment(segName, _random.nextInt()),
+          DefaultDataManagerCallbackImpl.INSTANCE);
       _allSegManagers.add(_internalSegMap.get(segName));
     }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index c4b557b..2f8bc3f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
@@ -696,7 +697,7 @@ public class LLRealtimeSegmentDataManagerTest {
     Assert.assertEquals(secondSegmentDataManager.get().getPartitionConsumerSemaphore().availablePermits(), 1);
   }
 
-  public static class FakeLLRealtimeSegmentDataManager extends AppendLLRealtimeSegmentDataManager {
+  public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
 
     public Field _state;
     public Field _shouldStop;
@@ -729,7 +730,7 @@ public class LLRealtimeSegmentDataManagerTest {
         throws Exception {
       super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
           new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
-          semaphoreMap.get(llcSegmentName.getPartitionId()), serverMetrics);
+          semaphoreMap.get(llcSegmentName.getPartitionId()), serverMetrics, DefaultDataManagerCallbackImpl.INSTANCE);
       _state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
       _state.setAccessible(true);
       _shouldStop = LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 26e7eff..4ccf197 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.indexsegment.mutable;
 import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.core.data.manager.upsert.DefaultIndexSegmentCallback;
 import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
@@ -59,6 +60,6 @@ public class MutableSegmentImplTestUtils {
             .setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
             .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             .setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).build();
-    return new MutableAppendSegmentImpl(realtimeSegmentConfig);
+    return new MutableSegmentImpl(realtimeSegmentConfig, DefaultIndexSegmentCallback.INSTANCE);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
index a4ee984..4d1d469 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
@@ -42,6 +42,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -141,7 +142,7 @@ public class SegmentGenerationWithNullValueVectorTest {
         .getTableDataManager(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class),
             mock(ServerMetrics.class));
     tableDataManager.start();
-    tableDataManager.addSegment(_segment);
+    tableDataManager.addSegment(_segment, DefaultDataManagerCallbackImpl.INSTANCE);
     _instanceDataManager = mock(InstanceDataManager.class);
     when(_instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
index 59342a3..d71896c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.FileFileFilter;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
 import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -225,12 +226,14 @@ public class LoaderTest {
     schema.addField(new DimensionFieldSpec("SVString", FieldSpec.DataType.STRING, true, ""));
     schema.addField(new DimensionFieldSpec("MVString", FieldSpec.DataType.STRING, false, ""));
 
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, _v1IndexLoadingConfig, schema);
+    IndexSegment indexSegment = ImmutableSegmentLoader
+        .load(_indexDir, _v1IndexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, schema);
     Assert.assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0), "");
     Assert.assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0), "");
     indexSegment.destroy();
 
-    indexSegment = ImmutableSegmentLoader.load(_indexDir, _v3IndexLoadingConfig, schema);
+    indexSegment = ImmutableSegmentLoader
+        .load(_indexDir, _v3IndexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, schema);
     Assert.assertEquals(indexSegment.getDataSource("SVString").getDictionary().get(0), "");
     Assert.assertEquals(indexSegment.getDataSource("MVString").getDictionary().get(0), "");
     indexSegment.destroy();
@@ -245,7 +248,8 @@ public class LoaderTest {
 
     FieldSpec byteMetric = new MetricFieldSpec(newColumnName, FieldSpec.DataType.BYTES, defaultValue);
     schema.addField(byteMetric);
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, _v3IndexLoadingConfig, schema);
+    IndexSegment indexSegment = ImmutableSegmentLoader
+        .load(_indexDir, _v3IndexLoadingConfig, DefaultDataManagerCallbackImpl.INSTANCE, schema);
     Assert
         .assertEquals(BytesUtils.toHexString((byte[]) indexSegment.getDataSource(newColumnName).getDictionary().get(0)),
             defaultValue);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
index c8e688b..b0f5a55 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
@@ -29,6 +29,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultIndexSegmentCallback;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.response.broker.AggregationResult;
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index b6ce1e2..203d2f6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -103,7 +104,7 @@ public class QueryExecutorTest {
             mock(ServerMetrics.class));
     tableDataManager.start();
     for (ImmutableSegment indexSegment : _indexSegments) {
-      tableDataManager.addSegment(indexSegment);
+      tableDataManager.addSegment(indexSegment, DefaultDataManagerCallbackImpl.INSTANCE);
     }
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
diff --git a/pinot-grigio/pom.xml b/pinot-grigio/pinot-grigio-provided/pom.xml
similarity index 51%
copy from pinot-grigio/pom.xml
copy to pinot-grigio/pinot-grigio-provided/pom.xml
index e53fa72..5643070 100644
--- a/pinot-grigio/pom.xml
+++ b/pinot-grigio/pinot-grigio-provided/pom.xml
@@ -22,21 +22,38 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <artifactId>pinot</artifactId>
-    <groupId>org.apache.pinot</groupId>
-    <version>0.3.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>pinot-grigio</artifactId>
-    <packaging>pom</packaging>
-    <name>Pinot Grigio</name>
-  <url>http://maven.apache.org</url>
-    <modules>
-      <module>pinot-grigio-common</module>
-      <module>pinot-grigio-coordinator</module>
-    </modules>
+    <parent>
+        <artifactId>pinot-grigio</artifactId>
+        <groupId>org.apache.pinot</groupId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>pinot-grigio-provided</artifactId>
+    <name>Pinot Grigio Provided components</name>
     <properties>
-    <pinot.root>${basedir}/..</pinot.root>
-  </properties>
+        <pinot.root>${basedir}/../..</pinot.root>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+
 </project>
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertLLRealtimeSegmentDataManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertDataManagerCallbackImpl.java
similarity index 57%
rename from pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertLLRealtimeSegmentDataManager.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertDataManagerCallbackImpl.java
index f08037a..24731f2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/UpsertLLRealtimeSegmentDataManager.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertDataManagerCallbackImpl.java
@@ -16,20 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.data.manager.realtime;
+package org.apache.pinot.core.data.manager.upsert;
 
 import com.google.common.base.Preconditions;
-import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
-import org.apache.pinot.core.indexsegment.UpsertSegment;
-import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
-import org.apache.pinot.core.indexsegment.mutable.MutableUpsertSegmentImpl;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.core.segment.updater.SegmentUpdater;
 import org.apache.pinot.grigio.common.messages.KeyCoordinatorQueueMsg;
 import org.apache.pinot.grigio.common.rpcQueue.ProduceTask;
@@ -40,71 +34,74 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Semaphore;
 
-/**
- * class design is pretty bad right now, need to rework inheritance to abstract the base class or use composition instead
- */
-public class UpsertLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager implements UpsertSegmentDataManager {
+public class UpsertDataManagerCallbackImpl implements DataManagerCallback {
 
-  private final QueueProducer _keyCoordinatorQueueProducer;
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertDataManagerCallbackImpl.class);
 
-  public UpsertLLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
-      RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
-      Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics) throws Exception {
-    super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, indexLoadingConfig, schema,
-            llcSegmentName, partitionConsumerSemaphore, serverMetrics);
-    Preconditions.checkState(_schema.getPrimaryKeyFieldSpec() != null, "primary key not found");
-    Preconditions.checkState(_schema.getOffsetKeyFieldSpec() != null, "offset key not found");
+  private final String _tableNameWithType;
+  private final QueueProducer _keyCoordinatorQueueProducer;
+  private final String _segmentName;
+  private final Schema _schema;
+  private final ServerMetrics _serverMetrics;
+  private IndexSegmentCallback _indexSegmentCallback;
+
+  public UpsertDataManagerCallbackImpl(String tableNameWithType, String segmentName, Schema schema,
+      ServerMetrics serverMetrics, boolean isMutable) {
+    Preconditions.checkState(schema.getPrimaryKeyFieldSpec() != null, "primary key not found");
+    Preconditions.checkState(schema.getOffsetKeyFieldSpec() != null, "offset key not found");
+    _tableNameWithType = TableNameBuilder.ensureTableNameWithType(tableNameWithType,
+        CommonConstants.Helix.TableType.REALTIME);
+    _segmentName = segmentName;
+    _schema = schema;
+    _serverMetrics = serverMetrics;
     _keyCoordinatorQueueProducer = KeyCoordinatorProvider.getInstance().getCachedProducer(_tableNameWithType);
-    initVirtualColumns();
-  }
-
-  public String getSegmentName() {
-    return _segmentNameStr;
+    if (isMutable) {
+      _indexSegmentCallback = new UpsertMutableIndexSegmentCallback();
+    } else {
+      _indexSegmentCallback = new UpsertImmutableIndexSegmentCallback();
+    }
   }
 
   @Override
-  public void updateVirtualColumns(List<UpdateLogEntry> messages) {
-    ((UpsertSegment) _realtimeSegment).updateVirtualColumn(messages);
+  public synchronized IndexSegmentCallback getIndexSegmentCallback() {
+    return _indexSegmentCallback;
   }
 
   @Override
-  public String getVirtualColumnInfo(long offset) {
-    return ((UpsertSegment) _realtimeSegment).getVirtualColumnInfo(offset);
+  public void processTransformedRow(GenericRow row, long offset) {
+    row.putValue(_schema.getOffsetKey(), offset);
   }
 
   @Override
-  public void destroy() {
-    SegmentUpdater.getInstance().removeSegmentDataManager(_tableNameWithType, _llcSegmentName.getSegmentName(), this);
-    super.destroy();
+  public void postIndexProcessing(GenericRow row, long offset) {
+    emitEventToKeyCoordinator(row, offset);
   }
 
   @Override
-  protected MutableSegmentImpl createMutableSegment(RealtimeSegmentConfig config) {
-    return new MutableUpsertSegmentImpl(config);
+  public void postConsumeLoop() {
+    _keyCoordinatorQueueProducer.flush();
   }
 
-  @Override
-  protected void processTransformedRow(GenericRow row, long offset) {
-    row.putField(_schema.getOffsetKey(), offset);
+  public void updateVirtualColumns(List<UpdateLogEntry> messages) {
+    _indexSegmentCallback.updateVirtualColumn(messages);
   }
 
-  @Override
-  protected void postIndexProcessing(GenericRow row, long offset) {
-    emitEventToKeyCoordinator(row, offset);
+  public String getVirtualColumnInfo(long offset) {
+    return _indexSegmentCallback.getVirtualColumnInfo(offset);
   }
 
-  @Override
-  protected boolean consumeLoop() throws Exception {
-    boolean result = super.consumeLoop();
-    segmentLogger.info("flushing kafka producer");
-    _keyCoordinatorQueueProducer.flush();
-    segmentLogger.info("done flushing kafka producer");
-    return result;
+  public void destroy() { // best-effort unregistration from SegmentUpdater; failures are logged, not rethrown
+    try {
+      SegmentUpdater.getInstance().removeSegmentDataManager(_tableNameWithType, _segmentName, this);
+    } catch (Exception ex) {
+      LOGGER.error("failed to destroy data manager callback for segment {}", _segmentName, ex); // keep the cause
+    }
   }
 
   /**
@@ -116,7 +113,7 @@ public class UpsertLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataMan
     final byte[] primaryKeyBytes = getPrimaryKeyBytesFromRow(row);
     final long timestampMillis = getTimestampFromRow(row);
     ProduceTask<byte[], KeyCoordinatorQueueMsg> task = new ProduceTask<>(primaryKeyBytes,
-        new KeyCoordinatorQueueMsg(primaryKeyBytes, _segmentNameStr, timestampMillis, offset));
+        new KeyCoordinatorQueueMsg(primaryKeyBytes, _segmentName, timestampMillis, offset));
     task.setCallback(new ProduceTask.Callback() {
       @Override
       public void onSuccess() {
@@ -143,11 +140,12 @@ public class UpsertLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataMan
     return spec.getIncomingGranularitySpec().toMillis(row.getValue(spec.getIncomingTimeColumnName()));
   }
 
-
-  private void initVirtualColumns() throws IOException {
+  @Override
+  public void initVirtualColumns() throws IOException {
     // 1. ensure the data manager can capture all update events
     // 2. load all existing messages
-    SegmentUpdater.getInstance().addSegmentDataManager(_tableNameWithType, _llcSegmentName, this);
-    ((UpsertSegment) _realtimeSegment).initVirtualColumn();
+    SegmentUpdater.getInstance().addSegmentDataManager(_tableNameWithType, new LLCSegmentName(_segmentName), this);
+    _indexSegmentCallback.initVirtualColumn();
   }
+
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
similarity index 74%
rename from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
index 5ab42f5..1ddc963 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
@@ -16,97 +16,86 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.indexsegment.immutable;
+package org.apache.pinot.core.data.manager.upsert;
+
 
 import com.clearspring.analytics.util.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.core.indexsegment.UpsertSegment;
 import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
 import org.apache.pinot.core.io.reader.DataFileReader;
-import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
-import org.apache.pinot.core.segment.store.SegmentDirectory;
-import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
-import org.apache.pinot.core.startree.v2.store.StarTreeIndexContainer;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
-public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements UpsertSegment {
+public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(ImmutableUpsertSegmentImpl.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertImmutableIndexSegmentCallback.class);
 
-  private final List<VirtualColumnLongValueReaderWriter> _virtualColumnsReaderWriter;
-  private final String _tableNameWithType;
-  private final String _segmentName;
-  private final int _totalDoc;
+  private List<VirtualColumnLongValueReaderWriter> _virtualColumnsReaderWriter;
+  private String _tableNameWithType;
+  private String _segmentName;
+  private int _totalDoc;
   private long _minSourceOffset;
-  private final UpsertWaterMarkManager _upsertWaterMarkManager;
-  private final UpdateLogStorageProvider _updateLogStorageProvider;
+  private UpsertWatermarkManager _upsertWatermarkManager;
+  private UpdateLogStorageProvider _updateLogStorageProvider;
   // use array for mapping bewteen offset to docId, where actual offset = min_offset + array_index
   // use 4 bytes per record
   private int[] _sourceOffsetToDocIdArray;
 
   private static final int DEFAULT_DOC_ID_FOR_MISSING_ENTRY = -1;
 
-  public ImmutableUpsertSegmentImpl(SegmentDirectory segmentDirectory,
-                                    SegmentMetadataImpl segmentMetadata,
-                                    Map<String, ColumnIndexContainer> columnIndexContainerMap,
-                                    @Nullable StarTreeIndexContainer starTreeIndexContainer) {
-    super(segmentDirectory, segmentMetadata, columnIndexContainerMap, starTreeIndexContainer);
+  public UpsertImmutableIndexSegmentCallback() {}
+
+  @Override
+  public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) {
     Preconditions.checkState(segmentMetadata.getSchema().isTableForUpsert(), "table should be upsert but it is not");
     _tableNameWithType = TableNameBuilder.ensureTableNameWithType(segmentMetadata.getTableName(),
         CommonConstants.Helix.TableType.REALTIME);
     _segmentName = segmentMetadata.getName();
     _totalDoc = segmentMetadata.getTotalDocs();
-    _upsertWaterMarkManager = UpsertWaterMarkManager.getInstance();
+    _upsertWatermarkManager = UpsertWatermarkManager.getInstance();
     _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
     _virtualColumnsReaderWriter = new ArrayList<>();
-    for (Map.Entry<String, ColumnIndexContainer> entry: columnIndexContainerMap.entrySet()) {
-      String columnName = entry.getKey();
-      ColumnIndexContainer container = entry.getValue();
-      if (segmentMetadata.getSchema().isVirtualColumn(columnName) && (container.getForwardIndex() instanceof VirtualColumnLongValueReaderWriter)) {
-        _virtualColumnsReaderWriter.add((VirtualColumnLongValueReaderWriter) container.getForwardIndex());
-      }
+    for (DataFileReader reader: virtualColumnIndexReader.values()) {
+      _virtualColumnsReaderWriter.add((VirtualColumnLongValueReaderWriter) reader);
     }
-     buildOffsetToDocIdMap(columnIndexContainerMap.get(segmentMetadata.getSchema().getOffsetKey()));
   }
 
-  /** constructor used for creating instance in test cases
-   * should not be used for creating regular segment
+  /**
+   * method for testing purpose only
    */
   @VisibleForTesting
-  protected ImmutableUpsertSegmentImpl(List<VirtualColumnLongValueReaderWriter> readerWriters,
-                                       int totalDoc, UpsertWaterMarkManager manager,
-                                       UpdateLogStorageProvider updateLogStorageProvider,
-                                       long minSourceOffset, int[] offsetToDocId) {
-    super(null, null, null, null);
+  protected void init(List<VirtualColumnLongValueReaderWriter> readerWriters,
+      int totalDoc, UpsertWatermarkManager manager,
+      UpdateLogStorageProvider updateLogStorageProvider,
+      long minSourceOffset, int[] offsetToDocId) {
     _tableNameWithType = "testTable";
     _segmentName = "testSegment";
     _virtualColumnsReaderWriter = readerWriters;
     _totalDoc = totalDoc;
-    _upsertWaterMarkManager = manager;
+    _upsertWatermarkManager = manager;
     _updateLogStorageProvider = updateLogStorageProvider;
     _minSourceOffset = minSourceOffset;
     _sourceOffsetToDocIdArray = offsetToDocId;
-
   }
 
-  private void buildOffsetToDocIdMap(ColumnIndexContainer offsetColumnIndexContainer) {
+
+  @Override
+  public void initOffsetColumn(ColumnIndexContainer offsetColumnIndexContainer) {
     long start = System.currentTimeMillis();
     final DataFileReader reader = offsetColumnIndexContainer.getForwardIndex();
     final Dictionary dictionary = offsetColumnIndexContainer.getDictionary();
@@ -143,34 +132,9 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
     LOGGER.info("built offset to DocId map for segment {} with {} documents in {} ms", _segmentName, _totalDoc, System.currentTimeMillis() - start);
   }
 
-  public static ImmutableUpsertSegmentImpl copyOf(ImmutableSegmentImpl immutableSegment) {
-
-    return new ImmutableUpsertSegmentImpl(immutableSegment._segmentDirectory, immutableSegment._segmentMetadata,
-        immutableSegment._indexContainerMap, immutableSegment._starTreeIndexContainer);
-  }
-
-  @Override
-  public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
-    for (UpdateLogEntry logEntry: logEntries) {
-      boolean updated = false;
-      int docId = getDocIdFromSourceOffset(logEntry.getOffset());
-      for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
-        updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated;
-      }
-      if (updated) {
-        _upsertWaterMarkManager.processMessage(_tableNameWithType, _segmentName, logEntry);
-      }
-    }
-  }
-
   @Override
-  public String getVirtualColumnInfo(long offset) {
-    int docId = getDocIdFromSourceOffset(offset);
-    StringBuilder result = new StringBuilder("matched: ");
-    for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
-      result.append(readerWriter.getInt(docId)).append("; ");
-    }
-    return result.toString();
+  public void postProcessRecords(GenericRow row, int docId) {
+    // do nothing
   }
 
   /**
@@ -216,7 +180,7 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
             unmatchedLogEntryCount);
       }
       partitionToHighestWatermark.forEach((partition, value) ->
-          _upsertWaterMarkManager.processVersionUpdate(_tableNameWithType, partition, value));
+          _upsertWatermarkManager.processVersionUpdate(_tableNameWithType, partition, value));
 
     } catch (Exception e) {
       LOGGER.error("failed to load the offset with thread pool");
@@ -226,6 +190,30 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
         _segmentName, System.currentTimeMillis() - start);
   }
 
+  @Override
+  public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
+    for (UpdateLogEntry logEntry: logEntries) {
+      boolean updated = false;
+      int docId = getDocIdFromSourceOffset(logEntry.getOffset());
+      for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
+        updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated;
+      }
+      if (updated) {
+        _upsertWatermarkManager.processMessage(_tableNameWithType, _segmentName, logEntry);
+      }
+    }
+  }
+
+  @Override
+  public String getVirtualColumnInfo(long offset) {
+    int docId = getDocIdFromSourceOffset(offset);
+    StringBuilder result = new StringBuilder("matched: ");
+    for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
+      result.append(readerWriter.getInt(docId)).append("; ");
+    }
+    return result.toString();
+  }
+
   /**
    * given a offset from source kafka topic, return the docId associated with it
    * throw exception if there is no docId for the associated kafka offset (because kafka offset might not be continuous)
@@ -246,6 +234,6 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
         return _sourceOffsetToDocIdArray[position];
       }
     }
-
   }
+
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
similarity index 76%
rename from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
index a6e7728..eeda5a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableUpsertSegmentImpl.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
@@ -16,18 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.indexsegment.mutable;
+package org.apache.pinot.core.data.manager.upsert;
 
 import com.google.common.base.Preconditions;
-import org.apache.pinot.core.indexsegment.UpsertSegment;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.segment.SegmentMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.core.io.reader.DataFileReader;
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
+import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
+import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.grigio.common.messages.LogEventType;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,13 +41,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements UpsertSegment {
+public class UpsertMutableIndexSegmentCallback implements IndexSegmentCallback {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(MutableUpsertSegmentImpl.class);
-  private final UpsertWaterMarkManager upsertWaterMarkManager;
-
-  private final String _kafkaOffsetColumnName;
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertMutableIndexSegmentCallback.class);
 
+  private String _tableName;
+  private String _segmentName;
+  private Schema _schema;
+  private UpsertWatermarkManager _upsertWatermarkManager;
+  private String _offsetColumnName;
   private final List<VirtualColumnLongValueReaderWriter> _mutableSegmentReaderWriters = new ArrayList<>();
   // use map for mapping between kafka offset and docId because we at-most have 1 mutable segment per consumer
   // will use more memory, but we can update this later
@@ -54,23 +59,31 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
   private final Map<Long, UpdateLogEntry> _unmatchedInsertRecords = new ConcurrentHashMap<>();
   private final Map<Long, UpdateLogEntry> _unmatchedDeleteRecords = new ConcurrentHashMap<>();
 
-  public MutableUpsertSegmentImpl(RealtimeSegmentConfig config) {
-    super(config);
-    _kafkaOffsetColumnName = _schema.getOffsetKey();
+  @Override
+  public void init(SegmentMetadata segmentMetadata, Map<String, DataFileReader> virtualColumnIndexReader) {
+    _tableName = TableNameBuilder.ensureTableNameWithType(segmentMetadata.getTableName(),
+        CommonConstants.Helix.TableType.REALTIME);
+    _segmentName = segmentMetadata.getName();
+    _schema = segmentMetadata.getSchema();
+    _offsetColumnName = _schema.getOffsetKey();
     Preconditions.checkState(_schema.isTableForUpsert(), "table should be upsert");
-    for (Map.Entry<String, DataFileReader> entry: _virtualColumnIndexReader.entrySet()) {
+    for (Map.Entry<String, DataFileReader> entry: virtualColumnIndexReader.entrySet()) {
       String column = entry.getKey();
       DataFileReader reader = entry.getValue();
       if (reader instanceof VirtualColumnLongValueReaderWriter) {
-        _logger.info("adding virtual column {} to updatable reader writer list", column);
+        LOGGER.info("adding virtual column {} to updatable reader writer list", column);
         _mutableSegmentReaderWriters.add((VirtualColumnLongValueReaderWriter) reader);
       }
     }
-    upsertWaterMarkManager = UpsertWaterMarkManager.getInstance();
+    _upsertWatermarkManager = UpsertWatermarkManager.getInstance();
     LOGGER.info("starting upsert segment with {} reader writer", _mutableSegmentReaderWriters.size());
   }
 
   @Override
+  public void initOffsetColumn(ColumnIndexContainer offsetColumnContainer) {
+    // do nothing
+  }
+
   public synchronized void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
     for (UpdateLogEntry logEntry: logEntries) {
       boolean updated = false;
@@ -83,7 +96,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
         }
         if (updated) {
           // only update high water mark if it indeed updated something
-          upsertWaterMarkManager.processMessage(_tableName, _segmentName, logEntry);
+          _upsertWatermarkManager.processMessage(_tableName, _segmentName, logEntry);
         }
       }
       if (!offsetFound) {
@@ -92,7 +105,6 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
     }
   }
 
-  @Override
   public String getVirtualColumnInfo(long offset) {
     Integer docId = _sourceOffsetToDocId.get(offset);
     StringBuilder result = new StringBuilder("matched: ");
@@ -113,8 +125,8 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
   }
 
   @Override
-  protected synchronized void postProcessRecords(GenericRow row, int docId) {
-    final Long offset = (Long) row.getValue(_kafkaOffsetColumnName);
+  public synchronized void postProcessRecords(GenericRow row, int docId) {
+    final Long offset = (Long) row.getValue(_offsetColumnName);
     for (VirtualColumnLongValueReaderWriter readerWriter: _mutableSegmentReaderWriters) {
       readerWriter.addNewRecord(docId);
     }
@@ -123,9 +135,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
     checkForOutstandingRecords(_unmatchedInsertRecords, offset, docId);
   }
 
-  @Override
   public void initVirtualColumn() throws IOException {
-    Preconditions.checkState(_numDocsIndexed == 0, "should init virtual column before ingestion");
     UpdateLogEntrySet updateLogEntries = UpdateLogStorageProvider.getInstance().getAllMessages(_tableName, _segmentName);
     LOGGER.info("got {} update log entries for current segment {}", updateLogEntries.size(), _segmentName);
     // some physical data might have been ingested when we init virtual column, we will go through the normal update
@@ -133,6 +143,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
     updateVirtualColumn(updateLogEntries);
   }
 
+
   private void checkForOutstandingRecords(Map<Long, UpdateLogEntry> unmatchRecordsMap, Long offset, int docId) {
     UpdateLogEntry unmatchedEntry = unmatchRecordsMap.remove(offset);
     if (unmatchedEntry != null) {
@@ -141,7 +152,7 @@ public class MutableUpsertSegmentImpl extends MutableSegmentImpl implements Upse
         updated = readerWriter.update(docId, unmatchedEntry.getValue(), unmatchedEntry.getType()) || updated;
       }
       if (updated) {
-        upsertWaterMarkManager.processMessage(_tableName, _segmentName, unmatchedEntry);
+        _upsertWatermarkManager.processMessage(_tableName, _segmentName, unmatchedEntry);
       }
     }
   }
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
new file mode 100644
index 0000000..dc329e4
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertTableDataManagerCallbackImpl.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.upsert;
+
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class UpsertTableDataManagerCallbackImpl implements TableDataManagerCallback { // table-level upsert hooks
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertTableDataManagerCallbackImpl.class);
+
+  private UpdateLogStorageProvider _updateLogStorageProvider; // assigned in init(); null until init() runs
+
+  @Override
+  public void init() { // fetches the UpdateLogStorageProvider singleton that addSegment() uses
+    _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
+  }
+
+  @Override
+  public void addSegment(String tableName, String segmentName, TableConfig tableConfig) {
+    try { // tableConfig is not read here; only the update-log storage is told about the segment
+      _updateLogStorageProvider.addSegment(tableName, segmentName);
+    } catch (IOException e) {
+      LOGGER.error("failed to add update log for segment {} {}", tableName, segmentName);
+      ExceptionUtils.rethrow(e); // the IOException is rethrown to the caller after being logged
+    }
+  }
+
+  @Override
+  public DataManagerCallback getDataManagerCallback(String tableName, String segmentName, Schema schema,
+      ServerMetrics serverMetrics, boolean isMutable) { // every call builds a new upsert callback
+    return new UpsertDataManagerCallbackImpl(tableName, segmentName, schema, serverMetrics, isMutable);
+  }
+
+  @Override
+  public DataManagerCallback getDefaultDataManagerCallback() { // unsupported here: always throws
+    throw new NotImplementedException("cannot create DefaultDataManagerCallback from UpsertTableDataManagerCallback");
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
similarity index 94%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
index d0d8c0e..530cd0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
@@ -26,7 +26,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
 import org.apache.pinot.grigio.common.messages.LogCoordinatorMessage;
 import org.apache.pinot.grigio.common.metrics.GrigioMeter;
 import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
@@ -75,7 +75,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
   private final String _topicPrefix;
   private final ExecutorService _ingestionExecutorService;
   private final QueueConsumer _consumer;
-  private final Map<String, Map<String, Set<UpsertSegmentDataManager>>> _tableSegmentMap = new ConcurrentHashMap<>();
+  private final Map<String, Map<String, Set<DataManagerCallback>>> _tableSegmentMap = new ConcurrentHashMap<>();
   private final Map<String, Map<Integer, Long>> _tablePartitionCreationTime = new ConcurrentHashMap<>();
   private final UpdateLogStorageProvider _updateLogStorageProvider;
   private final UpdateLogRetentionManager _retentionManager;
@@ -91,7 +91,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
     _topicPrefix = conf.getString(SegmentUpdaterConfig.INPUT_TOPIC_PREFIX);
     _updateSleepMs = conf.getInt(SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS,
         SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS_DEFAULT);
-    UpsertWaterMarkManager.init(metrics);
+    UpsertWatermarkManager.init(metrics);
     _consumer = provider.getConsumer();
     _ingestionExecutorService = Executors.newFixedThreadPool(1);
     _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
@@ -163,7 +163,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
           String tableName = TableNameBuilder.ensureTableNameWithType(entry.getKey(), CommonConstants.Helix.TableType.REALTIME);
           int tableMessageCount = 0;
           if (_tableSegmentMap.containsKey(tableName)) {
-            final Map<String, Set<UpsertSegmentDataManager>> segmentManagersMap = _tableSegmentMap.get(tableName);
+            final Map<String, Set<DataManagerCallback>> segmentManagersMap = _tableSegmentMap.get(tableName);
             final TableUpdateLogs segment2UpdateLogsMap = entry.getValue();
             updateSegmentVirtualColumns(tableName, segmentManagersMap, segment2UpdateLogsMap, timeToStoreUpdateLogs);
           } else {
@@ -179,7 +179,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
           if (System.currentTimeMillis() - lastReportedTime > LOGGER_TIME_GAP_MS) {
             lastReportedTime = System.currentTimeMillis();
             LOGGER.info("processed {} messages in {} ms", eventCount, System.currentTimeMillis() - loopStartTime);
-            LOGGER.info("latest high water mark is {}", UpsertWaterMarkManager.getInstance().toString());
+            LOGGER.info("latest high water mark is {}", UpsertWatermarkManager.getInstance().toString());
           }
           _consumer.ackOffset();
           _metrics.addTimedValueMs(GrigioTimer.SEGMENT_UPDATER_LOOP_TIME, System.currentTimeMillis() - loopStartTime);
@@ -202,7 +202,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
   /**
    * Update the virtual columns of affected segments of a table.
    */
-  private void updateSegmentVirtualColumns(String tableName, Map<String, Set<UpsertSegmentDataManager>> segmentManagersMap,
+  private void updateSegmentVirtualColumns(String tableName, Map<String, Set<DataManagerCallback>> segmentManagersMap,
                                            TableUpdateLogs segment2UpdateLogsMap, AtomicLong timeToStoreUpdateLogs) throws IOException{
     for (Map.Entry<String, List<UpdateLogEntry>> segmentEntry : segment2UpdateLogsMap.getSegments2UpdateLog().entrySet()) {
       final String segmentNameStr = segmentEntry.getKey();
@@ -217,7 +217,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
    * from consuming to online (mutable segment to immutable segment). In most of cases we expect only one segment manager
    * in this set of UpsertSegmentDataManager
    */
-  private void updateVirtualColumn(String table, String segment, Set<UpsertSegmentDataManager> segmentDataManagers,
+  private void updateVirtualColumn(String table, String segment, Set<DataManagerCallback> segmentDataManagers,
                                    List<UpdateLogEntry> messages, AtomicLong timeToStoreUpdateLogs) throws IOException {
     LOGGER.debug("updating segment {} with {} results for {} data managers", segment, messages.size(),
         segmentDataManagers.size());
@@ -225,7 +225,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
       storeUpdateLogs(table, segment, messages, timeToStoreUpdateLogs);
     }
     try {
-      for (UpsertSegmentDataManager dataManager: segmentDataManagers) {
+      for (DataManagerCallback dataManager: segmentDataManagers) {
         dataManager.updateVirtualColumns(messages);
       }
     } catch (Exception ex) {
@@ -239,7 +239,8 @@ public class SegmentUpdater implements SegmentDeletionListener {
    * @param segmentName
    * @param dataManager the data manager for the current given table/segment combination
    */
-  public synchronized void addSegmentDataManager(String tableNameWithType, LLCSegmentName segmentName, UpsertSegmentDataManager dataManager) {
+  public synchronized void addSegmentDataManager(String tableNameWithType, LLCSegmentName segmentName,
+      DataManagerCallback dataManager) {
     // TODO get partition assignment from
     LOGGER.info("segment updater adding table {} segment {}", tableNameWithType, segmentName.getSegmentName());
     if (!_tableSegmentMap.containsKey(tableNameWithType)) {
@@ -258,11 +259,12 @@ public class SegmentUpdater implements SegmentDeletionListener {
     }
   }
 
-  public synchronized void removeSegmentDataManager(String tableNameWithType, String segmentName, UpsertSegmentDataManager toDeleteManager) {
+  public synchronized void removeSegmentDataManager(String tableNameWithType, String segmentName,
+      DataManagerCallback toDeleteManager) {
     LOGGER.info("segment updater removing table {} segment {}", tableNameWithType, segmentName);
-    Map<String, Set<UpsertSegmentDataManager>> segmentMap = _tableSegmentMap.get(tableNameWithType);
+    Map<String, Set<DataManagerCallback>> segmentMap = _tableSegmentMap.get(tableNameWithType);
     if (segmentMap != null) {
-      Set<UpsertSegmentDataManager> segmentDataManagers = segmentMap.get(segmentName);
+      Set<DataManagerCallback> segmentDataManagers = segmentMap.get(segmentName);
       if (segmentDataManagers != null) {
         segmentDataManagers.remove(toDeleteManager);
         if (segmentDataManagers.size() == 0) {
@@ -298,7 +300,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
   @Override
   public synchronized void onSegmentDeletion(String tableNameWithType, String segmentName) {
     LOGGER.info("deleting segment virtual column from local storage for table {} segment {}", tableNameWithType, segmentName);
-    Map<String, Set<UpsertSegmentDataManager>> segmentManagerMap = _tableSegmentMap.get(tableNameWithType);
+    Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableNameWithType);
     if (segmentManagerMap != null) {
       if (segmentManagerMap.containsKey(segmentName) && segmentManagerMap.get(segmentName).size() > 0) {
         LOGGER.error("trying to remove segment storage with {} segment data manager", segmentManagerMap.get(segmentName).size());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
similarity index 100%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
copy to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
similarity index 86%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
index bbea037..faad27b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.updater;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.grigio.common.metrics.GrigioGauge;
 import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
@@ -30,26 +31,26 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class UpsertWaterMarkManager {
+public class UpsertWatermarkManager implements WatermarkManager {
 
   private final Map<String, Map<Integer, Long>> _highWaterMarkTablePartitionMap = new ConcurrentHashMap<>();
   private final GrigioMetrics _metrics;
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWaterMarkManager.class);
-  private static volatile UpsertWaterMarkManager _instance;
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWatermarkManager.class);
+  private static volatile UpsertWatermarkManager _instance;
 
-  private UpsertWaterMarkManager(GrigioMetrics metrics) {
+  private UpsertWatermarkManager(GrigioMetrics metrics) {
     _metrics = metrics;
   }
 
   public static void init(GrigioMetrics metrics) {
-    synchronized (UpsertWaterMarkManager.class) {
+    synchronized (UpsertWatermarkManager.class) {
       Preconditions.checkState(_instance == null, "upsert water mark manager is already init");
-      _instance = new UpsertWaterMarkManager(metrics);
+      _instance = new UpsertWatermarkManager(metrics);
     }
   }
 
-  public static UpsertWaterMarkManager getInstance() {
+  public static UpsertWatermarkManager getInstance() {
     Preconditions.checkState(_instance != null, "upsert water mark manager is not yet init");
     return _instance;
   }
@@ -79,6 +80,11 @@ public class UpsertWaterMarkManager {
     return ImmutableMap.copyOf(_highWaterMarkTablePartitionMap.getOrDefault(tableName, ImmutableMap.of()));
   }
 
+  @Override
+  public void init(Configuration config, GrigioMetrics metrics) {
+
+  }
+
   public Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap() {
     return ImmutableMap.copyOf(_highWaterMarkTablePartitionMap);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
similarity index 89%
rename from pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
rename to pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
index 817131c..979e5c1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.indexsegment.immutable;
+package org.apache.pinot.core.data.manager.upsert;
 
-import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.grigio.common.messages.LogEventType;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
@@ -38,16 +38,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 
-public class ImmutableUpsertSegmentImplTest {
-
+public class UpsertImmutableIndexSegmentCallbackTest {
   UpdateLogStorageProvider _mockProvider;
-  UpsertWaterMarkManager _mockUpsertWaterMarkManager;
+  UpsertWatermarkManager _mockUpsertWatermarkManager;
   List<VirtualColumnLongValueReaderWriter> _readerWriters = new ArrayList<>();
 
   @BeforeMethod
   public void init() {
     _mockProvider = mock(UpdateLogStorageProvider.class);
-    _mockUpsertWaterMarkManager = mock(UpsertWaterMarkManager.class);
+    _mockUpsertWatermarkManager = mock(UpsertWatermarkManager.class);
   }
 
   @Test
@@ -105,10 +104,10 @@ public class ImmutableUpsertSegmentImplTest {
 
     start = System.currentTimeMillis();
 
-    ImmutableUpsertSegmentImpl immutableUpsertSegment = new ImmutableUpsertSegmentImpl(_readerWriters, totalDocs,
-        _mockUpsertWaterMarkManager, _mockProvider, minOffset, offsetToDocId);
+    UpsertImmutableIndexSegmentCallback callback = new UpsertImmutableIndexSegmentCallback();
+    callback.init(_readerWriters, totalDocs, _mockUpsertWatermarkManager, _mockProvider, minOffset, offsetToDocId);
+    callback.initVirtualColumn();
 
-    immutableUpsertSegment.initVirtualColumn();
     long runtime = System.currentTimeMillis() - start;
     System.out.println("run time is " + runtime);
     // on regular developer laptop this should take less 1 second, but on integration server this might be longer
diff --git a/pinot-grigio/pom.xml b/pinot-grigio/pom.xml
index e53fa72..47ad035 100644
--- a/pinot-grigio/pom.xml
+++ b/pinot-grigio/pom.xml
@@ -35,6 +35,7 @@
     <modules>
       <module>pinot-grigio-common</module>
       <module>pinot-grigio-coordinator</module>
+      <module>pinot-grigio-provided</module>
     </modules>
     <properties>
     <pinot.root>${basedir}/..</pinot.root>
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 1af7e29..920b422 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -41,11 +41,11 @@ import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.core.segment.index.loader.LoaderUtils;
-import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -219,12 +219,18 @@ public class HelixInstanceDataManager implements InstanceDataManager {
       // Copy from segment backup directory back to index directory
       FileUtils.copyDirectory(segmentBackupDir, indexDir);
 
+      final TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+      final DataManagerCallback dataManagerCallback = tableDataManager.getTableDataManagerCallback()
+          .getDataManagerCallback(tableNameWithType, segmentName, schema, _serverMetrics, false);
+
       // Load from index directory
-      ImmutableSegment immutableSegment = ImmutableSegmentLoader
-          .load(indexDir, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig), schema);
+      ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir,
+          new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig),
+          dataManagerCallback,
+          schema);
 
       // Replace the old segment in memory
-      _tableDataManagerMap.get(tableNameWithType).addSegment(immutableSegment);
+      _tableDataManagerMap.get(tableNameWithType).addSegment(immutableSegment, dataManagerCallback);
 
       // Rename segment backup directory to segment temporary directory (atomic)
       // The reason to first rename then delete is that, renaming is an atomic operation, but deleting is not. When we
@@ -301,10 +307,12 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     }
   }
 
+  /*
   @Override
   public Map<String, Map<Integer, Long>> getLowWaterMarks() {
     return UpsertWaterMarkManager.getInstance().getHighWaterMarkTablePartitionMap();
   }
+   */
 
   @Override
   public String getSegmentDataDirectory() {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index f46023d..c1ea182 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -64,6 +64,8 @@ import org.apache.pinot.server.conf.ServerConf;
 import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.upsert.UpsertComponentContainer;
+import org.apache.pinot.server.upsert.UpsertComponentContainerProvider;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 910a6b9..145e378 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
+import org.apache.pinot.server.upsert.SegmentDeletionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,12 +54,19 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
   private final String _instanceId;
   private final InstanceDataManager _instanceDataManager;
   private final SegmentFetcherAndLoader _fetcherAndLoader;
+  private final SegmentDeletionHandler _segmentDeletionHandler;
 
   public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
       SegmentFetcherAndLoader fetcherAndLoader) {
+    this(instanceId, instanceDataManager, fetcherAndLoader, new SegmentDeletionHandler());
+  }
+
+  public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
+      SegmentFetcherAndLoader fetcherAndLoader, SegmentDeletionHandler segmentDeletionHandler) {
     _instanceId = instanceId;
     _instanceDataManager = instanceDataManager;
     _fetcherAndLoader = fetcherAndLoader;
+    _segmentDeletionHandler = segmentDeletionHandler;
   }
 
   public static String getStateModelName() {
@@ -203,6 +211,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
           FileUtils.deleteQuietly(segmentDir);
           _logger.info("Deleted segment directory {}", segmentDir);
         }
+        _segmentDeletionHandler.deleteSegmentFromLocalStorage(tableNameWithType, segmentName);
       } catch (final Exception e) {
         _logger.error("Cannot delete the segment : " + segmentName + " from local directory!\n" + e.getMessage(), e);
         Utils.rethrowException(e);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
similarity index 57%
rename from pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
rename to pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
index 1c9effc..63424ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableAppendSegmentImpl.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
@@ -16,22 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.indexsegment.mutable;
+package org.apache.pinot.server.upsert;
 
-import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.configuration.Configuration;
 
-public class MutableAppendSegmentImpl extends MutableSegmentImpl {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MutableUpsertSegmentImpl.class);
+public class DefaultUpsertComponentContainer implements UpsertComponentContainer {
 
-  public MutableAppendSegmentImpl(RealtimeSegmentConfig config) {
-    super(config);
+  private final SegmentDeletionHandler deletionHandler = new SegmentDeletionHandler();
+
+  @Override
+  public void registerMetrics(MetricRegistry registry) {
+
+  }
+
+  @Override
+  public void init(Configuration config) {
+  }
+
+  @Override
+  public SegmentDeletionHandler getSegmentDeletionHandler() {
+    return deletionHandler;
+  }
+
+  @Override
+  public void start() {
   }
 
   @Override
-  protected void postProcessRecords(GenericRow row, int docId) {
-    // nothing
+  public void stop() {
   }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java
new file mode 100644
index 0000000..1aba06b
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.upsert;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.pinot.core.segment.updater.SegmentDeletionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Dispatches local segment deletions to the registered {@link SegmentDeletionListener}s.
+ *
+ * <p>The listener list is copied at construction time and is never modified afterwards.
+ */
+public class SegmentDeletionHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentDeletionHandler.class);
+
+  // immutable snapshot of the listeners supplied at construction time
+  private final List<SegmentDeletionListener> _deleteListeners;
+
+  /**
+   * Creates a handler without listeners; deletions are then only logged.
+   */
+  public SegmentDeletionHandler() {
+    this(ImmutableList.of());
+  }
+
+  /**
+   * Creates a handler that notifies the given listeners in list order.
+   *
+   * @param listeners listeners to notify; copied, so later changes to the caller's list are not seen
+   * @throws NullPointerException if {@code listeners} or any of its elements is null
+   */
+  public SegmentDeletionHandler(List<SegmentDeletionListener> listeners) {
+    _deleteListeners = ImmutableList.copyOf(listeners);
+  }
+
+  /**
+   * Notifies every registered listener that a segment is being deleted.
+   *
+   * <p>An exception thrown by one listener is logged and does not stop the remaining listeners.
+   *
+   * @param tableNameWithType table name, passed to each listener unchanged
+   * @param segmentName segment name, passed to each listener unchanged
+   */
+  public void deleteSegmentFromLocalStorage(String tableNameWithType, String segmentName) {
+    //TODO move the deletion from file system logic to here
+    LOGGER.info("trying to perform deletion for {}:{} on {} deletion handler", tableNameWithType, segmentName,
+        _deleteListeners.size());
+    for (SegmentDeletionListener deletionListener : _deleteListeners) {
+      try {
+        deletionListener.onSegmentDeletion(tableNameWithType, segmentName);
+      } catch (Exception ex) {
+        LOGGER.error("failed to delete table {} segment {} with exception", tableNameWithType, segmentName, ex);
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
similarity index 69%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
rename to pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
index 97d48f3..7f636fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
@@ -16,10 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.segment.updater;
+package org.apache.pinot.server.upsert;
 
-public class SegmentUpdaterConfig {
-  public static final String SEGMENT_UDPATE_SLEEP_MS = "sleep.ms";
-  public static final String INPUT_TOPIC_PREFIX  = "input.topic.prefix";
-  public static final int SEGMENT_UDPATE_SLEEP_MS_DEFAULT = 100;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.commons.configuration.Configuration;
+
+public interface UpsertComponentContainer {
+
+  void registerMetrics(MetricRegistry registry);
+
+  void init(Configuration config);
+
+  SegmentDeletionHandler getSegmentDeletionHandler();
+
+  void start();
+
+  void stop();
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
new file mode 100644
index 0000000..fe4af07
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.upsert;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.pinot.core.segment.updater.WatermarkManager;
+import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instantiates the {@link UpsertComponentContainer} implementation named in the server configuration.
+ *
+ * <p>The class is loaded reflectively, created through its no-arg constructor and initialized with the
+ * configuration passed to this provider.
+ */
+public class UpsertComponentContainerProvider {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerProvider.class);
+
+  // NOTE(review): the key still says "watermarkManager" although it selects the component container;
+  // left unchanged because renaming it would break existing configurations
+  public static final String UPSERT_COMPONENT_CONFIG_KEY = "watermarkManager.class";
+  public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName();
+
+  private final Configuration _conf;
+  private UpsertComponentContainer _instance;
+
+  /**
+   * Loads, instantiates and initializes the configured container; any failure is logged and rethrown.
+   *
+   * @param conf configuration holding {@link #UPSERT_COMPONENT_CONFIG_KEY}; also handed to the container's init
+   * @param metrics currently unused
+   * @throws IllegalStateException if the configured class does not implement {@link UpsertComponentContainer}
+   */
+  public UpsertComponentContainerProvider(Configuration conf, GrigioMetrics metrics) {
+    _conf = conf;
+    String className = _conf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT);
+    LOGGER.info("creating upsert component container with class {}", className);
+    try {
+      Class<?> componentContainerClass = Class.forName(className);
+      // the loaded class must be a subtype of the container interface, not the other way around
+      Preconditions.checkState(UpsertComponentContainer.class.isAssignableFrom(componentContainerClass),
+          "configured class %s does not implement %s", className, UpsertComponentContainer.class.getName());
+      _instance = (UpsertComponentContainer) componentContainerClass.newInstance();
+      _instance.init(_conf);
+    } catch (Exception e) {
+      LOGGER.error("failed to load upsert component container class {}", className, e);
+      ExceptionUtils.rethrow(e);
+    }
+  }
+
+  /**
+   * @return the container created by the constructor
+   */
+  public UpsertComponentContainer getInstance() {
+    return _instance;
+  }
+}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 08ab8c3..60702c4 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
@@ -122,7 +123,7 @@ public abstract class BaseResourceTest {
     ImmutableSegment immutableSegment =
         ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap);
     _indexSegments.add(immutableSegment);
-    _tableDataManagerMap.get(TABLE_NAME).addSegment(immutableSegment);
+    _tableDataManagerMap.get(TABLE_NAME).addSegment(immutableSegment, DefaultDataManagerCallbackImpl.INSTANCE);
     return immutableSegment;
   }
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 187b36f..ac989bb 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -26,7 +26,8 @@ import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.core.indexsegment.mutable.MutableAppendSegmentImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultDataManagerCallbackImpl;
+import org.apache.pinot.core.data.manager.upsert.DefaultIndexSegmentCallback;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.spi.utils.DataSize;
@@ -136,7 +137,8 @@ public class MemoryEstimator {
             .setStatsHistory(sampleStatsHistory);
 
     // create mutable segment impl
-    MutableSegmentImpl mutableSegmentImpl = new MutableAppendSegmentImpl(realtimeSegmentConfigBuilder.build());
+    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(),
+        DefaultIndexSegmentCallback.INSTANCE);
 
     // read all rows and index them
     try (PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_sampleCompletedSegment);) {
@@ -236,7 +238,8 @@ public class MemoryEstimator {
               .setOffHeap(true).setMemoryManager(memoryManager).setStatsHistory(statsHistory);
 
       // create mutable segment impl
-      MutableSegmentImpl mutableSegmentImpl = new MutableAppendSegmentImpl(realtimeSegmentConfigBuilder.build());
+      MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(),
+          DefaultIndexSegmentCallback.INSTANCE);
       long memoryForConsumingSegmentPerPartition = memoryManager.getTotalAllocatedBytes();
       mutableSegmentImpl.destroy();
       FileUtils.deleteQuietly(statsFileCopy);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org