You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/23 19:27:36 UTC
[incubator-pinot] 06/09: more refactor
This is an automated email from the ASF dual-hosted git repository.
jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 18a2da4457a75ee34cbb3bb854371706e55f1b51
Author: james Shao <sj...@uber.com>
AuthorDate: Thu Mar 19 10:45:11 2020 -0700
more refactor
---
.../requesthandler/BaseBrokerRequestHandler.java | 14 +--
.../requesthandler/LowWaterMarkQueryWriter.java | 109 ---------------------
.../broker/upsert/DefaultLowWaterMarkService.java | 8 ++
...ervice.java => DefaultUpsertQueryRewriter.java} | 33 ++-----
.../apache/pinot/common/utils/CommonConstants.java | 2 +-
.../core/segment/updater/LowWaterMarkService.java | 2 +
...erMarkService.java => UpsertQueryRewriter.java} | 27 ++---
.../upsert/PollingBasedLowWaterMarkService.java | 8 ++
.../broker/upsert/UpsertQueryRewriterImpl.java | 42 ++++++--
.../broker/upsert/UpsertQueryRewriterImplTest.java | 21 +++-
10 files changed, 83 insertions(+), 183 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 1d8f97b..cb17740 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -300,7 +300,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (shouldEnableLowWaterMarkRewrite(request)) {
// Augment the realtime request with LowWaterMark constraints.
- addLowWaterMarkToQuery(realtimeBrokerRequest, rawTableName);
+ _lwmService.getQueryRewriter().rewriteQueryForUpsert(realtimeBrokerRequest, rawTableName);
}
// Calculate routing table for the query
@@ -786,18 +786,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
}
- private void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, String rawTableName) {
- final String realtimeTableName = rawTableName + "_REALTIME";
- Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName);
- if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
- LOGGER.info("No low water marks info found for table {}", realtimeTableName);
- return;
- }
- LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName);
- LowWaterMarkQueryWriter.addLowWaterMarkToQuery(realtimeBrokerRequest, lowWaterMarks);
- LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, realtimeBrokerRequest);
- }
-
/**
* Processes the optimized broker requests for both OFFLINE and REALTIME table.
*/
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
index c3285e5..b373243 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
@@ -39,113 +39,4 @@ public class LowWaterMarkQueryWriter {
// Normal Pinot query node uses positive IDs. So lwm query node ids are all negative.
private static final int QUERY_ID_BASE = -1000;
- /**
- * For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the
- * form
- * ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1)
- *
- * @param realtimeBrokerRequest
- * @param lowWaterMarks
- */
- public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) {
- if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
- LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest);
- return;
- }
-
- // Choose the min lwm among all partitions.
- long minLwm = Collections.min(lowWaterMarks.values());
-
- // 1. Build the low water mark query of the form for a table assuming lwm is the min LWM and -1 is used as
- // uninitialized marker.
- // ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1)
- // -1 is used instead of Long.MAXVALUE because Pinot does not handle long arithmetic correctly.
- FilterQuery lwmQuery = addSinglePartitionLowWaterMark(QUERY_ID_BASE - 1, realtimeBrokerRequest, minLwm);
-
- // 2. Attach low water mark filter to the current filters.
- FilterQuery currentFilterQuery = realtimeBrokerRequest.getFilterQuery();
- if (currentFilterQuery != null) {
- // Make an AND query of lwmQuery and the existing query.
- FilterQuery andFilterQuery = new FilterQuery();
- // Make sure we do not reuse any query id in lwmQuerys.
- andFilterQuery.setId(QUERY_ID_BASE);
- andFilterQuery.setOperator(FilterOperator.AND);
- List<Integer> nestedFilterQueryIds = new ArrayList<>(2);
- nestedFilterQueryIds.add(currentFilterQuery.getId());
- nestedFilterQueryIds.add(lwmQuery.getId());
- andFilterQuery.setNestedFilterQueryIds(nestedFilterQueryIds);
-
- realtimeBrokerRequest.setFilterQuery(andFilterQuery);
- FilterQueryMap filterSubQueryMap = realtimeBrokerRequest.getFilterSubQueryMap();
- filterSubQueryMap.putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
- filterSubQueryMap.putToFilterQueryMap(andFilterQuery.getId(), andFilterQuery);
- } else {
- realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
- realtimeBrokerRequest.setFilterQuery(lwmQuery);
- }
- }
-
- /**
- *
- * @param queryIdBase The starting id that will be assigned to the first query created in ths method.
- * @param realtimeBrokerRequest
- * @param lwm low water mark.
- * @return a filter query corresponding to the low water mark constraint of a single partition. The general form is:
- * ($ValidFrom <= lwm && $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1)
- */
- private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest,
- Long lwm) {
- // ValidFromQuery: ($ValidFrom <= lwm && $validFrom > -1)
- FilterQuery validFromFilterQuery = new FilterQuery();
- // Important: Always decrement queryIdBase value after use to avoid id conflict.
- validFromFilterQuery.setId(queryIdBase--);
- validFromFilterQuery.setOperator(FilterOperator.AND);
- FilterQuery validFromP1 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(*\t\t" + lwm + "]", FilterOperator.RANGE, realtimeBrokerRequest);
- FilterQuery validFromP2 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(-1\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest);
- List<Integer> nestedQueriesIdForValidFrom = new ArrayList<>();
- nestedQueriesIdForValidFrom.add(validFromP1.getId());
- nestedQueriesIdForValidFrom.add(validFromP2.getId());
- validFromFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidFrom);
-
- // ValidUtilQuery: (lwm < $validUtil OR $validUtil = -1)
- FilterQuery validUtilFilterQuery = new FilterQuery();
- validUtilFilterQuery.setId(queryIdBase--);
- validUtilFilterQuery.setOperator(FilterOperator.OR);
-
- FilterQuery validUtilP1 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "(" + lwm + "\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest);
- FilterQuery validUtilP2 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "-1", FilterOperator.EQUALITY, realtimeBrokerRequest);
- List<Integer> nestedQueriesIdForValidUtil = new ArrayList<>();
- nestedQueriesIdForValidUtil.add(validUtilP1.getId());
- nestedQueriesIdForValidUtil.add(validUtilP2.getId());
- validUtilFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidUtil);
-
- // Top level query: ValidFromQuery AND ValidUtilQuery
- FilterQuery lwmQuery = new FilterQuery();
- lwmQuery.setId(queryIdBase--);
- lwmQuery.setOperator(FilterOperator.AND);
- List<Integer> nestQids = new ArrayList<>();
- nestQids.add(validFromFilterQuery.getId());
- nestQids.add(validUtilFilterQuery.getId());
- lwmQuery.setNestedFilterQueryIds(nestQids);
-
- // Add all the new created queries to the query map.
- realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
- realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validFromFilterQuery.getId(), validFromFilterQuery);
- realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validUtilFilterQuery.getId(), validUtilFilterQuery);
- return lwmQuery;
- }
-
- private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator,
- BrokerRequest realtimeBrokerRequest) {
- FilterQuery filterQuery = new FilterQuery();
- filterQuery.setColumn(column);
- filterQuery.setId(id);
- filterQuery.setValue(Collections.singletonList(value));
- filterQuery.setOperator(operator);
- if (realtimeBrokerRequest.getFilterSubQueryMap() == null) {
- realtimeBrokerRequest.setFilterSubQueryMap(new FilterQueryMap());
- }
- realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(id, filterQuery);
- return filterQuery;
- }
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
index 94b3be4..42e1dcb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
@@ -22,11 +22,14 @@ import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
import java.util.Map;
public class DefaultLowWaterMarkService implements LowWaterMarkService {
+ private UpsertQueryRewriter upsertQueryRewriter = new DefaultUpsertQueryRewriter();
+
@Override
public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval,
int serverPort){
@@ -44,4 +47,9 @@ public class DefaultLowWaterMarkService implements LowWaterMarkService {
@Override
public void start(BrokerMetrics brokerMetrics) {
}
+
+ @Override
+ public UpsertQueryRewriter getQueryRewriter() {
+ return upsertQueryRewriter;
+ }
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
similarity index 56%
copy from pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
index 94b3be4..d18a56f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java
@@ -1,3 +1,8 @@
+package org.apache.pinot.broker.upsert;
+
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -16,32 +21,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.broker.upsert;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-
-import java.util.Map;
-
-public class DefaultLowWaterMarkService implements LowWaterMarkService {
-
- @Override
- public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval,
- int serverPort){
- }
-
- @Override
- public Map<Integer, Long> getLowWaterMarks(String tableName) {
- return ImmutableMap.of();
- }
-
- @Override
- public void shutDown() {
- }
+public class DefaultUpsertQueryRewriter implements UpsertQueryRewriter {
@Override
- public void start(BrokerMetrics brokerMetrics) {
+ public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
+ // do nothing
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 0e65779..f5eeb15 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -179,7 +179,7 @@ public class CommonConstants {
public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT = "pinot.broker.query.polling.server.lwms.port";
public static final String CONFIG_OF_BROKER_LWMS_CLASS_NAME = "pinot.broker.lwms.classname";
public static final String CONFIG_OF_BROKER_LWM_REWRITE_ENABLE = "pinot.broker.query.lwm.rewrite";
- public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = true;
+ public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = false;
public static class Request {
public static final String PQL = "pql";
public static final String SQL = "sql";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
index 2fd7434..0c6756c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
@@ -39,4 +39,6 @@ public interface LowWaterMarkService {
// start
void start(BrokerMetrics brokerMetrics);
+
+ UpsertQueryRewriter getQueryRewriter();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
similarity index 54%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
index 2fd7434..64a64d0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java
@@ -1,3 +1,7 @@
+package org.apache.pinot.core.segment.updater;
+
+import org.apache.pinot.common.request.BrokerRequest;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -16,27 +20,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.updater;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-
-import java.util.Map;
-
-/**
- * LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of
- * an input table.
- */
-public interface LowWaterMarkService {
-
- void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort);
-
- // Return the low water mark mapping from partition id to the corresponding low water mark of a given table.
- Map<Integer, Long> getLowWaterMarks(String tableName);
+public interface UpsertQueryRewriter {
- // Shutdown the service.
- void shutDown();
+ void rewriteQueryForUpsert(BrokerRequest request, String rawTableName);
- // start
- void start(BrokerMetrics brokerMetrics);
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
index f7e88c9..74e4b41 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
import org.glassfish.jersey.client.ClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +67,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
private int _serverPort;
private boolean _shuttingDown;
private BrokerMetrics _brokerMetrics;
+ private UpsertQueryRewriter _queryRewriter;
@Override
public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) {
@@ -75,6 +77,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
_cacheInstanceConfigsDataAccessor =
new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) helixDataAccessor.getBaseDataAccessor(),
instanceConfigs, null, Collections.singletonList(instanceConfigs));
+ _queryRewriter = new UpsertQueryRewriterImpl(this);
_tableLowWaterMarks = new ConcurrentHashMap<>();
_httpClient = ClientBuilder.newClient();
_httpClient.property(ClientProperties.CONNECT_TIMEOUT, SERVER_CONNENCT_TIMEOUT_MS);
@@ -101,6 +104,11 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
}
@Override
+ public UpsertQueryRewriter getQueryRewriter() {
+ return _queryRewriter;
+ }
+
+ @Override
public Map<Integer, Long> getLowWaterMarks(String tableName) {
return _tableLowWaterMarks.get(tableName);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
similarity index 80%
copy from pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
copy to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
index c3285e5..90fce7c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java
@@ -16,29 +16,51 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.broker.requesthandler;
+package org.apache.pinot.broker.upsert;
+import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.FilterOperator;
import org.apache.pinot.common.request.FilterQuery;
import org.apache.pinot.common.request.FilterQueryMap;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.apache.pinot.core.segment.updater.UpsertQueryRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-// Add a lwm query to the filter query of a Pinot query for upsert enabled table.
-// Thread-Safe
-public class LowWaterMarkQueryWriter {
- private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkQueryWriter.class);
+public class UpsertQueryRewriterImpl implements UpsertQueryRewriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertQueryRewriterImpl.class);
+
+ protected final LowWaterMarkService _lwmService;
private static final String VALID_FROM = "$validFrom";
private static final String VALID_UNTIL = "$validUntil";
// Normal Pinot query node uses positive IDs. So lwm query node ids are all negative.
private static final int QUERY_ID_BASE = -1000;
+ public UpsertQueryRewriterImpl(LowWaterMarkService lwmService) {
+ _lwmService = lwmService;
+ }
+
+ @Override
+ public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) {
+ final String realtimeTableName = TableNameBuilder.ensureTableNameWithType(rawTableName,
+ CommonConstants.Helix.TableType.REALTIME);
+ Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName);
+ if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
+ LOGGER.info("No low water marks info found for table {}", realtimeTableName);
+ return;
+ }
+ LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName);
+ addLowWaterMarkToQuery(request, lowWaterMarks);
+ LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, request);
+ }
+
/**
* For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the
* form
@@ -47,7 +69,7 @@ public class LowWaterMarkQueryWriter {
* @param realtimeBrokerRequest
* @param lowWaterMarks
*/
- public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) {
+ public void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) {
if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest);
return;
@@ -93,7 +115,7 @@ public class LowWaterMarkQueryWriter {
* @return a filter query corresponding to the low water mark constraint of a single partition. The general form is:
* ($ValidFrom <= lwm && $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1)
*/
- private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest,
+ private FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest,
Long lwm) {
// ValidFromQuery: ($ValidFrom <= lwm && $validFrom > -1)
FilterQuery validFromFilterQuery = new FilterQuery();
@@ -135,7 +157,7 @@ public class LowWaterMarkQueryWriter {
return lwmQuery;
}
- private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator,
+ private FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator,
BrokerRequest realtimeBrokerRequest) {
FilterQuery filterQuery = new FilterQuery();
filterQuery.setColumn(column);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
similarity index 91%
rename from pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java
rename to pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
index 1021477..ac6c38d 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java
@@ -16,21 +16,34 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.broker.requesthandler;
+package org.apache.pinot.broker.upsert;
+import org.apache.pinot.broker.requesthandler.LowWaterMarkQueryWriter;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.FilterOperator;
import org.apache.pinot.common.request.FilterQuery;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.thrift.TException;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-public class LowWaterMarkQueryWriterTest {
+public class UpsertQueryRewriterImplTest {
+
+ private LowWaterMarkService _lwms;
+ private UpsertQueryRewriterImpl rewriter;
+
+ @BeforeClass
+ public void init() {
+ _lwms = new PollingBasedLowWaterMarkService();
+ rewriter = new UpsertQueryRewriterImpl(_lwms);
+ }
+
@Test
public void testRewriteQueryWithoutExistingFilters() throws Exception{
Pql2Compiler pql2Compiler = new Pql2Compiler();
@@ -39,7 +52,7 @@ public class LowWaterMarkQueryWriterTest {
Map<Integer, Long> lwms = new HashMap<>();
lwms.put(0, 10L);
lwms.put(1, 20L);
- LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms);
+ rewriter.addLowWaterMarkToQuery(req, lwms);
Assert.assertTrue(req.isSetFilterQuery());
try {
req.validate();
@@ -80,7 +93,7 @@ public class LowWaterMarkQueryWriterTest {
Map<Integer, Long> lwms = new HashMap<>();
lwms.put(0, 10L);
lwms.put(1, 20L);
- LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms);
+ rewriter.addLowWaterMarkToQuery(req, lwms);
Assert.assertTrue(req.isSetFilterQuery());
try {
req.validate();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org