You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/23 19:27:36 UTC

[incubator-pinot] 06/09: more refactor

This is an automated email from the ASF dual-hosted git repository.

jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 18a2da4457a75ee34cbb3bb854371706e55f1b51
Author: james Shao <sj...@uber.com>
AuthorDate: Thu Mar 19 10:45:11 2020 -0700

    more refactor
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  14 +--
 .../requesthandler/LowWaterMarkQueryWriter.java    | 109 ---------------------
 .../broker/upsert/DefaultLowWaterMarkService.java  |   8 ++
 ...ervice.java => DefaultUpsertQueryRewriter.java} |  33 ++-----
 .../apache/pinot/common/utils/CommonConstants.java |   2 +-
 .../core/segment/updater/LowWaterMarkService.java  |   2 +
 ...erMarkService.java => UpsertQueryRewriter.java} |  27 ++---
 .../upsert/PollingBasedLowWaterMarkService.java    |   8 ++
 .../broker/upsert/UpsertQueryRewriterImpl.java     |  42 ++++++--
 .../broker/upsert/UpsertQueryRewriterImplTest.java |  21 +++-
 10 files changed, 83 insertions(+), 183 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 1d8f97b..cb17740 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -300,7 +300,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     if (shouldEnableLowWaterMarkRewrite(request)) {
       // Augment the realtime request with LowWaterMark constraints.
-      addLowWaterMarkToQuery(realtimeBrokerRequest, rawTableName);
+      _lwmService.getQueryRewriter().rewriteQueryForUpsert(realtimeBrokerRequest, rawTableName);
     }
 
     // Calculate routing table for the query
@@ -786,18 +786,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
   }
 
-  private void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, String rawTableName) {
-    final String realtimeTableName = rawTableName + "_REALTIME";
-    Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName);
-    if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
-      LOGGER.info("No low water marks info found for table {}", realtimeTableName);
-      return;
-    }
-    LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName);
-    LowWaterMarkQueryWriter.addLowWaterMarkToQuery(realtimeBrokerRequest, lowWaterMarks);
-    LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, realtimeBrokerRequest);
-  }
-
   /**
    * Processes the optimized broker requests for both OFFLINE and REALTIME table.
    */
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
index c3285e5..b373243 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
@@ -39,113 +39,4 @@ public class LowWaterMarkQueryWriter {
   // Normal Pinot query node uses positive IDs. So lwm query node ids are all negative.
   private static final int QUERY_ID_BASE = -1000;
 
-  /**
-   * For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the
-   * form
-   *   ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1)
-   *
-   * @param realtimeBrokerRequest
-   * @param lowWaterMarks
-   */
-  public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) {
-    if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
-      LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest);
-      return;
-    }
-
-    // Choose the min lwm among all partitions.
-    long minLwm = Collections.min(lowWaterMarks.values());
-
-    // 1. Build the low water mark query of the form for a table assuming lwm is the min LWM and -1 is used as
-    // uninitialized marker.
-    // ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1)
-    // -1 is used instead of Long.MAXVALUE because Pinot does not handle long arithmetic correctly.
-    FilterQuery lwmQuery = addSinglePartitionLowWaterMark(QUERY_ID_BASE - 1, realtimeBrokerRequest, minLwm);
-
-    // 2. Attach low water mark filter to the current filters.
-    FilterQuery currentFilterQuery = realtimeBrokerRequest.getFilterQuery();
-    if (currentFilterQuery != null) {
-      // Make an AND query of lwmQuery and the existing query.
-      FilterQuery andFilterQuery = new FilterQuery();
-      // Make sure we do not reuse any query id in lwmQuerys.
-      andFilterQuery.setId(QUERY_ID_BASE);
-      andFilterQuery.setOperator(FilterOperator.AND);
-      List<Integer> nestedFilterQueryIds = new ArrayList<>(2);
-      nestedFilterQueryIds.add(currentFilterQuery.getId());
-      nestedFilterQueryIds.add(lwmQuery.getId());
-      andFilterQuery.setNestedFilterQueryIds(nestedFilterQueryIds);
-
-      realtimeBrokerRequest.setFilterQuery(andFilterQuery);
-      FilterQueryMap filterSubQueryMap = realtimeBrokerRequest.getFilterSubQueryMap();
-      filterSubQueryMap.putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
-      filterSubQueryMap.putToFilterQueryMap(andFilterQuery.getId(), andFilterQuery);
-    } else {
-      realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
-      realtimeBrokerRequest.setFilterQuery(lwmQuery);
-    }
-  }
-
-  /**
-   *
-   * @param queryIdBase The starting id that will be assigned to the first query created in ths method.
-   * @param realtimeBrokerRequest
-   * @param lwm low water mark.
-   * @return a filter query corresponding to the low water mark constraint of a single partition. The general form is:
-   *         ($ValidFrom <= lwm && $validFrom > -1)  AND (lwm < $validUtil OR $validUtil = -1)
-   */
-  private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest,
-      Long lwm) {
-    // ValidFromQuery: ($ValidFrom <= lwm && $validFrom > -1)
-    FilterQuery validFromFilterQuery = new FilterQuery();
-    // Important: Always decrement queryIdBase value after use to avoid id conflict.
-    validFromFilterQuery.setId(queryIdBase--);
-    validFromFilterQuery.setOperator(FilterOperator.AND);
-    FilterQuery validFromP1 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(*\t\t" + lwm + "]", FilterOperator.RANGE, realtimeBrokerRequest);
-    FilterQuery validFromP2 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(-1\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest);
-    List<Integer> nestedQueriesIdForValidFrom = new ArrayList<>();
-    nestedQueriesIdForValidFrom.add(validFromP1.getId());
-    nestedQueriesIdForValidFrom.add(validFromP2.getId());
-    validFromFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidFrom);
-
-    // ValidUtilQuery: (lwm < $validUtil OR $validUtil = -1)
-    FilterQuery validUtilFilterQuery = new FilterQuery();
-    validUtilFilterQuery.setId(queryIdBase--);
-    validUtilFilterQuery.setOperator(FilterOperator.OR);
-
-    FilterQuery validUtilP1 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "(" + lwm + "\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest);
-    FilterQuery validUtilP2 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "-1", FilterOperator.EQUALITY, realtimeBrokerRequest);
-    List<Integer> nestedQueriesIdForValidUtil = new ArrayList<>();
-    nestedQueriesIdForValidUtil.add(validUtilP1.getId());
-    nestedQueriesIdForValidUtil.add(validUtilP2.getId());
-    validUtilFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidUtil);
-
-    // Top level query: ValidFromQuery AND ValidUtilQuery
-    FilterQuery lwmQuery = new FilterQuery();
-    lwmQuery.setId(queryIdBase--);
-    lwmQuery.setOperator(FilterOperator.AND);
-    List<Integer> nestQids = new ArrayList<>();
-    nestQids.add(validFromFilterQuery.getId());
-    nestQids.add(validUtilFilterQuery.getId());
-    lwmQuery.setNestedFilterQueryIds(nestQids);
-
-    // Add all the new created queries to the query map.
-    realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
-    realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validFromFilterQuery.getId(), validFromFilterQuery);
-    realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validUtilFilterQuery.getId(), validUtilFilterQuery);
-    return lwmQuery;
-  }
-
-  private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator,
-      BrokerRequest realtimeBrokerRequest) {
-    FilterQuery filterQuery = new FilterQuery();
-    filterQuery.setColumn(column);
-    filterQuery.setId(id);
-    filterQuery.setValue(Collections.singletonList(value));
-    filterQuery.setOperator(operator);
-    if (realtimeBrokerRequest.getFilterSubQueryMap() == null) {
-      realtimeBrokerRequest.setFilterSubQueryMap(new FilterQueryMap());
-    }
-    realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(id, filterQuery);
-    return filterQuery;
-  }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
index 94b3be4..42e1dcb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
@@ -22,11 +22,14 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
 
 import java.util.Map;
 
 public class DefaultLowWaterMarkService implements LowWaterMarkService {
 
+  private UpsertQueryRewriter upsertQueryRewriter = new DefaultUpsertQueryRewriter();
+
   @Override
   public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval,
       int serverPort){
@@ -44,4 +47,9 @@ public class DefaultLowWaterMarkService implements LowWaterMarkService {
   @Override
   public void start(BrokerMetrics brokerMetrics) {
   }
+
+  @Override
+  public UpsertQueryRewriter getQueryRewriter() {
+    return upsertQueryRewriter;
+  }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
similarity index 56%
copy from pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
index 94b3be4..d18a56f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
@@ -1,3 +1,8 @@
+package org.apache.pinot.broker.upsert;
+
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,32 +21,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.upsert;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-
-import java.util.Map;
-
-public class DefaultLowWaterMarkService implements LowWaterMarkService {
-
-  @Override
-  public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval,
-      int serverPort){
-  }
-
-  @Override
-  public Map<Integer, Long> getLowWaterMarks(String tableName) {
-    return ImmutableMap.of();
-  }
-
-  @Override
-  public void shutDown() {
-  }
+public class DefaultUpsertQueryRewriter implements UpsertQueryRewriter {
 
   @Override
-  public void start(BrokerMetrics brokerMetrics) {
+  public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
+    // do nothing
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 0e65779..f5eeb15 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -179,7 +179,7 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT = "pinot.broker.query.polling.server.lwms.port";
     public static final String CONFIG_OF_BROKER_LWMS_CLASS_NAME = "pinot.broker.lwms.classname";
     public static final String CONFIG_OF_BROKER_LWM_REWRITE_ENABLE = "pinot.broker.query.lwm.rewrite";
-    public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = true;
+    public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = false;
     public static class Request {
       public static final String PQL = "pql";
       public static final String SQL = "sql";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
index 2fd7434..0c6756c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
@@ -39,4 +39,6 @@ public interface LowWaterMarkService {
 
     // start
     void start(BrokerMetrics brokerMetrics);
+
+    UpsertQueryRewriter getQueryRewriter();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
similarity index 54%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
index 2fd7434..64a64d0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
@@ -1,3 +1,7 @@
+package org.apache.pinot.core.segment.updater;
+
+import org.apache.pinot.common.request.BrokerRequest;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,27 +20,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.segment.updater;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-
-import java.util.Map;
-
-/**
- * LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of
- * an input table.
- */
-public interface LowWaterMarkService {
-
-    void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort);
-
-    // Return the low water mark mapping from partition id to the corresponding low water mark of a given table.
-    Map<Integer, Long> getLowWaterMarks(String tableName);
+public interface UpsertQueryRewriter {
 
-    // Shutdown the service.
-    void shutDown();
+  void rewriteQueryForUpsert(BrokerRequest request, String rawTableName);
 
-    // start
-    void start(BrokerMetrics brokerMetrics);
 }
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
index f7e88c9..74e4b41 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
 import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
 import org.glassfish.jersey.client.ClientProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +67,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
   private int _serverPort;
   private boolean _shuttingDown;
   private BrokerMetrics _brokerMetrics;
+  private UpsertQueryRewriter _queryRewriter;
 
   @Override
   public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) {
@@ -75,6 +77,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
     _cacheInstanceConfigsDataAccessor =
         new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) helixDataAccessor.getBaseDataAccessor(),
             instanceConfigs, null, Collections.singletonList(instanceConfigs));
+    _queryRewriter = new UpsertQueryRewriterImpl(this);
     _tableLowWaterMarks = new ConcurrentHashMap<>();
     _httpClient = ClientBuilder.newClient();
     _httpClient.property(ClientProperties.CONNECT_TIMEOUT, SERVER_CONNENCT_TIMEOUT_MS);
@@ -101,6 +104,11 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
   }
 
   @Override
+  public UpsertQueryRewriter getQueryRewriter() {
+    return _queryRewriter;
+  }
+
+  @Override
   public Map<Integer, Long> getLowWaterMarks(String tableName) {
     return _tableLowWaterMarks.get(tableName);
   }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
similarity index 80%
copy from pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
copy to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
index c3285e5..90fce7c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
@@ -16,29 +16,51 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.requesthandler;
+package org.apache.pinot.broker.upsert;
 
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.request.FilterQuery;
 import org.apache.pinot.common.request.FilterQueryMap;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-// Add a lwm query to the filter query of a Pinot query for upsert enabled table.
-// Thread-Safe
-public class LowWaterMarkQueryWriter {
-  private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkQueryWriter.class);
+public class UpsertQueryRewriterImpl implements UpsertQueryRewriter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertQueryRewriterImpl.class);
+
+  protected final LowWaterMarkService _lwmService;
   private static final String VALID_FROM = "$validFrom";
   private static final String VALID_UNTIL = "$validUntil";
   // Normal Pinot query node uses positive IDs. So lwm query node ids are all negative.
   private static final int QUERY_ID_BASE = -1000;
 
+  public UpsertQueryRewriterImpl(LowWaterMarkService lwmService) {
+    _lwmService = lwmService;
+  }
+
+  @Override
+  public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
+    final String realtimeTableName = TableNameBuilder.ensureTableNameWithType(rawTableName,
+        CommonConstants.Helix.TableType.REALTIME);
+    Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName);
+    if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
+      LOGGER.info("No low water marks info found for table {}", realtimeTableName);
+      return;
+    }
+    LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName);
+    addLowWaterMarkToQuery(request, lowWaterMarks);
+    LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, request);
+  }
+
   /**
    * For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the
    * form
@@ -47,7 +69,7 @@ public class LowWaterMarkQueryWriter {
    * @param realtimeBrokerRequest
    * @param lowWaterMarks
    */
-  public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) {
+  public void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) {
     if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
       LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest);
       return;
@@ -93,7 +115,7 @@ public class LowWaterMarkQueryWriter {
    * @return a filter query corresponding to the low water mark constraint of a single partition. The general form is:
    *         ($ValidFrom <= lwm && $validFrom > -1)  AND (lwm < $validUtil OR $validUtil = -1)
    */
-  private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest,
+  private FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest,
       Long lwm) {
     // ValidFromQuery: ($ValidFrom <= lwm && $validFrom > -1)
     FilterQuery validFromFilterQuery = new FilterQuery();
@@ -135,7 +157,7 @@ public class LowWaterMarkQueryWriter {
     return lwmQuery;
   }
 
-  private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator,
+  private FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator,
       BrokerRequest realtimeBrokerRequest) {
     FilterQuery filterQuery = new FilterQuery();
     filterQuery.setColumn(column);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
similarity index 91%
rename from pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java
rename to pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
index 1021477..ac6c38d 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
@@ -16,21 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.requesthandler;
+package org.apache.pinot.broker.upsert;
 
+import org.apache.pinot.broker.requesthandler.LowWaterMarkQueryWriter;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.request.FilterQuery;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.thrift.TException;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-public class LowWaterMarkQueryWriterTest {
+public class UpsertQueryRewriterImplTest {
+
+    private LowWaterMarkService _lwms;
+    private UpsertQueryRewriterImpl rewriter;
+
+    @BeforeClass
+    public void init() {
+        _lwms = new PollingBasedLowWaterMarkService();
+        rewriter = new UpsertQueryRewriterImpl(_lwms);
+    }
+
     @Test
     public void testRewriteQueryWithoutExistingFilters() throws Exception{
         Pql2Compiler pql2Compiler = new Pql2Compiler();
@@ -39,7 +52,7 @@ public class LowWaterMarkQueryWriterTest {
         Map<Integer, Long> lwms = new HashMap<>();
         lwms.put(0, 10L);
         lwms.put(1, 20L);
-        LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms);
+        rewriter.addLowWaterMarkToQuery(req, lwms);
         Assert.assertTrue(req.isSetFilterQuery());
         try {
             req.validate();
@@ -80,7 +93,7 @@ public class LowWaterMarkQueryWriterTest {
         Map<Integer, Long> lwms = new HashMap<>();
         lwms.put(0, 10L);
         lwms.put(1, 20L);
-        LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms);
+        rewriter.addLowWaterMarkToQuery(req, lwms);
         Assert.assertTrue(req.isSetFilterQuery());
         try {
             req.validate();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org