You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/23 19:27:34 UTC

[incubator-pinot] 04/09: basic split of logics

This is an automated email from the ASF dual-hosted git repository.

jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 63e52fe517621accc74a99d5a24e6246c8fc8cb0
Author: james Shao <sj...@uber.com>
AuthorDate: Tue Mar 17 17:46:31 2020 -0700

    basic split of logics
---
 .../pinot/broker/broker/BrokerServerBuilder.java   |  12 +-
 .../broker/broker/helix/HelixBrokerStarter.java    |  35 +++-
 .../requesthandler/BaseBrokerRequestHandler.java   |  47 ++++-
 .../requesthandler/LowWaterMarkQueryWriter.java    | 151 ++++++++++++++
 .../SingleConnectionBrokerRequestHandler.java      |   8 +-
 .../broker/upsert/DefaultLowWaterMarkService.java  |  28 ++-
 .../broker/upsert/LowWaterMarkServiceProvider.java |  39 ++--
 .../LowWaterMarkQueryWriterTest.java               | 130 ++++++++++++
 .../apache/pinot/common/utils/CommonConstants.java |   2 +
 .../helix/core/PinotHelixResourceManager.java      |  12 ++
 ...rkManager.java => DefaultWaterMarkManager.java} |  15 +-
 ...ermarkManager.java => LowWaterMarkService.java} |  21 +-
 .../segment/updater}/SegmentDeletionHandler.java   |   3 +-
 .../segment/updater}/UpsertComponentContainer.java |  19 +-
 ...WatermarkManager.java => WaterMarkManager.java} |   3 +-
 pinot-grigio/pinot-grigio-provided/pom.xml         |  38 +++-
 .../upsert/PollingBasedLowWaterMarkService.java    | 224 +++++++++++++++++++++
 .../UpsertImmutableIndexSegmentCallback.java       |   8 +-
 .../upsert/UpsertMutableIndexSegmentCallback.java  |   6 +-
 .../pinot/core/segment/updater/SegmentUpdater.java |   4 +-
 .../updater/UpsertComponentContainerImpl.java      | 151 ++++++++++++++
 ...arkManager.java => UpsertWaterMarkManager.java} |  14 +-
 .../PollingBasedLowWaterMarkServiceTest.java       | 215 ++++++++++++++++++++
 .../UpsertImmutableIndexSegmentCallbackTest.java   |   6 +-
 .../tests/ClusterIntegrationTestUtils.java         |   2 +
 .../api/resources/LowWatermarksResource.java       |  62 ++++++
 .../server/api/resources/UpsertDebugResource.java  |  84 ++++++++
 .../org/apache/pinot/server/conf/ServerConf.java   |  15 +-
 .../pinot/server/starter/ServerInstance.java       |  39 +++-
 .../starter/helix/HelixInstanceDataManager.java    |   7 -
 .../server/starter/helix/HelixServerStarter.java   |  67 ++++--
 .../SegmentOnlineOfflineStateModelFactory.java     |  14 +-
 .../upsert/DefaultUpsertComponentContainer.java    |  36 +++-
 .../upsert/UpsertComponentContainerProvider.java   |  18 +-
 .../api/resources/LowWatermarksResourceTest.java   |  23 ++-
 .../realtime/provisioning/MemoryEstimator.java     |  17 +-
 36 files changed, 1424 insertions(+), 151 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index 103d469..c30dd51 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -19,8 +19,8 @@
 package org.apache.pinot.broker.broker;
 
 import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
 import com.yammer.metrics.core.MetricsRegistry;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -36,6 +36,8 @@ import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 
 public class BrokerServerBuilder {
   private static final Logger LOGGER = LoggerFactory.getLogger(BrokerServerBuilder.class);
@@ -57,9 +59,11 @@ public class BrokerServerBuilder {
   private final BrokerMetrics _brokerMetrics;
   private final BrokerRequestHandler _brokerRequestHandler;
   private final BrokerAdminApiApplication _brokerAdminApplication;
+  private final LowWaterMarkService _lwmService;
 
   public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
-      QueryQuotaManager queryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+      QueryQuotaManager queryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+                             LowWaterMarkService lowWaterMarkService) {
     _config = config;
     _delayedShutdownTimeMs =
         config.getLong(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, Broker.DEFAULT_DELAY_SHUTDOWN_TIME_MS);
@@ -77,8 +81,10 @@ public class BrokerServerBuilder {
     _brokerMetrics.initializeGlobalMeters();
     _brokerRequestHandler =
         new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
-            queryQuotaManager, _brokerMetrics, _propertyStore);
+            queryQuotaManager, _brokerMetrics, _propertyStore, lowWaterMarkService);
     _brokerAdminApplication = new BrokerAdminApiApplication(this);
+
+    _lwmService = lowWaterMarkService;
   }
 
   public void start() {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 6986182..0e127ad 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -20,11 +20,6 @@ package org.apache.pinot.broker.broker.helix;
 
 import com.google.common.collect.ImmutableList;
 import com.yammer.metrics.core.MetricsRegistry;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.ConfigAccessor;
@@ -49,6 +44,8 @@ import org.apache.pinot.broker.broker.BrokerServerBuilder;
 import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.apache.pinot.broker.upsert.LowWaterMarkServiceProvider;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -62,6 +59,12 @@ import org.apache.pinot.common.utils.ServiceStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 
 @SuppressWarnings("unused")
 public class HelixBrokerStarter {
@@ -93,6 +96,8 @@ public class HelixBrokerStarter {
   private HelixManager _participantHelixManager;
   private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
 
+  private LowWaterMarkService _lwmService;
+
   public HelixBrokerStarter(Configuration brokerConf, String clusterName, String zkServer)
       throws Exception {
     this(brokerConf, clusterName, zkServer, null);
@@ -168,6 +173,11 @@ public class HelixBrokerStarter {
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
     ConfigAccessor configAccessor = _spectatorHelixManager.getConfigAccessor();
 
+    // create and initialize the lwm service; it is started after the broker server builder starts
+    LowWaterMarkServiceProvider provider = new LowWaterMarkServiceProvider(_brokerConf,
+        _spectatorHelixManager.getHelixDataAccessor(), _clusterName);
+    _lwmService = provider.getInstance();
+
     // Set up the broker server builder
     LOGGER.info("Setting up broker server builder");
     _helixExternalViewBasedRouting =
@@ -184,13 +194,17 @@ public class HelixBrokerStarter {
     String enableQueryLimitOverride = configAccessor.get(helixConfigScope, Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE);
     _brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.valueOf(enableQueryLimitOverride));
     _brokerServerBuilder = new BrokerServerBuilder(_brokerConf, _helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixExternalViewBasedQueryQuotaManager, _propertyStore);
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixExternalViewBasedQueryQuotaManager,
+        _propertyStore, _lwmService);
     BrokerRequestHandler brokerRequestHandler = _brokerServerBuilder.getBrokerRequestHandler();
     BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
     _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
     _helixExternalViewBasedQueryQuotaManager.setBrokerMetrics(brokerMetrics);
     _brokerServerBuilder.start();
 
+    // start lwm service
+    _lwmService.start(brokerMetrics);
+
     // Initialize the cluster change mediator
     LOGGER.info("Initializing cluster change mediator");
     for (ClusterChangeHandler externalViewChangeHandler : _externalViewChangeHandlers) {
@@ -241,6 +255,7 @@ public class HelixBrokerStarter {
     _participantHelixManager
         .addPreConnectCallback(() -> brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
 
+
     // Register the service status handler
     registerServiceStatusHandler();
 
@@ -315,6 +330,10 @@ public class HelixBrokerStarter {
       _spectatorHelixManager.disconnect();
     }
 
+    if (_lwmService != null) {
+      LOGGER.info("Shutting down low water mark service");
+      _lwmService.shutDown();
+    }
     LOGGER.info("Finish shutting down Pinot broker");
   }
 
@@ -347,6 +366,10 @@ public class HelixBrokerStarter {
     return new HelixBrokerStarter(brokerConf, "quickstart", "localhost:2122");
   }
 
+  public LowWaterMarkService getLwmService() {
+    return _lwmService;
+  }
+
   public static void main(String[] args)
       throws Exception {
     getDefault().start();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 7667e61..1d8f97b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.ZNRecord;
@@ -76,6 +77,10 @@ import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Broker.CONFIG_OF_BROKER_LWM_REWRITE_ENABLE;
+import static org.apache.pinot.common.utils.CommonConstants.Broker.CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT;
+import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.DISABLE_REWRITE;
+
 
 @ThreadSafe
 public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
@@ -87,6 +92,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final AccessControlFactory _accessControlFactory;
   protected final QueryQuotaManager _queryQuotaManager;
   protected final BrokerMetrics _brokerMetrics;
+  protected final LowWaterMarkService _lwmService;
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
   protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
@@ -96,6 +102,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final long _brokerTimeoutMs;
   protected final int _queryResponseLimit;
   protected final int _queryLogLength;
+  protected final boolean _enableQueryRewrite;
 
   private final RateLimiter _queryLogRateLimiter;
 
@@ -108,13 +115,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      LowWaterMarkService lowWaterMarkService) {
     _config = config;
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
     _accessControlFactory = accessControlFactory;
     _queryQuotaManager = queryQuotaManager;
     _brokerMetrics = brokerMetrics;
+    _lwmService = lowWaterMarkService;
 
     _enableCaseInsensitivePql = _config.getBoolean(CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_PQL_KEY, false);
     if (_enableCaseInsensitivePql) {
@@ -125,6 +134,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     _enableQueryLimitOverride = _config.getBoolean(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false);
 
+    // query rewrite for upsert feature
+    _enableQueryRewrite = config.getBoolean(CONFIG_OF_BROKER_LWM_REWRITE_ENABLE,
+        CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT);
+
     _brokerId = config.getString(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
     _brokerTimeoutMs = config.getLong(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit =
@@ -285,6 +298,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
     }
 
+    if (shouldEnableLowWaterMarkRewrite(request)) {
+      // Augment the realtime request with LowWaterMark constraints.
+      addLowWaterMarkToQuery(realtimeBrokerRequest, rawTableName);
+    }
+
     // Calculate routing table for the query
     long routingStartTimeNs = System.nanoTime();
     Map<ServerInstance, List<String>> offlineRoutingTable = null;
@@ -368,6 +386,21 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     return brokerResponse;
   }
 
+  private boolean shouldEnableLowWaterMarkRewrite(JsonNode request) {
+    if (_enableQueryRewrite) {
+      try {
+        if (request.has(DISABLE_REWRITE)) {
+          return !request.get(DISABLE_REWRITE).asBoolean();
+        } else {
+          return true;
+        }
+      } catch (Exception ex) {
+        LOGGER.warn("cannot parse the disable rewrite option: [{}] to boolean from request json", DISABLE_REWRITE, ex);
+      }
+    }
+    return false;
+  }
+
   /**
    * Reset limit for selection query if it exceeds maxQuerySelectionLimit.
    * @param brokerRequest
@@ -753,6 +786,18 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
   }
 
+  private void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, String rawTableName) {
+    final String realtimeTableName = rawTableName + "_REALTIME";
+    Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName);
+    if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
+      LOGGER.info("No low water marks info found for table {}", realtimeTableName);
+      return;
+    }
+    LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName);
+    LowWaterMarkQueryWriter.addLowWaterMarkToQuery(realtimeBrokerRequest, lowWaterMarks);
+    LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, realtimeBrokerRequest);
+  }
+
   /**
    * Processes the optimized broker requests for both OFFLINE and REALTIME table.
    */
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
new file mode 100644
index 0000000..c3285e5
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.FilterQuery;
+import org.apache.pinot.common.request.FilterQueryMap;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Add a lwm query to the filter query of a Pinot query for upsert enabled table.
+// Thread-Safe
+public class LowWaterMarkQueryWriter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkQueryWriter.class);
+  private static final String VALID_FROM = "$validFrom";
+  private static final String VALID_UNTIL = "$validUntil";
+  // Normal Pinot query node uses positive IDs. So lwm query node ids are all negative.
+  private static final int QUERY_ID_BASE = -1000;
+
+  /**
+   * For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the
+   * form
+   *   ($validFrom <= lwm AND $validFrom > -1) AND (lwm < $validUntil OR $validUntil = -1)
+   *
+   * @param realtimeBrokerRequest
+   * @param lowWaterMarks
+   */
+  public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) {
+    if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
+      LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest);
+      return;
+    }
+
+    // Choose the min lwm among all partitions.
+    long minLwm = Collections.min(lowWaterMarks.values());
+
+    // 1. Build the low water mark filter for the table, where lwm is the min LWM across partitions and -1 is the
+    // uninitialized marker:
+    // ($validFrom <= lwm AND $validFrom > -1) AND (lwm < $validUntil OR $validUntil = -1)
+    // -1 is used instead of Long.MAX_VALUE because Pinot does not handle long arithmetic correctly.
+    FilterQuery lwmQuery = addSinglePartitionLowWaterMark(QUERY_ID_BASE - 1, realtimeBrokerRequest, minLwm);
+
+    // 2. Attach low water mark filter to the current filters.
+    FilterQuery currentFilterQuery = realtimeBrokerRequest.getFilterQuery();
+    if (currentFilterQuery != null) {
+      // Make an AND query of lwmQuery and the existing query.
+      FilterQuery andFilterQuery = new FilterQuery();
+      // Use an id that is not used by lwmQuery or any of its nested queries.
+      andFilterQuery.setId(QUERY_ID_BASE);
+      andFilterQuery.setOperator(FilterOperator.AND);
+      List<Integer> nestedFilterQueryIds = new ArrayList<>(2);
+      nestedFilterQueryIds.add(currentFilterQuery.getId());
+      nestedFilterQueryIds.add(lwmQuery.getId());
+      andFilterQuery.setNestedFilterQueryIds(nestedFilterQueryIds);
+
+      realtimeBrokerRequest.setFilterQuery(andFilterQuery);
+      FilterQueryMap filterSubQueryMap = realtimeBrokerRequest.getFilterSubQueryMap();
+      filterSubQueryMap.putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
+      filterSubQueryMap.putToFilterQueryMap(andFilterQuery.getId(), andFilterQuery);
+    } else {
+      realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
+      realtimeBrokerRequest.setFilterQuery(lwmQuery);
+    }
+  }
+
+  /**
+   *
+   * @param queryIdBase The starting id that will be assigned to the first query created in this method.
+   * @param realtimeBrokerRequest
+   * @param lwm low water mark.
+   * @return a filter query corresponding to the low water mark constraint of a single partition. The general form is:
+   *         ($validFrom <= lwm && $validFrom > -1)  AND (lwm < $validUntil OR $validUntil = -1)
+   */
+  private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest,
+      Long lwm) {
+    // ValidFromQuery: ($validFrom <= lwm && $validFrom > -1)
+    FilterQuery validFromFilterQuery = new FilterQuery();
+    // Important: Always decrement queryIdBase value after use to avoid id conflict.
+    validFromFilterQuery.setId(queryIdBase--);
+    validFromFilterQuery.setOperator(FilterOperator.AND);
+    FilterQuery validFromP1 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(*\t\t" + lwm + "]", FilterOperator.RANGE, realtimeBrokerRequest);
+    FilterQuery validFromP2 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(-1\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest);
+    List<Integer> nestedQueriesIdForValidFrom = new ArrayList<>();
+    nestedQueriesIdForValidFrom.add(validFromP1.getId());
+    nestedQueriesIdForValidFrom.add(validFromP2.getId());
+    validFromFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidFrom);
+
+    // ValidUtilQuery: (lwm < $validUntil OR $validUntil = -1)
+    FilterQuery validUtilFilterQuery = new FilterQuery();
+    validUtilFilterQuery.setId(queryIdBase--);
+    validUtilFilterQuery.setOperator(FilterOperator.OR);
+
+    FilterQuery validUtilP1 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "(" + lwm + "\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest);
+    FilterQuery validUtilP2 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "-1", FilterOperator.EQUALITY, realtimeBrokerRequest);
+    List<Integer> nestedQueriesIdForValidUtil = new ArrayList<>();
+    nestedQueriesIdForValidUtil.add(validUtilP1.getId());
+    nestedQueriesIdForValidUtil.add(validUtilP2.getId());
+    validUtilFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidUtil);
+
+    // Top level query: ValidFromQuery AND ValidUtilQuery
+    FilterQuery lwmQuery = new FilterQuery();
+    lwmQuery.setId(queryIdBase--);
+    lwmQuery.setOperator(FilterOperator.AND);
+    List<Integer> nestQids = new ArrayList<>();
+    nestQids.add(validFromFilterQuery.getId());
+    nestQids.add(validUtilFilterQuery.getId());
+    lwmQuery.setNestedFilterQueryIds(nestQids);
+
+    // Add all the new created queries to the query map.
+    realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
+    realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validFromFilterQuery.getId(), validFromFilterQuery);
+    realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validUtilFilterQuery.getId(), validUtilFilterQuery);
+    return lwmQuery;
+  }
+
+  private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator,
+      BrokerRequest realtimeBrokerRequest) {
+    FilterQuery filterQuery = new FilterQuery();
+    filterQuery.setColumn(column);
+    filterQuery.setId(id);
+    filterQuery.setValue(Collections.singletonList(value));
+    filterQuery.setOperator(operator);
+    if (realtimeBrokerRequest.getFilterSubQueryMap() == null) {
+      realtimeBrokerRequest.setFilterSubQueryMap(new FilterQueryMap());
+    }
+    realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(id, filterQuery);
+    return filterQuery;
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 844a200..95f943f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -57,8 +59,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
 
   public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics, propertyStore);
+      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore,
+      LowWaterMarkService lowWaterMarkService) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics,
+            propertyStore, lowWaterMarkService);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics);
   }
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
similarity index 57%
copy from pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
index 63424ac..94b3be4 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
@@ -16,34 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.upsert;
+package org.apache.pinot.broker.upsert;
 
-import com.codahale.metrics.MetricRegistry;
-import org.apache.commons.configuration.Configuration;
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
 
-public class DefaultUpsertComponentContainer implements UpsertComponentContainer {
+import java.util.Map;
 
-  private final SegmentDeletionHandler deletionHandler = new SegmentDeletionHandler();
+public class DefaultLowWaterMarkService implements LowWaterMarkService {
 
   @Override
-  public void registerMetrics(MetricRegistry registry) {
-
-  }
-
-  @Override
-  public void init(Configuration config) {
+  public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval,
+      int serverPort){
   }
 
   @Override
-  public SegmentDeletionHandler getSegmentDeletionHandler() {
-    return deletionHandler;
+  public Map<Integer, Long> getLowWaterMarks(String tableName) {
+    return ImmutableMap.of();
   }
 
   @Override
-  public void start() {
+  public void shutDown() {
   }
 
   @Override
-  public void stop() {
+  public void start(BrokerMetrics brokerMetrics) {
   }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
similarity index 50%
copy from pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
index fe4af07..f5c06f3 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
@@ -16,43 +16,48 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.upsert;
+package org.apache.pinot.broker.upsert;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.pinot.core.segment.updater.WatermarkManager;
-import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class UpsertComponentContainerProvider {
+import static org.apache.pinot.common.utils.CommonConstants.Broker.*;
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerProvider.class);
 
-  public static final String UPSERT_COMPONENT_CONFIG_KEY = "watermarkManager.class";
-  public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName();
+public class LowWaterMarkServiceProvider {
 
-  private final Configuration _conf;
-  private UpsertComponentContainer _instance;
+  private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkServiceProvider.class);
 
-  public UpsertComponentContainerProvider(Configuration conf, GrigioMetrics metrics) {
-    _conf = conf;
-    String className = _conf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT);
+  private LowWaterMarkService _instance;
+
+  public LowWaterMarkServiceProvider(Configuration brokerConfig, HelixDataAccessor dataAccessor, String clusterName) {
+    String className = brokerConfig.getString(CommonConstants.Broker.CONFIG_OF_BROKER_LWMS_CLASS_NAME,
+        DefaultLowWaterMarkService.class.getName());
     LOGGER.info("creating watermark manager with class {}", className);
     try {
-      Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className);
-      Preconditions.checkState(comonentContainerClass.isAssignableFrom(WatermarkManager.class),
-          "configured class not assignable from Callback class");
+      Class<LowWaterMarkService> comonentContainerClass = (Class<LowWaterMarkService>) Class.forName(className);
+      Preconditions.checkState(LowWaterMarkService.class.isAssignableFrom(comonentContainerClass),
+          "configured class does not implement LowWaterMarkService");
       _instance = comonentContainerClass.newInstance();
-      _instance.init(_conf);
+      _instance.init(dataAccessor, clusterName,
+          brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS,
+              DEFAULT_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS),
+          brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT,
+              CommonConstants.Server.DEFAULT_ADMIN_API_PORT));
     } catch (Exception e) {
       LOGGER.error("failed to load watermark manager class", className, e);
+      _instance = null;
       ExceptionUtils.rethrow(e);
     }
   }
 
-  public UpsertComponentContainer getInstance() {
+  public LowWaterMarkService getInstance() {
     return _instance;
   }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java
new file mode 100644
index 0000000..1021477
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.FilterQuery;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
+import org.apache.thrift.TException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class LowWaterMarkQueryWriterTest {
+    @Test
+    public void testRewriteQueryWithoutExistingFilters() throws Exception{
+        Pql2Compiler pql2Compiler = new Pql2Compiler();
+        BrokerRequest req = pql2Compiler.compileToBrokerRequest("SELECT * FROM T");
+        Assert.assertFalse(req.isSetFilterQuery());
+        Map<Integer, Long> lwms = new HashMap<>();
+        lwms.put(0, 10L);
+        lwms.put(1, 20L);
+        LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms);
+        Assert.assertTrue(req.isSetFilterQuery());
+        try {
+            req.validate();
+        } catch (TException e)   {
+            Assert.fail("Query after low water mark query is not valid: ", e);
+        }
+        // Verify there are in total 7 filter query nodes in the filter query tree.
+        Map<Integer,FilterQuery> filterSubQueryMap = req.getFilterSubQueryMap().getFilterQueryMap();
+        Assert.assertEquals(filterSubQueryMap.size(), 7);
+
+        Integer lwmQueryId = req.getFilterQuery().getId();
+        // 1. Verify the low water mark query.
+        FilterQuery lwmQuery = filterSubQueryMap.get(lwmQueryId);
+        verifyNoneTerminalFilterQuery(lwmQuery, FilterOperator.AND, 2);
+        FilterQuery validFrom1Query = filterSubQueryMap.get(lwmQuery.getNestedFilterQueryIds().get(0));
+        FilterQuery validTo1Query = filterSubQueryMap.get(lwmQuery.getNestedFilterQueryIds().get(1));
+
+        // Verify the subtree (i.e., an AND with two nodes) for the $validFrom column.
+        verifyNoneTerminalFilterQuery(validFrom1Query, FilterOperator.AND, 2);
+        verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(0)),
+            "$validFrom", "(*\t\t10]", FilterOperator.RANGE);
+        verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(1)),
+            "$validFrom", "(-1\t\t*)", FilterOperator.RANGE);
+
+        // Verify the subtree (i.e., an OR with two nodes) for the $validUntil column.
+        verifyNoneTerminalFilterQuery(validTo1Query, FilterOperator.OR, 2);
+        verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(0)),
+            "$validUntil", "(10\t\t*)", FilterOperator.RANGE);
+        verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(1)),
+            "$validUntil", "-1", FilterOperator.EQUALITY);
+    }
+
+    @Test
+    public void testRewriteQueryWithExistingFilters() {
+        Pql2Compiler pql2Compiler = new Pql2Compiler();
+        BrokerRequest req = pql2Compiler.compileToBrokerRequest("SELECT * FROM T WHERE A < 4");
+        Assert.assertTrue(req.isSetFilterQuery());
+        Map<Integer, Long> lwms = new HashMap<>();
+        lwms.put(0, 10L);
+        lwms.put(1, 20L);
+        LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms);
+        Assert.assertTrue(req.isSetFilterQuery());
+        try {
+            req.validate();
+        } catch (TException e) {
+            Assert.fail("Query after low water mark query is not valid: ", e);
+        }
+        // Verify there are in total 9 filter query nodes in the filter query tree.
+        Map<Integer,FilterQuery> filterSubQueryMap = req.getFilterSubQueryMap().getFilterQueryMap();
+        Assert.assertEquals(filterSubQueryMap.size(), 9);
+        // 0. Verify there is one top-level filter of operator AND with two sub filter queries.
+        FilterQuery rootFilterQuery = req.getFilterQuery();
+        verifyNoneTerminalFilterQuery(rootFilterQuery, FilterOperator.AND, 2);
+        // 1. Verify the existing filter query A < 4 is not affected.
+        verifyTerminalFilterQuery(filterSubQueryMap.get(rootFilterQuery.getNestedFilterQueryIds().get(0)), "A", "(*\t\t4)", FilterOperator.RANGE);
+
+        FilterQuery lowWaterMarkQuery = filterSubQueryMap.get(rootFilterQuery.getNestedFilterQueryIds().get(1));
+        // Verify the lwm query
+        verifyNoneTerminalFilterQuery(lowWaterMarkQuery, FilterOperator.AND, 2);
+        FilterQuery validFrom1Query = filterSubQueryMap.get(lowWaterMarkQuery.getNestedFilterQueryIds().get(0));
+        FilterQuery validTo1Query = filterSubQueryMap.get(lowWaterMarkQuery.getNestedFilterQueryIds().get(1));
+
+        // Verify the subtree (i.e., an AND with two nodes) for the $validFrom column.
+        verifyNoneTerminalFilterQuery(validFrom1Query, FilterOperator.AND, 2);
+        verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(0)),
+            "$validFrom", "(*\t\t10]", FilterOperator.RANGE);
+        verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(1)),
+            "$validFrom", "(-1\t\t*)", FilterOperator.RANGE);
+
+        // Verify the subtree (i.e., an OR with two nodes) for the $validUntil column.
+        verifyNoneTerminalFilterQuery(validTo1Query, FilterOperator.OR, 2);
+        verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(0)),
+            "$validUntil", "(10\t\t*)", FilterOperator.RANGE);
+        verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(1)),
+            "$validUntil", "-1", FilterOperator.EQUALITY);
+    }
+
+    private void verifyTerminalFilterQuery(FilterQuery filterQuery, String column, String value, FilterOperator op) {
+        Assert.assertEquals(filterQuery.getColumn(), column);
+        Assert.assertEquals(filterQuery.getValue(), Collections.singletonList(value));
+        Assert.assertEquals(filterQuery.getOperator(), op);
+    }
+
+    private void verifyNoneTerminalFilterQuery(FilterQuery filterQuery, FilterOperator op, int numOfChildQueries) {
+        Assert.assertEquals(filterQuery.getOperator(), op);
+        Assert.assertEquals(filterQuery.getNestedFilterQueryIdsSize(), numOfChildQueries);
+    }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index a6f242c..0e65779 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -175,7 +175,9 @@ public class CommonConstants {
     public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override";
 
     public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS = "pinot.broker.query.polling.server.lwms.interval.ms";
+    public static final int DEFAULT_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS = 5 * 1_000;
     public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT = "pinot.broker.query.polling.server.lwms.port";
+    public static final String CONFIG_OF_BROKER_LWMS_CLASS_NAME = "pinot.broker.lwms.classname";
     public static final String CONFIG_OF_BROKER_LWM_REWRITE_ENABLE = "pinot.broker.query.lwm.rewrite";
     public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = true;
     public static class Request {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a494e3f..9cce40d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -112,6 +112,18 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class PinotHelixResourceManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
similarity index 71%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
index 01579e7..b45060c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
@@ -18,16 +18,23 @@
  */
 package org.apache.pinot.core.segment.updater;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
 import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
 
 import java.util.Map;
 
-public interface WatermarkManager {
+public class DefaultWaterMarkManager implements WaterMarkManager {
 
-  void init(Configuration config, GrigioMetrics metrics);
+  private static final Map<String, Map<Integer, Long>> DEFAULT_MAP = ImmutableMap.of();
 
-  Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
+  @Override
+  public void init(Configuration config, GrigioMetrics metrics) {
 
+  }
+
+  @Override
+  public Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap() {
+    return DEFAULT_MAP;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
similarity index 56%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
index 01579e7..2fd7434 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
@@ -18,16 +18,25 @@
  */
 package org.apache.pinot.core.segment.updater;
 
-import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
-import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 
 import java.util.Map;
 
-public interface WatermarkManager {
+/**
+ * LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of
+ * an input table.
+ */
+public interface LowWaterMarkService {
+
+    void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort);
 
-  void init(Configuration config, GrigioMetrics metrics);
+    // Return the low water mark mapping from partition id to the corresponding low water mark of a given table.
+    Map<Integer, Long> getLowWaterMarks(String tableName);
 
-  Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
+    // Shut down the service.
+    void shutDown();
 
+    // Start the service with the given broker metrics.
+    void start(BrokerMetrics brokerMetrics);
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java
similarity index 94%
rename from pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java
index 1aba06b..1db6f84 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java
@@ -16,10 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.upsert;
+package org.apache.pinot.core.segment.updater;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.pinot.core.segment.updater.SegmentDeletionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
similarity index 67%
rename from pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
index 7f636fc..eb64285 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
@@ -16,20 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.upsert;
+package org.apache.pinot.core.segment.updater;
 
-import com.codahale.metrics.MetricRegistry;
+import com.yammer.metrics.core.MetricsRegistry;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
 
 public interface UpsertComponentContainer {
 
-  void registerMetrics(MetricRegistry registry);
+  void registerMetrics(String prefix, MetricsRegistry registry);
 
-  void init(Configuration config);
+  void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName);
+
+  void startBackgroundThread();
+
+  void stopBackgroundThread();
+
+  void shutdown();
 
   SegmentDeletionHandler getSegmentDeletionHandler();
 
-  void start();
+  WaterMarkManager getWatermarkManager();
 
-  void stop();
+  boolean isUpsertEnabled();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
similarity index 96%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
index 01579e7..acc8479 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
@@ -24,10 +24,11 @@ import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
 
 import java.util.Map;
 
-public interface WatermarkManager {
+public interface WaterMarkManager {
 
   void init(Configuration config, GrigioMetrics metrics);
 
   Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
 
+
 }
diff --git a/pinot-grigio/pinot-grigio-provided/pom.xml b/pinot-grigio/pinot-grigio-provided/pom.xml
index 5643070..ec7b4ca 100644
--- a/pinot-grigio/pinot-grigio-provided/pom.xml
+++ b/pinot-grigio/pinot-grigio-provided/pom.xml
@@ -52,8 +52,40 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
-    </dependencies>
-
-
 
+        <dependency>
+            <groupId>com.github.tomakehurst</groupId>
+            <artifactId>wiremock-jre8</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.ow2.asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-broker</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-controller</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-controller</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
new file mode 100644
index 0000000..f7e88c9
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.upsert;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.glassfish.jersey.client.ClientProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+// A low water mark service which polls various Pinot servers periodically to get the low water marks for partitions of
+// servers.
+public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PollingBasedLowWaterMarkService.class);
+
+  private static final String LWMS_PATH = "lwms";
+  private static final String HTTP = "http";
+
+  private static final int SERVER_CONNENCT_TIMEOUT_MS = 10000;
+  private static final int SERVER_READ_TIMEOUT = 10000;
+  private static final String SERVER_PREFIX = "Server_";
+
+  // A map from table_name to its partition->lwm mapping.
+  private Map<String, Map<Integer, Long>> _tableLowWaterMarks;
+  private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
+  private Client _httpClient;
+  // We can tune this polling interval to make sure we get the fresh snapshot of server low water marks.
+  private int _serverPollingInterval;
+  private int _serverPort;
+  private boolean _shuttingDown;
+  private BrokerMetrics _brokerMetrics;
+
+  @Override
+  public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) {
+    // Construct the zk path to get the server instances.
+    String instanceConfigs = PropertyPathBuilder.instanceConfig(helixClusterName);
+    // Build a zk data reader.
+    _cacheInstanceConfigsDataAccessor =
+        new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) helixDataAccessor.getBaseDataAccessor(),
+            instanceConfigs, null, Collections.singletonList(instanceConfigs));
+    _tableLowWaterMarks = new ConcurrentHashMap<>();
+    _httpClient = ClientBuilder.newClient();
+    _httpClient.property(ClientProperties.CONNECT_TIMEOUT, SERVER_CONNENCT_TIMEOUT_MS);
+    _httpClient.property(ClientProperties.READ_TIMEOUT, SERVER_READ_TIMEOUT);
+    _serverPollingInterval = serverPollingInterval;
+    _serverPort = serverPort;
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        try {
+          shutDown();
+        } catch (final Exception e) {
+          LOGGER.error("Caught exception while running shutdown hook", e);
+        }
+      }
+    });
+  }
+
+  @Override
+  public void start(BrokerMetrics brokerMetrics) {
+    _brokerMetrics = brokerMetrics;
+    Thread serverPollingThread = new Thread(new PinotServerPollingExecutor());
+    serverPollingThread.start();
+  }
+
+  @Override
+  public Map<Integer, Long> getLowWaterMarks(String tableName) {
+    return _tableLowWaterMarks.get(tableName);
+  }
+
+  @Override
+  public void shutDown() {
+    _shuttingDown = true;
+  }
+
+  // Poll all the servers periodically to find out the Low Water Mark info.
+  private class PinotServerPollingExecutor implements Runnable {
+    @Override
+    public void run() {
+      while (!_shuttingDown) {
+        try {
+          Map<String, Map<Integer, Long>> latestLowWaterMarks = new ConcurrentHashMap<>();
+          // 1. List every instance that has an instance config in the cluster (liveness is not checked here).
+          List<String> serverInstances = _cacheInstanceConfigsDataAccessor.getChildNames("/", AccessOption.PERSISTENT);
+          List<ZNRecord> instances = _cacheInstanceConfigsDataAccessor.getChildren("/", null, AccessOption.PERSISTENT);
+          for (ZNRecord r : instances) {
+            LOGGER.info("Instance info for lwms: {}", r.toString());
+          }
+          // 2. Ask each server for its low water mark info.
+          for (String serverIntanceId : serverInstances) {
+            // Check the instance is in fact a server.
+            if (!serverIntanceId.startsWith(SERVER_PREFIX) && !serverIntanceId.startsWith("server_"))
+              continue;
+            InstanceConfig serverConfig = InstanceConfig.toInstanceConfig(serverIntanceId.substring(
+                SERVER_PREFIX.length()));
+            try {
+              // TODO: Fix this. Every server is assumed to listen on the single configured _serverPort for now.
+              WebTarget webTarget = _httpClient.target(getURI(serverConfig.getHostName(), _serverPort));
+              TableLowWaterMarksInfo lwms = webTarget.path(PollingBasedLowWaterMarkService.LWMS_PATH).request().
+                  get(TableLowWaterMarksInfo.class);
+              LOGGER.info("Found low water mark info for server {}: {}", serverIntanceId, lwms.getTableLowWaterMarks());
+              // 3. Update the low water marks.
+              LwmMerger.updateLowWaterMarks(latestLowWaterMarks, lwms.getTableLowWaterMarks());
+            } catch (Exception e) {
+              // TODO(tingchen) Handle server failures. We could keep the last known lwms of a server.
+              LOGGER.warn("Error during getting low water marks from server {}", serverIntanceId, e);
+            }
+          }
+          // 4. Replace the broker's low water marks table with the latest low water mark info.
+          if (validate(latestLowWaterMarks)) {
+            _tableLowWaterMarks = latestLowWaterMarks;
+          }
+          // 5. Sleep for some interval.
+          Thread.sleep(_serverPollingInterval);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          // It is OK for us to break out of the loop early because the Low Water Mark refresh is best effort.
+          break;
+        }
+      }
+    }
+
+    // Validate the low water mark info polled from all the servers. For now only a null map is rejected; each
+    // table's minimum low water mark is also added to its gauge. (TODO tingchen) figure out the right checks.
+    private boolean validate(Map<String, Map<Integer, Long>> latestLowWaterMarks) {
+      if (latestLowWaterMarks == null) {
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.LOW_WATER_MARK_QUERY_FAILURES, 1);
+        return false;
+      }
+      for(String tableName : latestLowWaterMarks.keySet()) {
+        Map<Integer, Long> partitionLWMs = latestLowWaterMarks.get(tableName);
+        _brokerMetrics.addValueToTableGauge(tableName, BrokerGauge.TABLE_MIN_LOW_WATER_MARK,
+            Collections.min(partitionLWMs.values()));
+      }
+      return true;
+    }
+
+    private URI getURI(String host, int port) throws URISyntaxException {
+      LOGGER.info("requesting host {} and port {}", host, port);
+      return new URI(PollingBasedLowWaterMarkService.HTTP, null, host, port, null
+          , null, null);
+    }
+  }
+
+  static class LwmMerger {
+    // Update an existing map currentLwmsMap of tableName->low_water_marks with a new map of the same type.
+    // If an entry in the new map does not exist in currentLwmsMap, insert it into currentLwmsMap.
+    // Otherwise, merge the entry with the existing entry in currentLwmsMap using mergeTablePartitionLwms().
+    static void updateLowWaterMarks(Map<String, Map<Integer, Long>> currentLwmsMap,
+                                    final Map<String, Map<Integer, Long>> serverLwmsMap) {
+      for (Map.Entry<String, Map<Integer, Long>> serverMap : serverLwmsMap.entrySet()) {
+        String tableName = serverMap.getKey();
+        Map<Integer, Long> tableLwms = serverMap.getValue();
+        if (currentLwmsMap.containsKey(tableName)) {
+          currentLwmsMap.put(tableName,
+              LwmMerger.mergeTablePartitionLwms(Collections.unmodifiableMap(currentLwmsMap.get(tableName)),
+                  tableLwms));
+        } else {
+          currentLwmsMap.put(tableName, tableLwms);
+        }
+      }
+    }
+
+    // Merge all the entries in the two input maps of partition_id->lwm.
+    // If an entry exists in only one of the maps, put it in the combined map.
+    // If an entry exists in both maps, use the entry with the smaller low water mark.
+    static Map<Integer, Long> mergeTablePartitionLwms(final Map<Integer, Long> m1, final Map<Integer, Long> m2) {
+      if (m1 == null || m1.size() == 0) {
+        return m2;
+      }
+      if (m2 == null || m2.size() == 0) {
+        return m1;
+      }
+      Map<Integer, Long> result = new HashMap<>(m1);
+      for (Map.Entry<Integer, Long> entry : m2.entrySet()) {
+        Integer partitionNo = entry.getKey();
+        Long lwm = entry.getValue();
+        if (result.containsKey(partitionNo)) {
+          result.put(partitionNo, Math.min(lwm, result.get(partitionNo)));
+        } else {
+          result.put(partitionNo, lwm);
+        }
+      }
+      return result;
+    }
+  }
+}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
index 1ddc963..b7fcdb8 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
@@ -29,7 +29,7 @@ import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
 import org.apache.pinot.core.io.reader.DataFileReader;
 import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
-import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
@@ -50,7 +50,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
   private String _segmentName;
   private int _totalDoc;
   private long _minSourceOffset;
-  private UpsertWatermarkManager _upsertWatermarkManager;
+  private UpsertWaterMarkManager _upsertWatermarkManager;
   private UpdateLogStorageProvider _updateLogStorageProvider;
   // use array for mapping bewteen offset to docId, where actual offset = min_offset + array_index
   // use 4 bytes per record
@@ -67,7 +67,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
         CommonConstants.Helix.TableType.REALTIME);
     _segmentName = segmentMetadata.getName();
     _totalDoc = segmentMetadata.getTotalDocs();
-    _upsertWatermarkManager = UpsertWatermarkManager.getInstance();
+    _upsertWatermarkManager = UpsertWaterMarkManager.getInstance();
     _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
     _virtualColumnsReaderWriter = new ArrayList<>();
     for (DataFileReader reader: virtualColumnIndexReader.values()) {
@@ -80,7 +80,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
    */
   @VisibleForTesting
   protected void init(List<VirtualColumnLongValueReaderWriter> readerWriters,
-      int totalDoc, UpsertWatermarkManager manager,
+      int totalDoc, UpsertWaterMarkManager manager,
       UpdateLogStorageProvider updateLogStorageProvider,
       long minSourceOffset, int[] offsetToDocId) {
     _tableNameWithType = "testTable";
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
index eeda5a6..6eae5fb 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
@@ -24,7 +24,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.core.io.reader.DataFileReader;
 import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
-import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.grigio.common.messages.LogEventType;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
@@ -48,7 +48,7 @@ public class UpsertMutableIndexSegmentCallback implements IndexSegmentCallback {
   private String _tableName;
   private String _segmentName;
   private Schema _schema;
-  private UpsertWatermarkManager _upsertWatermarkManager;
+  private UpsertWaterMarkManager _upsertWatermarkManager;
   private String _offsetColumnName;
   private final List<VirtualColumnLongValueReaderWriter> _mutableSegmentReaderWriters = new ArrayList<>();
   // use map for mapping between kafka offset and docId because we at-most have 1 mutable segment per consumer
@@ -75,7 +75,7 @@ public class UpsertMutableIndexSegmentCallback implements IndexSegmentCallback {
         _mutableSegmentReaderWriters.add((VirtualColumnLongValueReaderWriter) reader);
       }
     }
-    _upsertWatermarkManager = UpsertWatermarkManager.getInstance();
+    _upsertWatermarkManager = UpsertWaterMarkManager.getInstance();
     LOGGER.info("starting upsert segment with {} reader writer", _mutableSegmentReaderWriters.size());
   }
 
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
index 530cd0d..5f0aa43 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
@@ -91,7 +91,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
     _topicPrefix = conf.getString(SegmentUpdaterConfig.INPUT_TOPIC_PREFIX);
     _updateSleepMs = conf.getInt(SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS,
         SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS_DEFAULT);
-    UpsertWatermarkManager.init(metrics);
+    UpsertWaterMarkManager.init(metrics);
     _consumer = provider.getConsumer();
     _ingestionExecutorService = Executors.newFixedThreadPool(1);
     _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
@@ -179,7 +179,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
           if (System.currentTimeMillis() - lastReportedTime > LOGGER_TIME_GAP_MS) {
             lastReportedTime = System.currentTimeMillis();
             LOGGER.info("processed {} messages in {} ms", eventCount, System.currentTimeMillis() - loopStartTime);
-            LOGGER.info("latest high water mark is {}", UpsertWatermarkManager.getInstance().toString());
+            LOGGER.info("latest high water mark is {}", UpsertWaterMarkManager.getInstance().toString());
           }
           _consumer.ackOffset();
           _metrics.addTimedValueMs(GrigioTimer.SEGMENT_UPDATER_LOOP_TIME, System.currentTimeMillis() - loopStartTime);
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java
new file mode 100644
index 0000000..20758f0
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.updater;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManager;
+import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManagerImpl;
+import org.apache.pinot.grigio.common.utils.IdealStateHelper;
+import org.apache.pinot.grigio.servers.GrigioServerMetrics;
+import org.apache.pinot.grigio.servers.KeyCoordinatorProvider;
+import org.apache.pinot.grigio.servers.SegmentUpdaterProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.pinot.common.utils.CommonConstants.Grigio.PINOT_UPSERT_SERVER_COMPONENT_PREFIX;
+
+public class UpsertComponentContainerImpl implements UpsertComponentContainer { // builds and owns the server-side upsert components
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerImpl.class);
+
+  // config keys, all looked up in the Configuration passed to init()
+  public static final String ENABLED_CONFIG_KEY = "enabled"; // boolean switch; treated as false when absent
+  public static final String STORAGE_CONFIG_KEY = "storage"; // subset handed to UpdateLogStorageProvider.init
+  public static final String HOST_NAME_CONFIG_KEY = "hostname"; // value handed to both provider builders
+  public static final String KC_CONFIG_KEY = "kc"; // subset handed to KeyCoordinatorProvider
+  public static final String UPDATER_CONFIG_KEY = "updater"; // subset handed to SegmentUpdaterProvider and SegmentUpdater
+
+  private volatile boolean _isUpsertEnabled = false; // assigned in init() from ENABLED_CONFIG_KEY
+  private AtomicBoolean _inited = new AtomicBoolean(false); // flipped to true at the START of init(), before the fields below are assigned
+
+  // members of related upsert components; assigned in init() (metrics in registerMetrics()), most of them only when upsert is enabled
+  private Configuration _conf;
+  private String _hostName;
+  private GrigioMetrics _grigioMetrics;
+  private KeyCoordinatorProvider _keyCoordinatorProvider;
+  private SegmentUpdaterProvider _segmentUpdaterProvider;
+  private SegmentUpdater _segmentUpdater;
+  private UpdateLogRetentionManager _retentionManager;
+  private SegmentDeletionHandler _segmentDeletionHandler; // assigned in both branches of init()
+  private WaterMarkManager _waterMarkManager; // assigned in both branches of init()
+
+  @Override
+  public void registerMetrics(String prefix, MetricsRegistry registry) { // creates the metrics object that init() later hands to the components it builds
+    _grigioMetrics = new GrigioServerMetrics(prefix + PINOT_UPSERT_SERVER_COMPONENT_PREFIX, registry);
+    _grigioMetrics.initializeGlobalMeters();
+  }
+
+  @Override
+  public void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName) {
+    Preconditions.checkState(!_inited.getAndSet(true), "cannot initialize upsert component twice"); // a second call fails here
+    _isUpsertEnabled = config.getBoolean(ENABLED_CONFIG_KEY, false);
+    if (_isUpsertEnabled) {
+      LOGGER.info("initializing upsert components");
+      _conf = config;
+      _hostName = _conf.getString(HOST_NAME_CONFIG_KEY); // NOTE(review): no default supplied; presumably null when the key is missing - confirm
+      initVirtualColumnStorageProvider(config);
+      _keyCoordinatorProvider = buildKeyCoordinatorProvider(config, _hostName);
+      _segmentUpdaterProvider = buildSegmentUpdaterProvider(config, _hostName);
+      _retentionManager = new UpdateLogRetentionManagerImpl(
+          new IdealStateHelper(helixManager.getClusterManagmentTool(), clusterName), instanceName
+      );
+      _segmentUpdater = buildSegmentUpdater(config, _segmentUpdaterProvider, _retentionManager);
+      UpsertWaterMarkManager.init(_grigioMetrics); // NOTE(review): the SegmentUpdater hunk in this patch also calls UpsertWaterMarkManager.init, which rejects a second call - confirm this line cannot throw
+      _waterMarkManager = UpsertWaterMarkManager.getInstance();
+      _segmentDeletionHandler = new SegmentDeletionHandler(ImmutableList.of(_segmentUpdater));
+    } else {
+      _waterMarkManager = new DefaultWaterMarkManager(); // upsert disabled: fall back to the default manager and a handler built without listeners
+      _segmentDeletionHandler = new SegmentDeletionHandler();
+    }
+    _inited.set(true); // no effect: the flag was already set to true by getAndSet at the top of this method
+  }
+
+  @Override
+  public SegmentDeletionHandler getSegmentDeletionHandler() {
+    Preconditions.checkState(_inited.get(), "upsert container is not initialized yet"); // NOTE(review): _inited is already true while init() is still running, so this does not prove the handler is assigned
+    return _segmentDeletionHandler;
+  }
+
+  @Override
+  public WaterMarkManager getWatermarkManager() { // no init guard here: returns null until init() has assigned the field
+    return _waterMarkManager;
+  }
+
+  @Override
+  public synchronized void startBackgroundThread() { // does nothing when no segment updater was built (upsert disabled or init() not run)
+    if (_segmentUpdater != null) {
+      _segmentUpdater.start();
+    }
+  }
+
+  @Override
+  public synchronized void stopBackgroundThread() { // does nothing when no segment updater was built
+    if (_segmentUpdater != null) {
+      LOGGER.info("closing segment updater");
+      _segmentUpdater.shutdown();
+    }
+  }
+
+  @Override
+  public boolean isUpsertEnabled() { // false until init() has read ENABLED_CONFIG_KEY
+    return _isUpsertEnabled;
+  }
+
+  public synchronized void shutdown() { // only closes the key coordinator provider; the segment updater is stopped via stopBackgroundThread()
+    if (_keyCoordinatorProvider != null) {
+      LOGGER.info("shutting down key coordinator provider");
+      _keyCoordinatorProvider.close();
+    }
+  }
+
+  private void initVirtualColumnStorageProvider(Configuration conf) { // initializes the static UpdateLogStorageProvider from the "storage" sub-config
+    UpdateLogStorageProvider.init(conf.subset(STORAGE_CONFIG_KEY));
+  }
+
+  public KeyCoordinatorProvider buildKeyCoordinatorProvider(Configuration conf, String hostname) { // uses the "kc" sub-config and the current _grigioMetrics
+    return new KeyCoordinatorProvider(conf.subset(KC_CONFIG_KEY), hostname, _grigioMetrics);
+  }
+
+  public SegmentUpdaterProvider buildSegmentUpdaterProvider(Configuration conf, String hostname) { // uses the "updater" sub-config and the current _grigioMetrics
+    return new SegmentUpdaterProvider(conf.subset(UPDATER_CONFIG_KEY), hostname, _grigioMetrics);
+  }
+
+  public SegmentUpdater buildSegmentUpdater(Configuration conf, SegmentUpdaterProvider updateProvider, // reads the same "updater" sub-config as the provider
+                                            UpdateLogRetentionManager updateLogRetentionManager) {
+    return new SegmentUpdater(conf.subset(UPDATER_CONFIG_KEY), updateProvider, updateLogRetentionManager,
+            _grigioMetrics);
+  }
+}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
similarity index 90%
rename from pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
index faad27b..266f25d2 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
@@ -31,26 +31,26 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class UpsertWatermarkManager implements WatermarkManager {
+public class UpsertWaterMarkManager implements WaterMarkManager {
 
   private final Map<String, Map<Integer, Long>> _highWaterMarkTablePartitionMap = new ConcurrentHashMap<>();
   private final GrigioMetrics _metrics;
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWatermarkManager.class);
-  private static volatile UpsertWatermarkManager _instance;
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWaterMarkManager.class);
+  private static volatile UpsertWaterMarkManager _instance;
 
-  private UpsertWatermarkManager(GrigioMetrics metrics) {
+  private UpsertWaterMarkManager(GrigioMetrics metrics) {
     _metrics = metrics;
   }
 
   public static void init(GrigioMetrics metrics) {
-    synchronized (UpsertWatermarkManager.class) {
+    synchronized (UpsertWaterMarkManager.class) {
       Preconditions.checkState(_instance == null, "upsert water mark manager is already init");
-      _instance = new UpsertWatermarkManager(metrics);
+      _instance = new UpsertWaterMarkManager(metrics);
     }
   }
 
-  public static UpsertWatermarkManager getInstance() {
+  public static UpsertWaterMarkManager getInstance() {
     Preconditions.checkState(_instance != null, "upsert water mark manager is not yet init");
     return _instance;
   }
diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java
new file mode 100644
index 0000000..611f257
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.upsert;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.restlet.resources.ResourceUtils;
+import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.testng.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
+
+public class PollingBasedLowWaterMarkServiceTest extends ControllerTest {
+  private PinotHelixResourceManager _pinotResourceManager;
+  private static final String HELIX_CLUSTER_NAME = "TestLowWaterMarksPolling";
+  private final Configuration _pinotHelixBrokerProperties = new PropertiesConfiguration();
+
+  private HelixAdmin _helixAdmin;
+  private HelixBrokerStarter _helixBrokerStarter;
+
+//  @Test  (annotation is commented out in this patch)
+  public void testBrokerCallServersCorrectly()
+      throws Exception {
+    ZkStarter.startLocalZkServer();
+    final String instanceId = "localhost_helixController";
+    _pinotResourceManager =
+        new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, null, 10000L,
+            true, /*isUpdateStateModel=*/false, true);
+    HelixManager helixManager = registerAndConnectAsHelixParticipant(HELIX_CLUSTER_NAME, instanceId, ZkStarter.DEFAULT_ZK_STR);
+    _pinotResourceManager.start(helixManager);
+    _helixAdmin = _pinotResourceManager.getHelixAdmin();
+
+    // Add one fake broker instance and two fake server instances to the cluster.
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(2, true);
+
+
+    _pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
+    _pinotHelixBrokerProperties
+        .addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
+
+
+    // Low water marks reported by server 1: Table1 {0: 10, 1: 20} and Table2 {0: 11}.
+    Map<Integer, Long> table1Map = new HashMap<>();
+    table1Map.put(0, 10L);
+    table1Map.put(1, 20L);
+    Map<Integer, Long> table2Map = new HashMap<>();
+    table2Map.put(0, 11L);
+    Map<String, Map<Integer, Long>> server1LwmsMap = new ConcurrentHashMap<>();
+    server1LwmsMap.put("Table1", table1Map);
+    server1LwmsMap.put("Table2", table2Map);
+
+    Map<Integer, Long> newTable1Map = new HashMap<>(); // reported by server 2: Table1 {0: 15, 1: 18} and Table3 {0: 17}
+    newTable1Map.put(0, 15L);
+    newTable1Map.put(1, 18L);
+    Map<Integer, Long> table3Map = new HashMap<>();
+    table3Map.put(0, 17L);
+    Map<String, Map<Integer, Long>> server2LwmsMap = new HashMap<>();
+    server2LwmsMap.put("Table1", newTable1Map);
+    server2LwmsMap.put("Table3", table3Map);
+
+
+    WireMockServer mockServer1 = new WireMockServer(1); // NOTE(review): the int argument is presumably the listen port; 1 and 2 look unintended - confirm
+    mockServer1.start();
+    mockServer1.stubFor(WireMock.get(WireMock.urlEqualTo("/lwms")).willReturn(WireMock.aResponse()
+        .withBody(ResourceUtils.convertToJsonString(new TableLowWaterMarksInfo(server1LwmsMap)))
+        .withHeader("Content-Type", "application/json")
+        .withStatus(200)));
+    WireMockServer mockServer2 = new WireMockServer(2); // NOTE(review): neither mock server is stopped anywhere in this method
+    mockServer2.start();
+    mockServer2.stubFor(WireMock.get(WireMock.urlEqualTo("/lwms")).willReturn(WireMock.aResponse()
+        .withBody(ResourceUtils.convertToJsonString(new TableLowWaterMarksInfo(server2LwmsMap)))
+        .withHeader("Content-Type", "application/json")
+        .withStatus(200)));
+
+    _helixBrokerStarter =
+        new HelixBrokerStarter(_pinotHelixBrokerProperties, HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR);
+
+
+    while (_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size() == 0 // poll until both tags have at least one instance
+        || _helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size() == 0) {
+      Thread.sleep(100);
+    }
+
+    Thread.sleep(1000); // fixed wait before verifying; presumably gives the broker time to poll the mock servers - confirm
+
+    // Verify that each mock server received exactly one GET /lwms request.
+    mockServer1.verify(1, WireMock.getRequestedFor(WireMock.urlEqualTo("/lwms")));
+    mockServer2.verify(1, WireMock.getRequestedFor(WireMock.urlEqualTo("/lwms")));
+
+    Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1"));
+    Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2"));
+    Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3"));
+
+    // Table1 verification: reported by both servers, the smaller value per partition is expected (10 vs 15, 20 vs 18).
+    Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").size(), 2);
+    Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").get(Integer.parseInt("0")) == 10L);
+    Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").get(Integer.parseInt("1")) == 18L);
+    // Table2 verification: reported by server 1 only.
+    Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2").size(), 1);
+    Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2").get(Integer.parseInt("0")) == 11L);
+    // Table3 verification: reported by server 2 only.
+    Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3").size(), 1);
+    Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3").get(Integer.parseInt("0")) == 17L);
+  }
+
+//  @Test  (annotation is commented out in this patch)
+  public void testLowWaterMarksMerge() {
+    Map<Integer, Long> table1Map = new HashMap<>(); // current state: Table1 {0: 10, 1: 20} and Table2 {0: 11}
+    table1Map.put(0, 10L);
+    table1Map.put(1, 20L);
+    Map<Integer, Long> table2Map = new HashMap<>();
+    table2Map.put(0, 11L);
+    Map<String, Map<Integer, Long>> currentLwmsMap = new ConcurrentHashMap<>();
+    currentLwmsMap.put("Table1", table1Map);
+    currentLwmsMap.put("Table2", table2Map);
+
+    Map<Integer, Long> newTable1Map = new HashMap<>(); // incoming server report: Table1 {0: 15, 1: 18} and Table3 {0: 17}
+    newTable1Map.put(0, 15L);
+    newTable1Map.put(1, 18L);
+    Map<Integer, Long> table3Map = new HashMap<>();
+    table3Map.put(0, 17L);
+    Map<String, Map<Integer, Long>> serverLwms = new HashMap<>();
+    serverLwms.put("Table1", newTable1Map);
+    serverLwms.put("Table3", table3Map);
+
+    PollingBasedLowWaterMarkService.LwmMerger.updateLowWaterMarks(currentLwmsMap, serverLwms); // the assertions below read the result from currentLwmsMap
+
+    Assert.assertEquals(currentLwmsMap.size(), 3);
+
+    // Verify Table1 content.
+    Assert.assertTrue(currentLwmsMap.containsKey("Table1"));
+    Map<Integer, Long> lwmsMap1 = currentLwmsMap.get("Table1");
+    Assert.assertEquals(lwmsMap1.size(), 2);
+    // Verify that the lower LWM value is chosen in the combined results (10 vs 15, 20 vs 18).
+    Assert.assertTrue(lwmsMap1.get(0) == 10L);
+    Assert.assertTrue(lwmsMap1.get(1) == 18L);
+
+    // Verify Table2 content.
+    Assert.assertTrue(currentLwmsMap.containsKey("Table2"));
+    Map<Integer, Long> lwmsMap2 = currentLwmsMap.get("Table2");
+    Assert.assertEquals(lwmsMap2.size(), 1);
+    // Table2 is absent from serverLwms, so its existing value must be kept unchanged.
+    Assert.assertTrue(lwmsMap2.get(0) == 11L);
+
+    // Verify Table3 content.
+    Assert.assertTrue(currentLwmsMap.containsKey("Table3"));
+    Map<Integer, Long> lwmsMap3 = currentLwmsMap.get("Table3");
+    Assert.assertEquals(lwmsMap3.size(), 1);
+    // Table3 exists only in serverLwms, so it must be added with the reported value.
+    Assert.assertTrue(lwmsMap3.get(0) == 17L);
+  }
+
+  /**
+   * Registers a Master-Slave state model factory and connects to the Helix cluster in the PARTICIPANT role; a connection failure is rethrown as a RuntimeException that carries the original exception as its cause.
+   */
+  private HelixManager registerAndConnectAsHelixParticipant(String helixClusterName, String instanceId, String helixZkURL) {
+    HelixManager helixManager =
+            HelixManagerFactory.getZKHelixManager(helixClusterName, instanceId, InstanceType.PARTICIPANT, helixZkURL);
+
+    // Registers Master-Slave state model to state machine engine, which is for calculating participant assignment in lead controller resource.
+    helixManager.getStateMachineEngine()
+            .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory());
+
+    try {
+      helixManager.connect();
+      return helixManager;
+    } catch (Exception e) {
+      String errorMsg =
+              String.format("Exception when connecting the instance %s as Participant to Helix.", instanceId);
+      throw new RuntimeException(errorMsg, e); // keep the original exception as the cause so the real failure stays in the stack trace
+    }
+  }
+
+}
diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
index 979e5c1..e2e270d 100644
--- a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.core.data.manager.upsert;
 
-import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
 import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
 import org.apache.pinot.grigio.common.messages.LogEventType;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
@@ -40,13 +40,13 @@ import static org.mockito.Mockito.when;
 
 public class UpsertImmutableIndexSegmentCallbackTest {
   UpdateLogStorageProvider _mockProvider;
-  UpsertWatermarkManager _mockUpsertWatermarkManager;
+  UpsertWaterMarkManager _mockUpsertWatermarkManager;
   List<VirtualColumnLongValueReaderWriter> _readerWriters = new ArrayList<>();
 
   @BeforeMethod
   public void init() {
     _mockProvider = mock(UpdateLogStorageProvider.class);
-    _mockUpsertWatermarkManager = mock(UpsertWatermarkManager.class);
+    _mockUpsertWatermarkManager = mock(UpsertWaterMarkManager.class);
   }
 
   @Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 6a626cc..8f30ca8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -332,6 +332,8 @@ public class ClusterIntegrationTestUtils {
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
     properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
+    properties.put("max.request.size", "300000000");
+    properties.put("buffer.memory", "300000000");
 
     StreamDataProducer producer =
         StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java
new file mode 100644
index 0000000..b346066
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import org.apache.pinot.common.restlet.resources.ResourceUtils;
+import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
+import org.apache.pinot.core.segment.updater.WaterMarkManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Api(tags = "LowWaterMarks")
+@Path("/")
+public class LowWatermarksResource { // server REST resource that exposes the water marks held by this server's WaterMarkManager
+
+  @Inject
+  ServerInstance serverInstance; // populated through @Inject; source of the WaterMarkManager
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/lwms")
+  @ApiOperation(value = "Show the lwms of tables ", notes = "Returns the lwms of all upsert enable tables in this server")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 500, message = "Internal server error"),
+  })
+  public String getLowWaterMarks() { // returns the string produced by ResourceUtils.convertToJsonString for the manager's table/partition map
+    WaterMarkManager watermarkManager = serverInstance.getWatermarkManager();
+
+    if (watermarkManager == null) { // no manager available: report HTTP 500
+      throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    return ResourceUtils.convertToJsonString( // NOTE(review): the payload comes from getHighWaterMarkTablePartitionMap() although the endpoint is named low water marks - confirm this is intended
+        new TableLowWaterMarksInfo(watermarkManager.getHighWaterMarkTablePartitionMap()));
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
new file mode 100644
index 0000000..13a7612
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Api(tags = "UpsertDebug")
+@Path("/")
+public class UpsertDebugResource { // debug REST resource returning the virtual column info of one segment at a given offset
+
+  @Inject
+  ServerInstance serverInstance; // populated through @Inject; gives access to the instance data manager
+
+  @GET
+  @Produces(MediaType.TEXT_PLAIN)
+  @Path("/upsert/{tableName}/{segmentName}/{offset}")
+  @ApiOperation(value = "$validFrom and $validUntil value", notes = "")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success", response = String.class),
+      @ApiResponse(code = 500, message = "Internal server error"),
+  })
+  public String getUpsertDataAtOffset(
+      @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME") @PathParam("tableName") String tableName,
+      @ApiParam(value = "segment name", required = true, example = "eats_supply_update__0__0__20190923T0700Z") @PathParam("segmentName") String segmentName,
+      @ApiParam(value = "offset", required = true, example = "100") @PathParam("offset") String offsetStr
+  ) { // the lookup failures below are reported as plain-text messages in the returned string, not as exceptions
+    if (!serverInstance.isUpsertEnabled()) {
+      return "not an upsert server";
+    }
+    InstanceDataManager instanceDataManager = serverInstance.getInstanceDataManager();
+    TableDataManager tableDataManager = instanceDataManager.getTableDataManager(tableName);
+    if (tableDataManager == null) {
+      return "no table for " + tableName;
+    }
+    SegmentDataManager segmentDataManager = null; // stays null until acquireSegment succeeds, so the finally block knows whether to release
+    try {
+      segmentDataManager = tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager == null) {
+        return "cannot find associate segment for segment " + segmentName;
+      }
+      if (!(segmentDataManager instanceof UpsertSegmentDataManager)) {
+        return "it is not an upsert table";
+      } else {
+        return ((UpsertSegmentDataManager) segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr)); // NOTE(review): Long.parseLong throws NumberFormatException for a non-numeric offset and nothing here catches it
+      }
+    } finally {
+      if (segmentDataManager != null) { // release whatever was acquired above, on every exit path
+        tableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index 5610a4a..d2adcb3 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -20,8 +20,8 @@ package org.apache.pinot.server.conf;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.server.upsert.DefaultUpsertComponentContainer;
 
 
 /**
@@ -45,6 +45,11 @@ public class ServerConf {
 
   private static final String PINOT_QUERY_SCHEDULER_PREFIX = "pinot.query.scheduler";
 
+  public static final String UPSERT_CONFIG_PARENT = "pinot.server.upsert";
+  public static final String UPSERT_COMPONENT_CONFIG_KEY = "pinot.server.upsertComponent.class";
+  public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName();
+
+
   private Configuration _serverConf;
 
   public ServerConf(Configuration serverConfig) {
@@ -92,6 +97,14 @@ public class ServerConf {
     return _serverConf.subset(PINOT_QUERY_SCHEDULER_PREFIX);
   }
 
+  /**
+   * Returns the class name stored under {@link #UPSERT_COMPONENT_CONFIG_KEY}, falling back to
+   * {@link #UPSERT_COMPONENT_CONFIG_DEFAULT} when the key is not configured.
+   */
+  public String getUpsertComponentContainerClassName() {
+    final String containerClassName =
+        _serverConf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT);
+    return containerClassName;
+  }
+
+  /**
+   * Returns the subset of the server configuration rooted at {@link #UPSERT_CONFIG_PARENT}.
+   */
+  public Configuration getUpsertConfig() {
+    final Configuration upsertSubset = _serverConf.subset(UPSERT_CONFIG_PARENT);
+    return upsertSubset;
+  }
+
   /**
    * Returns an array of transform function names as defined in the config
    * @return String array of transform functions
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 11e2326..845a90b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -26,14 +26,18 @@ import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.MetricsHelper;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.segment.updater.SegmentDeletionHandler;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
+import org.apache.pinot.core.segment.updater.WaterMarkManager;
 import org.apache.pinot.core.transport.QueryServer;
 import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.core.segment.updater.UpsertComponentContainer;
+import org.apache.pinot.server.upsert.UpsertComponentContainerProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +49,7 @@ import org.slf4j.LoggerFactory;
 public class ServerInstance {
   private static final Logger LOGGER = LoggerFactory.getLogger(ServerInstance.class);
 
+  private final ServerConf _serverConf;
   private final ServerMetrics _serverMetrics;
   private final InstanceDataManager _instanceDataManager;
   private final QueryExecutor _queryExecutor;
@@ -52,12 +57,16 @@ public class ServerInstance {
   private final QueryScheduler _queryScheduler;
   private final QueryServer _queryServer;
 
+  // upsert related component, only initialize if necessary
+  private UpsertComponentContainer _upsertComponentContainer;
+
   private boolean _started = false;
 
-  public ServerInstance(ServerConf serverConf, HelixManager helixManager)
+  public ServerInstance(ServerConf serverConf, HelixManager helixManager, String clusterName, String instanceName)
       throws Exception {
     LOGGER.info("Initializing server instance");
 
+    _serverConf = serverConf;
     LOGGER.info("Initializing server metrics");
     MetricsHelper.initializeMetrics(serverConf.getMetricsConfig());
     MetricsRegistry metricsRegistry = new MetricsRegistry();
@@ -97,12 +106,19 @@ public class ServerInstance {
     }
     TransformFunctionFactory.init(transformFunctionClasses);
 
+
+    final UpsertComponentContainerProvider upsertComponentContainerProvider = new UpsertComponentContainerProvider(serverConf);
+    _upsertComponentContainer = upsertComponentContainerProvider.getInstance();
+    _upsertComponentContainer.registerMetrics(_serverConf.getMetricsPrefix(), metricsRegistry);
+    _upsertComponentContainer.init(_serverConf.getUpsertConfig(), helixManager, clusterName, instanceName);
+
     LOGGER.info("Finish initializing server instance");
   }
 
   public synchronized void start() {
     // This method is called when Helix starts a new ZK session, and can be called multiple times. We only need to start
     // the server instance once, and simply ignore the following invocations.
+    LOGGER.info("Starting server instance");
     if (_started) {
       LOGGER.info("Server instance is already running, skipping the start");
       return;
@@ -127,12 +143,16 @@ public class ServerInstance {
     Preconditions.checkState(_started, "Server instance is not running");
     LOGGER.info("Shutting down server instance");
 
+    _upsertComponentContainer.stopBackgroundThread();
     LOGGER.info("Shutting down query server");
     _queryServer.shutDown();
     LOGGER.info("Shutting down query scheduler");
     _queryScheduler.stop();
     LOGGER.info("Shutting down query executor");
     _queryExecutor.shutDown();
+    LOGGER.info("Shutting down upsert components if necessary");
+    _upsertComponentContainer.shutdown();
+
     LOGGER.info("Shutting down instance data manager");
     _instanceDataManager.shutDown();
 
@@ -148,6 +168,23 @@ public class ServerInstance {
     return _instanceDataManager;
   }
 
+  /** Returns the segment deletion handler supplied by the upsert component container. */
+  public SegmentDeletionHandler getSegmentDeletionHandler() {
+    final SegmentDeletionHandler deletionHandler = this._upsertComponentContainer.getSegmentDeletionHandler();
+    return deletionHandler;
+  }
+
+  /** Returns the water mark manager supplied by the upsert component container. */
+  public WaterMarkManager getWatermarkManager() {
+    final WaterMarkManager waterMarkManager = this._upsertComponentContainer.getWatermarkManager();
+    return waterMarkManager;
+  }
+
+  /**
+   * Logs the start-up and asks the upsert component container to start its background thread.
+   * This method always delegates; whether anything is actually started is left to the container.
+   */
+  public void maybeStartUpsertBackgroundThread() {
+    LOGGER.info("starting upsert component background thread");
+    this._upsertComponentContainer.startBackgroundThread();
+  }
+
+  /** Reports whether the upsert component container considers upsert to be enabled. */
+  public boolean isUpsertEnabled() {
+    final boolean upsertEnabled = this._upsertComponentContainer.isUpsertEnabled();
+    return upsertEnabled;
+  }
+
   public long getLatestQueryTime() {
     return _latestQueryTime.get();
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 920b422..710e22a 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -307,13 +307,6 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     }
   }
 
-  /*
-  @Override
-  public Map<String, Map<Integer, Long>> getLowWaterMarks() {
-    return UpsertWaterMarkManager.getInstance().getHighWaterMarkTablePartitionMap();
-  }
-   */
-
   @Override
   public String getSegmentDataDirectory() {
     return _instanceDataManagerConfig.getInstanceDataDir();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index c1ea182..402bb6b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -21,14 +21,6 @@ package org.apache.pinot.server.starter.helix;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationUtils;
@@ -64,14 +56,57 @@ import org.apache.pinot.server.conf.ServerConf;
 import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.server.starter.ServerInstance;
-import org.apache.pinot.server.upsert.UpsertComponentContainer;
-import org.apache.pinot.server.upsert.UpsertComponentContainerProvider;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
-import static org.apache.pinot.common.utils.CommonConstants.Server.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.Instance;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_ID;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol;
 
 
 /**
@@ -138,11 +173,11 @@ public class HelixServerStarter {
     ServerSegmentCompletionProtocolHandler
         .init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
     ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
-    _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
+    _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager, _helixClusterName, _instanceId);
     InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
     SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager);
-    StateModelFactory<?> stateModelFactory =
-        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader);
+    StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager,
+            fetcherAndLoader, _serverInstance.getSegmentDeletionHandler());
     _helixManager.getStateMachineEngine()
         .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
     // Start the server instance as a pre-connect callback so that it starts after connecting to the ZK in order to
@@ -181,6 +216,8 @@ public class HelixServerStarter {
       long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_STARTUP_TIMEOUT_MS, DEFAULT_STARTUP_TIMEOUT_MS);
       startupServiceStatusCheck(endTimeMs);
     }
+    _serverInstance.maybeStartUpsertBackgroundThread();
+
     setShuttingDownStatus(false);
     LOGGER.info("Pinot server ready");
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 145e378..1eab7f5 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -19,8 +19,6 @@
 package org.apache.pinot.server.starter.helix;
 
 import com.google.common.base.Preconditions;
-import java.io.File;
-import java.util.concurrent.locks.Lock;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
@@ -39,10 +37,13 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
-import org.apache.pinot.server.upsert.SegmentDeletionHandler;
+import org.apache.pinot.core.segment.updater.SegmentDeletionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.util.concurrent.locks.Lock;
+
 
 /**
  * Data Server layer state model to take over how to operate on:
@@ -56,17 +57,18 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
   private final SegmentFetcherAndLoader _fetcherAndLoader;
   private final SegmentDeletionHandler _segmentDeletionHandler;
 
+
   public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
       SegmentFetcherAndLoader fetcherAndLoader) {
-    this(instanceId, instanceDataManager, fetcherAndLoader, new SegmentDeletionHandler());
+      this(instanceId, instanceDataManager, fetcherAndLoader, new SegmentDeletionHandler());
   }
 
   public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
-      SegmentFetcherAndLoader fetcherAndLoader, SegmentDeletionHandler segmentDeletionHandler) {
+      SegmentFetcherAndLoader fetcherAndLoader, SegmentDeletionHandler deletionHandler) {
     _instanceId = instanceId;
     _instanceDataManager = instanceDataManager;
     _fetcherAndLoader = fetcherAndLoader;
-    _segmentDeletionHandler = segmentDeletionHandler;
+    _segmentDeletionHandler = deletionHandler;
   }
 
   public static String getStateModelName() {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
index 63424ac..3841a2e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
@@ -18,20 +18,25 @@
  */
 package org.apache.pinot.server.upsert;
 
-import com.codahale.metrics.MetricRegistry;
+import com.yammer.metrics.core.MetricsRegistry;
 import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.core.segment.updater.DefaultWaterMarkManager;
+import org.apache.pinot.core.segment.updater.SegmentDeletionHandler;
+import org.apache.pinot.core.segment.updater.UpsertComponentContainer;
+import org.apache.pinot.core.segment.updater.WaterMarkManager;
 
 public class DefaultUpsertComponentContainer implements UpsertComponentContainer {
 
   private final SegmentDeletionHandler deletionHandler = new SegmentDeletionHandler();
+  private final WaterMarkManager watermarkManager = new DefaultWaterMarkManager();
 
   @Override
-  public void registerMetrics(MetricRegistry registry) {
-
+  public void registerMetrics(String prefix, MetricsRegistry registry) {
   }
 
   @Override
-  public void init(Configuration config) {
+  public void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName) {
   }
 
   @Override
@@ -40,10 +45,29 @@ public class DefaultUpsertComponentContainer implements UpsertComponentContainer
   }
 
   @Override
-  public void start() {
+  public WaterMarkManager getWatermarkManager() {
+    return watermarkManager;
+  }
+
+  @Override
+  public boolean isUpsertEnabled() {
+    return false;
   }
 
   @Override
-  public void stop() {
+  public void startBackgroundThread() {
+
   }
+
+  @Override
+  public void stopBackgroundThread() {
+
+  }
+
+  @Override
+  public void shutdown() {
+
+  }
+
+
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
index fe4af07..7ae27a6 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
@@ -19,10 +19,10 @@
 package org.apache.pinot.server.upsert;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.pinot.core.segment.updater.WatermarkManager;
-import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.apache.pinot.core.segment.updater.UpsertComponentContainer;
+import org.apache.pinot.core.segment.updater.WaterMarkManager;
+import org.apache.pinot.server.conf.ServerConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,22 +30,16 @@ public class UpsertComponentContainerProvider {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerProvider.class);
 
-  public static final String UPSERT_COMPONENT_CONFIG_KEY = "watermarkManager.class";
-  public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName();
-
-  private final Configuration _conf;
   private UpsertComponentContainer _instance;
 
-  public UpsertComponentContainerProvider(Configuration conf, GrigioMetrics metrics) {
-    _conf = conf;
-    String className = _conf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT);
+  public UpsertComponentContainerProvider(ServerConf serverConf) {
+    String className = serverConf.getUpsertComponentContainerClassName();
     LOGGER.info("creating watermark manager with class {}", className);
     try {
       Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className);
-      Preconditions.checkState(comonentContainerClass.isAssignableFrom(WatermarkManager.class),
+      // The configured class must be an UpsertComponentContainer implementation; check in that direction.
+      Preconditions.checkState(UpsertComponentContainer.class.isAssignableFrom(comonentContainerClass),
           "configured class not assignable from Callback class");
       _instance = comonentContainerClass.newInstance();
-      _instance.init(_conf);
     } catch (Exception e) {
       LOGGER.error("failed to load watermark manager class", className, e);
       ExceptionUtils.rethrow(e);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java
similarity index 60%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
rename to pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java
index 01579e7..b007c90 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,18 +17,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.segment.updater;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
-import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
-
-import java.util.Map;
-
-public interface WatermarkManager {
+package org.apache.pinot.server.api.resources;
 
-  void init(Configuration config, GrigioMetrics metrics);
+import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
+import org.apache.pinot.server.api.BaseResourceTest;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
-  Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
 
+/**
+ * Checks that the {@code lwms} resource path answers with a {@link TableLowWaterMarksInfo} whose
+ * table low-water-mark collection is empty.
+ */
+public class LowWatermarksResourceTest extends BaseResourceTest {
+    @Test
+    public void testHappyPath() {
+        // Fetch the resource and deserialize the response into the info object.
+        final TableLowWaterMarksInfo lowWaterMarksInfo =
+            _webTarget.path("lwms").request().get(TableLowWaterMarksInfo.class);
+        final int tableCount = lowWaterMarksInfo.getTableLowWaterMarks().size();
+        Assert.assertEquals(tableCount, 0);
+    }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index ac989bb..65cb9e7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -18,11 +18,6 @@
  */
 package org.apache.pinot.tools.realtime.provisioning;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.config.TableConfig;
@@ -40,6 +35,12 @@ import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 
 /**
  * Given a sample segment, this class can estimate how much memory would be used per host, for various combinations of numHostsToProvision and numHoursToConsume
@@ -126,7 +127,8 @@ public class MemoryEstimator {
 
     // create a config
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
+        new RealtimeSegmentConfig.Builder().setTableName(_tableConfig.getTableName())
+            .setSegmentName(_segmentMetadata.getName())
             .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema())
             .setCapacity(_segmentMetadata.getTotalDocs()).setAvgNumMultiValues(_avgMultiValues)
             .setNoDictionaryColumns(_noDictionaryColumns)
@@ -228,7 +230,8 @@ public class MemoryEstimator {
       RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs);
 
       RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-          new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
+          new RealtimeSegmentConfig.Builder().setTableName(_tableConfig.getTableName())
+              .setSegmentName(_segmentMetadata.getName())
               .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema())
               .setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
               .setNoDictionaryColumns(_noDictionaryColumns)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org