You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/18 18:04:19 UTC
[incubator-pinot] 04/05: basic split of logics
This is an automated email from the ASF dual-hosted git repository.
jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 63e52fe517621accc74a99d5a24e6246c8fc8cb0
Author: james Shao <sj...@uber.com>
AuthorDate: Tue Mar 17 17:46:31 2020 -0700
basic split of logics
---
.../pinot/broker/broker/BrokerServerBuilder.java | 12 +-
.../broker/broker/helix/HelixBrokerStarter.java | 35 +++-
.../requesthandler/BaseBrokerRequestHandler.java | 47 ++++-
.../requesthandler/LowWaterMarkQueryWriter.java | 151 ++++++++++++++
.../SingleConnectionBrokerRequestHandler.java | 8 +-
.../broker/upsert/DefaultLowWaterMarkService.java | 28 ++-
.../broker/upsert/LowWaterMarkServiceProvider.java | 39 ++--
.../LowWaterMarkQueryWriterTest.java | 130 ++++++++++++
.../apache/pinot/common/utils/CommonConstants.java | 2 +
.../helix/core/PinotHelixResourceManager.java | 12 ++
...rkManager.java => DefaultWaterMarkManager.java} | 15 +-
...ermarkManager.java => LowWaterMarkService.java} | 21 +-
.../segment/updater}/SegmentDeletionHandler.java | 3 +-
.../segment/updater}/UpsertComponentContainer.java | 19 +-
...WatermarkManager.java => WaterMarkManager.java} | 3 +-
pinot-grigio/pinot-grigio-provided/pom.xml | 38 +++-
.../upsert/PollingBasedLowWaterMarkService.java | 224 +++++++++++++++++++++
.../UpsertImmutableIndexSegmentCallback.java | 8 +-
.../upsert/UpsertMutableIndexSegmentCallback.java | 6 +-
.../pinot/core/segment/updater/SegmentUpdater.java | 4 +-
.../updater/UpsertComponentContainerImpl.java | 151 ++++++++++++++
...arkManager.java => UpsertWaterMarkManager.java} | 14 +-
.../PollingBasedLowWaterMarkServiceTest.java | 215 ++++++++++++++++++++
.../UpsertImmutableIndexSegmentCallbackTest.java | 6 +-
.../tests/ClusterIntegrationTestUtils.java | 2 +
.../api/resources/LowWatermarksResource.java | 62 ++++++
.../server/api/resources/UpsertDebugResource.java | 84 ++++++++
.../org/apache/pinot/server/conf/ServerConf.java | 15 +-
.../pinot/server/starter/ServerInstance.java | 39 +++-
.../starter/helix/HelixInstanceDataManager.java | 7 -
.../server/starter/helix/HelixServerStarter.java | 67 ++++--
.../SegmentOnlineOfflineStateModelFactory.java | 14 +-
.../upsert/DefaultUpsertComponentContainer.java | 36 +++-
.../upsert/UpsertComponentContainerProvider.java | 18 +-
.../api/resources/LowWatermarksResourceTest.java | 23 ++-
.../realtime/provisioning/MemoryEstimator.java | 17 +-
36 files changed, 1424 insertions(+), 151 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index 103d469..c30dd51 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -19,8 +19,8 @@
package org.apache.pinot.broker.broker;
import com.google.common.base.Preconditions;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
import com.yammer.metrics.core.MetricsRegistry;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -36,6 +36,8 @@ import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicReference;
+
public class BrokerServerBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerServerBuilder.class);
@@ -57,9 +59,11 @@ public class BrokerServerBuilder {
private final BrokerMetrics _brokerMetrics;
private final BrokerRequestHandler _brokerRequestHandler;
private final BrokerAdminApiApplication _brokerAdminApplication;
+ private final LowWaterMarkService _lwmService;
public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
- QueryQuotaManager queryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ QueryQuotaManager queryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore,
+ LowWaterMarkService lowWaterMarkService) {
_config = config;
_delayedShutdownTimeMs =
config.getLong(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, Broker.DEFAULT_DELAY_SHUTDOWN_TIME_MS);
@@ -77,8 +81,10 @@ public class BrokerServerBuilder {
_brokerMetrics.initializeGlobalMeters();
_brokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
- queryQuotaManager, _brokerMetrics, _propertyStore);
+ queryQuotaManager, _brokerMetrics, _propertyStore, lowWaterMarkService);
_brokerAdminApplication = new BrokerAdminApiApplication(this);
+
+ _lwmService = lowWaterMarkService;
}
public void start() {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 6986182..0e127ad 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -20,11 +20,6 @@ package org.apache.pinot.broker.broker.helix;
import com.google.common.collect.ImmutableList;
import com.yammer.metrics.core.MetricsRegistry;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.ConfigAccessor;
@@ -49,6 +44,8 @@ import org.apache.pinot.broker.broker.BrokerServerBuilder;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.apache.pinot.broker.upsert.LowWaterMarkServiceProvider;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -62,6 +59,12 @@ import org.apache.pinot.common.utils.ServiceStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
@SuppressWarnings("unused")
public class HelixBrokerStarter {
@@ -93,6 +96,8 @@ public class HelixBrokerStarter {
private HelixManager _participantHelixManager;
private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
+ private LowWaterMarkService _lwmService;
+
public HelixBrokerStarter(Configuration brokerConf, String clusterName, String zkServer)
throws Exception {
this(brokerConf, clusterName, zkServer, null);
@@ -168,6 +173,11 @@ public class HelixBrokerStarter {
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
ConfigAccessor configAccessor = _spectatorHelixManager.getConfigAccessor();
+ // start lwm service
+ LowWaterMarkServiceProvider provider = new LowWaterMarkServiceProvider(_brokerConf,
+ _spectatorHelixManager.getHelixDataAccessor(), _clusterName);
+ _lwmService = provider.getInstance();
+
// Set up the broker server builder
LOGGER.info("Setting up broker server builder");
_helixExternalViewBasedRouting =
@@ -184,13 +194,17 @@ public class HelixBrokerStarter {
String enableQueryLimitOverride = configAccessor.get(helixConfigScope, Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE);
_brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.valueOf(enableQueryLimitOverride));
_brokerServerBuilder = new BrokerServerBuilder(_brokerConf, _helixExternalViewBasedRouting,
- _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixExternalViewBasedQueryQuotaManager, _propertyStore);
+ _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixExternalViewBasedQueryQuotaManager,
+ _propertyStore, _lwmService);
BrokerRequestHandler brokerRequestHandler = _brokerServerBuilder.getBrokerRequestHandler();
BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
_helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
_helixExternalViewBasedQueryQuotaManager.setBrokerMetrics(brokerMetrics);
_brokerServerBuilder.start();
+ // start lwm service
+ _lwmService.start(brokerMetrics);
+
// Initialize the cluster change mediator
LOGGER.info("Initializing cluster change mediator");
for (ClusterChangeHandler externalViewChangeHandler : _externalViewChangeHandlers) {
@@ -241,6 +255,7 @@ public class HelixBrokerStarter {
_participantHelixManager
.addPreConnectCallback(() -> brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
+
// Register the service status handler
registerServiceStatusHandler();
@@ -315,6 +330,10 @@ public class HelixBrokerStarter {
_spectatorHelixManager.disconnect();
}
+ if (_lwmService != null) {
+ LOGGER.info("Shutting down low water mark service");
+ _lwmService.shutDown();
+ }
LOGGER.info("Finish shutting down Pinot broker");
}
@@ -347,6 +366,10 @@ public class HelixBrokerStarter {
return new HelixBrokerStarter(brokerConf, "quickstart", "localhost:2122");
}
+ public LowWaterMarkService getLwmService() {
+ return _lwmService;
+ }
+
public static void main(String[] args)
throws Exception {
getDefault().start();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 7667e61..1d8f97b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.ZNRecord;
@@ -76,6 +77,10 @@ import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.common.utils.CommonConstants.Broker.CONFIG_OF_BROKER_LWM_REWRITE_ENABLE;
+import static org.apache.pinot.common.utils.CommonConstants.Broker.CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT;
+import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.DISABLE_REWRITE;
+
@ThreadSafe
public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
@@ -87,6 +92,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final AccessControlFactory _accessControlFactory;
protected final QueryQuotaManager _queryQuotaManager;
protected final BrokerMetrics _brokerMetrics;
+ protected final LowWaterMarkService _lwmService;
protected final AtomicLong _requestIdGenerator = new AtomicLong();
protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
@@ -96,6 +102,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final long _brokerTimeoutMs;
protected final int _queryResponseLimit;
protected final int _queryLogLength;
+ protected final boolean _enableQueryRewrite;
private final RateLimiter _queryLogRateLimiter;
@@ -108,13 +115,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
- QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore,
+ LowWaterMarkService lowWaterMarkService) {
_config = config;
_routingTable = routingTable;
_timeBoundaryService = timeBoundaryService;
_accessControlFactory = accessControlFactory;
_queryQuotaManager = queryQuotaManager;
_brokerMetrics = brokerMetrics;
+ _lwmService = lowWaterMarkService;
_enableCaseInsensitivePql = _config.getBoolean(CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_PQL_KEY, false);
if (_enableCaseInsensitivePql) {
@@ -125,6 +134,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
_enableQueryLimitOverride = _config.getBoolean(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false);
+ // query rewrite for upsert feature
+ _enableQueryRewrite = config.getBoolean(CONFIG_OF_BROKER_LWM_REWRITE_ENABLE,
+ CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT);
+
_brokerId = config.getString(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
_brokerTimeoutMs = config.getLong(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryResponseLimit =
@@ -285,6 +298,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
}
+ if (shouldEnableLowWaterMarkRewrite(request)) {
+ // Augment the realtime request with LowWaterMark constraints.
+ addLowWaterMarkToQuery(realtimeBrokerRequest, rawTableName);
+ }
+
// Calculate routing table for the query
long routingStartTimeNs = System.nanoTime();
Map<ServerInstance, List<String>> offlineRoutingTable = null;
@@ -368,6 +386,21 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
return brokerResponse;
}
+ private boolean shouldEnableLowWaterMarkRewrite(JsonNode request) {
+ if (_enableQueryRewrite) {
+ try {
+ if (request.has(DISABLE_REWRITE)) {
+ return !request.get(DISABLE_REWRITE).asBoolean();
+ } else {
+ return true;
+ }
+ } catch (Exception ex) {
+ LOGGER.warn("cannot parse the disable rewrite option: [{}] to boolean from request json", DISABLE_REWRITE, ex);
+ }
+ }
+ return false;
+ }
+
/**
* Reset limit for selection query if it exceeds maxQuerySelectionLimit.
* @param brokerRequest
@@ -753,6 +786,18 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
}
+ private void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, String rawTableName) {
+ final String realtimeTableName = rawTableName + "_REALTIME";
+ Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName);
+ if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
+ LOGGER.info("No low water marks info found for table {}", realtimeTableName);
+ return;
+ }
+ LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName);
+ LowWaterMarkQueryWriter.addLowWaterMarkToQuery(realtimeBrokerRequest, lowWaterMarks);
+ LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, realtimeBrokerRequest);
+ }
+
/**
* Processes the optimized broker requests for both OFFLINE and REALTIME table.
*/
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
new file mode 100644
index 0000000..c3285e5
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.FilterQuery;
+import org.apache.pinot.common.request.FilterQueryMap;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Add a lwm query to the filter query of a Pinot query for upsert enabled table.
+// Thread-Safe
+public class LowWaterMarkQueryWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkQueryWriter.class);
+ private static final String VALID_FROM = "$validFrom";
+ private static final String VALID_UNTIL = "$validUntil";
+ // Normal Pinot query node uses positive IDs. So lwm query node ids are all negative.
+ private static final int QUERY_ID_BASE = -1000;
+
+ /**
+ * For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the
+ * form
+ * ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUntil OR $validUntil = -1)
+ *
+ * @param realtimeBrokerRequest
+ * @param lowWaterMarks
+ */
+ public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) {
+ if (lowWaterMarks == null || lowWaterMarks.size() == 0) {
+ LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest);
+ return;
+ }
+
+ // Choose the min lwm among all partitions.
+ long minLwm = Collections.min(lowWaterMarks.values());
+
+ // 1. Build the low water mark query of the form for a table assuming lwm is the min LWM and -1 is used as
+ // uninitialized marker.
+ // ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUntil OR $validUntil = -1)
+ // -1 is used instead of Long.MAX_VALUE because Pinot does not handle long arithmetic correctly.
+ FilterQuery lwmQuery = addSinglePartitionLowWaterMark(QUERY_ID_BASE - 1, realtimeBrokerRequest, minLwm);
+
+ // 2. Attach low water mark filter to the current filters.
+ FilterQuery currentFilterQuery = realtimeBrokerRequest.getFilterQuery();
+ if (currentFilterQuery != null) {
+ // Make an AND query of lwmQuery and the existing query.
+ FilterQuery andFilterQuery = new FilterQuery();
+ // Make sure we do not reuse any query id already assigned within lwmQuery.
+ andFilterQuery.setId(QUERY_ID_BASE);
+ andFilterQuery.setOperator(FilterOperator.AND);
+ List<Integer> nestedFilterQueryIds = new ArrayList<>(2);
+ nestedFilterQueryIds.add(currentFilterQuery.getId());
+ nestedFilterQueryIds.add(lwmQuery.getId());
+ andFilterQuery.setNestedFilterQueryIds(nestedFilterQueryIds);
+
+ realtimeBrokerRequest.setFilterQuery(andFilterQuery);
+ FilterQueryMap filterSubQueryMap = realtimeBrokerRequest.getFilterSubQueryMap();
+ filterSubQueryMap.putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
+ filterSubQueryMap.putToFilterQueryMap(andFilterQuery.getId(), andFilterQuery);
+ } else {
+ realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
+ realtimeBrokerRequest.setFilterQuery(lwmQuery);
+ }
+ }
+
+ /**
+ *
+ * @param queryIdBase The starting id that will be assigned to the first query created in this method.
+ * @param realtimeBrokerRequest
+ * @param lwm low water mark.
+ * @return a filter query corresponding to the low water mark constraint of a single partition. The general form is:
+ * ($validFrom <= lwm && $validFrom > -1) AND (lwm < $validUntil OR $validUntil = -1)
+ */
+ private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest,
+ Long lwm) {
+ // ValidFromQuery: ($validFrom <= lwm && $validFrom > -1)
+ FilterQuery validFromFilterQuery = new FilterQuery();
+ // Important: Always decrement queryIdBase value after use to avoid id conflict.
+ validFromFilterQuery.setId(queryIdBase--);
+ validFromFilterQuery.setOperator(FilterOperator.AND);
+ FilterQuery validFromP1 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(*\t\t" + lwm + "]", FilterOperator.RANGE, realtimeBrokerRequest);
+ FilterQuery validFromP2 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(-1\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest);
+ List<Integer> nestedQueriesIdForValidFrom = new ArrayList<>();
+ nestedQueriesIdForValidFrom.add(validFromP1.getId());
+ nestedQueriesIdForValidFrom.add(validFromP2.getId());
+ validFromFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidFrom);
+
+ // ValidUtilQuery: (lwm < $validUntil OR $validUntil = -1)
+ FilterQuery validUtilFilterQuery = new FilterQuery();
+ validUtilFilterQuery.setId(queryIdBase--);
+ validUtilFilterQuery.setOperator(FilterOperator.OR);
+
+ FilterQuery validUtilP1 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "(" + lwm + "\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest);
+ FilterQuery validUtilP2 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "-1", FilterOperator.EQUALITY, realtimeBrokerRequest);
+ List<Integer> nestedQueriesIdForValidUtil = new ArrayList<>();
+ nestedQueriesIdForValidUtil.add(validUtilP1.getId());
+ nestedQueriesIdForValidUtil.add(validUtilP2.getId());
+ validUtilFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidUtil);
+
+ // Top level query: ValidFromQuery AND ValidUtilQuery
+ FilterQuery lwmQuery = new FilterQuery();
+ lwmQuery.setId(queryIdBase--);
+ lwmQuery.setOperator(FilterOperator.AND);
+ List<Integer> nestQids = new ArrayList<>();
+ nestQids.add(validFromFilterQuery.getId());
+ nestQids.add(validUtilFilterQuery.getId());
+ lwmQuery.setNestedFilterQueryIds(nestQids);
+
+ // Add all the new created queries to the query map.
+ realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery);
+ realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validFromFilterQuery.getId(), validFromFilterQuery);
+ realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validUtilFilterQuery.getId(), validUtilFilterQuery);
+ return lwmQuery;
+ }
+
+ private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator,
+ BrokerRequest realtimeBrokerRequest) {
+ FilterQuery filterQuery = new FilterQuery();
+ filterQuery.setColumn(column);
+ filterQuery.setId(id);
+ filterQuery.setValue(Collections.singletonList(value));
+ filterQuery.setOperator(operator);
+ if (realtimeBrokerRequest.getFilterSubQueryMap() == null) {
+ realtimeBrokerRequest.setFilterSubQueryMap(new FilterQueryMap());
+ }
+ realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(id, filterQuery);
+ return filterQuery;
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 844a200..95f943f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -57,8 +59,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
- QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore) {
- super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics, propertyStore);
+ QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore,
+ LowWaterMarkService lowWaterMarkService) {
+ super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics,
+ propertyStore, lowWaterMarkService);
_queryRouter = new QueryRouter(_brokerId, brokerMetrics);
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
similarity index 57%
copy from pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
index 63424ac..94b3be4 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java
@@ -16,34 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.upsert;
+package org.apache.pinot.broker.upsert;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.commons.configuration.Configuration;
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
-public class DefaultUpsertComponentContainer implements UpsertComponentContainer {
+import java.util.Map;
- private final SegmentDeletionHandler deletionHandler = new SegmentDeletionHandler();
+public class DefaultLowWaterMarkService implements LowWaterMarkService {
@Override
- public void registerMetrics(MetricRegistry registry) {
-
- }
-
- @Override
- public void init(Configuration config) {
+ public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval,
+ int serverPort){
}
@Override
- public SegmentDeletionHandler getSegmentDeletionHandler() {
- return deletionHandler;
+ public Map<Integer, Long> getLowWaterMarks(String tableName) {
+ return ImmutableMap.of();
}
@Override
- public void start() {
+ public void shutDown() {
}
@Override
- public void stop() {
+ public void start(BrokerMetrics brokerMetrics) {
}
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
similarity index 50%
copy from pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
index fe4af07..f5c06f3 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java
@@ -16,43 +16,48 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.upsert;
+package org.apache.pinot.broker.upsert;
import com.google.common.base.Preconditions;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.pinot.core.segment.updater.WatermarkManager;
-import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class UpsertComponentContainerProvider {
+import static org.apache.pinot.common.utils.CommonConstants.Broker.*;
- private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerProvider.class);
- public static final String UPSERT_COMPONENT_CONFIG_KEY = "watermarkManager.class";
- public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName();
+public class LowWaterMarkServiceProvider {
- private final Configuration _conf;
- private UpsertComponentContainer _instance;
+ private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkServiceProvider.class);
- public UpsertComponentContainerProvider(Configuration conf, GrigioMetrics metrics) {
- _conf = conf;
- String className = _conf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT);
+ private LowWaterMarkService _instance;
+
+ public LowWaterMarkServiceProvider(Configuration brokerConfig, HelixDataAccessor dataAccessor, String clusterName) {
+ String className = brokerConfig.getString(CommonConstants.Broker.CONFIG_OF_BROKER_LWMS_CLASS_NAME,
+ DefaultLowWaterMarkService.class.getName());
LOGGER.info("creating watermark manager with class {}", className);
try {
- Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className);
- Preconditions.checkState(comonentContainerClass.isAssignableFrom(WatermarkManager.class),
- "configured class not assignable from Callback class");
+ Class<LowWaterMarkService> comonentContainerClass = (Class<LowWaterMarkService>) Class.forName(className);
+ Preconditions.checkState(comonentContainerClass.isAssignableFrom(LowWaterMarkService.class),
+ "configured class not assignable from LowWaterMarkService class");
_instance = comonentContainerClass.newInstance();
- _instance.init(_conf);
+ _instance.init(dataAccessor, clusterName,
+ brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS,
+ DEFAULT_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS),
+ brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT,
+ CommonConstants.Server.DEFAULT_ADMIN_API_PORT));
} catch (Exception e) {
LOGGER.error("failed to load watermark manager class", className, e);
+ _instance = null;
ExceptionUtils.rethrow(e);
}
}
- public UpsertComponentContainer getInstance() {
+ public LowWaterMarkService getInstance() {
return _instance;
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java
new file mode 100644
index 0000000..1021477
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.request.FilterQuery;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
+import org.apache.thrift.TException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class LowWaterMarkQueryWriterTest {
+ @Test
+ public void testRewriteQueryWithoutExistingFilters() throws Exception{
+ Pql2Compiler pql2Compiler = new Pql2Compiler();
+ BrokerRequest req = pql2Compiler.compileToBrokerRequest("SELECT * FROM T");
+ Assert.assertFalse(req.isSetFilterQuery());
+ Map<Integer, Long> lwms = new HashMap<>();
+ lwms.put(0, 10L);
+ lwms.put(1, 20L);
+ LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms);
+ Assert.assertTrue(req.isSetFilterQuery());
+ try {
+ req.validate();
+ } catch (TException e) {
+ Assert.fail("Query after low water mark query is not valid: ", e);
+ }
+ // Verify there are in total 7 filter query nodes in the filter query tree.
+ Map<Integer,FilterQuery> filterSubQueryMap = req.getFilterSubQueryMap().getFilterQueryMap();
+ Assert.assertEquals(filterSubQueryMap.size(), 7);
+
+ Integer lwmQueryId = req.getFilterQuery().getId();
+ // 1. Verify the low water mark query.
+ FilterQuery lwmQuery = filterSubQueryMap.get(lwmQueryId);
+ verifyNoneTerminalFilterQuery(lwmQuery, FilterOperator.AND, 2);
+ FilterQuery validFrom1Query = filterSubQueryMap.get(lwmQuery.getNestedFilterQueryIds().get(0));
+ FilterQuery validTo1Query = filterSubQueryMap.get(lwmQuery.getNestedFilterQueryIds().get(1));
+
+ // Verify the subtree (i.e., an AND with two nodes) for the $validFrom column.
+ verifyNoneTerminalFilterQuery(validFrom1Query, FilterOperator.AND, 2);
+ verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(0)),
+ "$validFrom", "(*\t\t10]", FilterOperator.RANGE);
+ verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(1)),
+ "$validFrom", "(-1\t\t*)", FilterOperator.RANGE);
+
+ // Verify the subtree (i.e., an OR with two nodes) for the $validUntil column.
+ verifyNoneTerminalFilterQuery(validTo1Query, FilterOperator.OR, 2);
+ verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(0)),
+ "$validUntil", "(10\t\t*)", FilterOperator.RANGE);
+ verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(1)),
+ "$validUntil", "-1", FilterOperator.EQUALITY);
+ }
+
+ @Test
+ public void testRewriteQueryWithExistingFilters() {
+ Pql2Compiler pql2Compiler = new Pql2Compiler();
+ BrokerRequest req = pql2Compiler.compileToBrokerRequest("SELECT * FROM T WHERE A < 4");
+ Assert.assertTrue(req.isSetFilterQuery());
+ Map<Integer, Long> lwms = new HashMap<>();
+ lwms.put(0, 10L);
+ lwms.put(1, 20L);
+ LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms);
+ Assert.assertTrue(req.isSetFilterQuery());
+ try {
+ req.validate();
+ } catch (TException e) {
+ Assert.fail("Query after low water mark query is not valid: ", e);
+ }
+ // Verify there are in total 9 filter query nodes in the filter query tree.
+ Map<Integer,FilterQuery> filterSubQueryMap = req.getFilterSubQueryMap().getFilterQueryMap();
+ Assert.assertEquals(filterSubQueryMap.size(), 9);
+ // 0. Verify there is one top-level filter of operator AND with two sub filter queries.
+ FilterQuery rootFilterQuery = req.getFilterQuery();
+ verifyNoneTerminalFilterQuery(rootFilterQuery, FilterOperator.AND, 2);
+ // 1. Verify the existing filter query A < 4 is not affected.
+ verifyTerminalFilterQuery(filterSubQueryMap.get(rootFilterQuery.getNestedFilterQueryIds().get(0)), "A", "(*\t\t4)", FilterOperator.RANGE);
+
+ FilterQuery lowWaterMarkQuery = filterSubQueryMap.get(rootFilterQuery.getNestedFilterQueryIds().get(1));
+ // Verify the lwm query
+ verifyNoneTerminalFilterQuery(lowWaterMarkQuery, FilterOperator.AND, 2);
+ FilterQuery validFrom1Query = filterSubQueryMap.get(lowWaterMarkQuery.getNestedFilterQueryIds().get(0));
+ FilterQuery validTo1Query = filterSubQueryMap.get(lowWaterMarkQuery.getNestedFilterQueryIds().get(1));
+
+ // Verify the subtree (i.e., an AND with two nodes) for the $validFrom column.
+ verifyNoneTerminalFilterQuery(validFrom1Query, FilterOperator.AND, 2);
+ verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(0)),
+ "$validFrom", "(*\t\t10]", FilterOperator.RANGE);
+ verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(1)),
+ "$validFrom", "(-1\t\t*)", FilterOperator.RANGE);
+
+ // Verify the subtree (i.e., an OR with two nodes) for the $validUntil column.
+ verifyNoneTerminalFilterQuery(validTo1Query, FilterOperator.OR, 2);
+ verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(0)),
+ "$validUntil", "(10\t\t*)", FilterOperator.RANGE);
+ verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(1)),
+ "$validUntil", "-1", FilterOperator.EQUALITY);
+ }
+
+ private void verifyTerminalFilterQuery(FilterQuery filterQuery, String column, String value, FilterOperator op) {
+ Assert.assertEquals(filterQuery.getColumn(), column);
+ Assert.assertEquals(filterQuery.getValue(), Collections.singletonList(value));
+ Assert.assertEquals(filterQuery.getOperator(), op);
+ }
+
+ private void verifyNoneTerminalFilterQuery(FilterQuery filterQuery, FilterOperator op, int numOfChildQueries) {
+ Assert.assertEquals(filterQuery.getOperator(), op);
+ Assert.assertEquals(filterQuery.getNestedFilterQueryIdsSize(), numOfChildQueries);
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index a6f242c..0e65779 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -175,7 +175,9 @@ public class CommonConstants {
public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override";
public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS = "pinot.broker.query.polling.server.lwms.interval.ms";
+ public static final int DEFAULT_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS = 5 * 1_000;
public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT = "pinot.broker.query.polling.server.lwms.port";
+ public static final String CONFIG_OF_BROKER_LWMS_CLASS_NAME = "pinot.broker.lwms.classname";
public static final String CONFIG_OF_BROKER_LWM_REWRITE_ENABLE = "pinot.broker.query.lwm.rewrite";
public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = true;
public static class Request {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a494e3f..9cce40d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -112,6 +112,18 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
public class PinotHelixResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
similarity index 71%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
index 01579e7..b45060c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java
@@ -18,16 +18,23 @@
*/
package org.apache.pinot.core.segment.updater;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
import java.util.Map;
-public interface WatermarkManager {
+public class DefaultWaterMarkManager implements WaterMarkManager {
- void init(Configuration config, GrigioMetrics metrics);
+ private static final Map<String, Map<Integer, Long>> DEFAULT_MAP = ImmutableMap.of();
- Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
+ @Override
+ public void init(Configuration config, GrigioMetrics metrics) {
+ }
+
+ @Override
+ public Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap() {
+ return DEFAULT_MAP;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
similarity index 56%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
index 01579e7..2fd7434 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java
@@ -18,16 +18,25 @@
*/
package org.apache.pinot.core.segment.updater;
-import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
-import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import java.util.Map;
-public interface WatermarkManager {
+/**
+ * LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of
+ * an input table.
+ */
+public interface LowWaterMarkService {
+
+ void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort);
- void init(Configuration config, GrigioMetrics metrics);
+ // Return the low water mark mapping from partition id to the corresponding low water mark of a given table.
+ Map<Integer, Long> getLowWaterMarks(String tableName);
- Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
+ // Shutdown the service.
+ void shutDown();
+ // Start the service, reporting metrics through the given BrokerMetrics.
+ void start(BrokerMetrics brokerMetrics);
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java
similarity index 94%
rename from pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java
index 1aba06b..1db6f84 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java
@@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.upsert;
+package org.apache.pinot.core.segment.updater;
import com.google.common.collect.ImmutableList;
-import org.apache.pinot.core.segment.updater.SegmentDeletionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
similarity index 67%
rename from pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
index 7f636fc..eb64285 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java
@@ -16,20 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.upsert;
+package org.apache.pinot.core.segment.updater;
-import com.codahale.metrics.MetricRegistry;
+import com.yammer.metrics.core.MetricsRegistry;
import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
public interface UpsertComponentContainer {
- void registerMetrics(MetricRegistry registry);
+ void registerMetrics(String prefix, MetricsRegistry registry);
- void init(Configuration config);
+ void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName);
+
+ void startBackgroundThread();
+
+ void stopBackgroundThread();
+
+ void shutdown();
SegmentDeletionHandler getSegmentDeletionHandler();
- void start();
+ WaterMarkManager getWatermarkManager();
- void stop();
+ boolean isUpsertEnabled();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
similarity index 96%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
index 01579e7..acc8479 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java
@@ -24,10 +24,11 @@ import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
import java.util.Map;
-public interface WatermarkManager {
+public interface WaterMarkManager {
void init(Configuration config, GrigioMetrics metrics);
Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
+
}
diff --git a/pinot-grigio/pinot-grigio-provided/pom.xml b/pinot-grigio/pinot-grigio-provided/pom.xml
index 5643070..ec7b4ca 100644
--- a/pinot-grigio/pinot-grigio-provided/pom.xml
+++ b/pinot-grigio/pinot-grigio-provided/pom.xml
@@ -52,8 +52,40 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
- </dependencies>
-
-
+ <dependency>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock-jre8</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-controller</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-controller</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
new file mode 100644
index 0000000..f7e88c9
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.upsert;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
+import org.apache.pinot.core.segment.updater.LowWaterMarkService;
+import org.glassfish.jersey.client.ClientProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+// A low water mark service which polls various Pinot servers periodically to get the low water marks for partitions of
+// servers.
+public class PollingBasedLowWaterMarkService implements LowWaterMarkService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PollingBasedLowWaterMarkService.class);
+
+ private static final String LWMS_PATH = "lwms";
+ private static final String HTTP = "http";
+
+ private static final int SERVER_CONNENCT_TIMEOUT_MS = 10000;
+ private static final int SERVER_READ_TIMEOUT = 10000;
+ private static final String SERVER_PREFIX = "Server_";
+
+ // A map from table_name to its partition->lwm mapping.
+ private Map<String, Map<Integer, Long>> _tableLowWaterMarks;
+ private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
+ private Client _httpClient;
+ // We can tune this polling interval to make sure we get the fresh snapshot of server low water marks.
+ private int _serverPollingInterval;
+ private int _serverPort;
+ private boolean _shuttingDown;
+ private BrokerMetrics _brokerMetrics;
+
+ @Override
+ public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) {
+ // Construct the zk path to get the server instances.
+ String instanceConfigs = PropertyPathBuilder.instanceConfig(helixClusterName);
+ // Build a zk data reader.
+ _cacheInstanceConfigsDataAccessor =
+ new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) helixDataAccessor.getBaseDataAccessor(),
+ instanceConfigs, null, Collections.singletonList(instanceConfigs));
+ _tableLowWaterMarks = new ConcurrentHashMap<>();
+ _httpClient = ClientBuilder.newClient();
+ _httpClient.property(ClientProperties.CONNECT_TIMEOUT, SERVER_CONNENCT_TIMEOUT_MS);
+ _httpClient.property(ClientProperties.READ_TIMEOUT, SERVER_READ_TIMEOUT);
+ _serverPollingInterval = serverPollingInterval;
+ _serverPort = serverPort;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ shutDown();
+ } catch (final Exception e) {
+ LOGGER.error("Caught exception while running shutdown hook", e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public void start(BrokerMetrics brokerMetrics) {
+ _brokerMetrics = brokerMetrics;
+ Thread serverPollingThread = new Thread(new PinotServerPollingExecutor());
+ serverPollingThread.start();
+ }
+
+ @Override
+ public Map<Integer, Long> getLowWaterMarks(String tableName) {
+ return _tableLowWaterMarks.get(tableName);
+ }
+
+ @Override
+ public void shutDown() {
+ _shuttingDown = true;
+ }
+
+ // Poll all the servers periodically to find out the Low Water Mark info.
+ private class PinotServerPollingExecutor implements Runnable {
+ @Override
+ public void run() {
+ while (!_shuttingDown) {
+ try {
+ Map<String, Map<Integer, Long>> latestLowWaterMarks = new ConcurrentHashMap<>();
+ // 1. Find out all the alive servers.
+ List<String> serverInstances = _cacheInstanceConfigsDataAccessor.getChildNames("/", AccessOption.PERSISTENT);
+ List<ZNRecord> instances = _cacheInstanceConfigsDataAccessor.getChildren("/", null, AccessOption.PERSISTENT);
+ for (ZNRecord r : instances) {
+ LOGGER.info("Instance info for lwms: {}", r.toString());
+ }
+ // 2. Ask each server for its low water mark info.
+ for (String serverIntanceId : serverInstances) {
+ // Check the instance is in fact a server.
+ if (!serverIntanceId.startsWith(SERVER_PREFIX) && !serverIntanceId.startsWith("server_"))
+ continue;
+ InstanceConfig serverConfig = InstanceConfig.toInstanceConfig(serverIntanceId.substring(
+ SERVER_PREFIX.length()));
+ try {
+ // (TODO) Fixing this. Hardcode using the default server admin port for now.
+ WebTarget webTarget = _httpClient.target(getURI(serverConfig.getHostName(), _serverPort));
+ TableLowWaterMarksInfo lwms = webTarget.path(PollingBasedLowWaterMarkService.LWMS_PATH).request().
+ get(TableLowWaterMarksInfo.class);
+ LOGGER.info("Found low water mark info for server {}: {}", serverIntanceId, lwms.getTableLowWaterMarks());
+ // 3. Update the low water marks.
+ LwmMerger.updateLowWaterMarks(latestLowWaterMarks, lwms.getTableLowWaterMarks());
+ } catch (Exception e) {
+ // TODO(tingchen) Handle server failures. We could keep the last known lwms of a server.
+ LOGGER.warn("Error during getting low water marks from server {}", serverIntanceId, e);
+ }
+ }
+ // 4. Replace the broker's low water marks table with the latest low water mark info.
+ if (validate(latestLowWaterMarks)) {
+ _tableLowWaterMarks = latestLowWaterMarks;
+ }
+ // 5. Sleep for some interval.
+ Thread.sleep(_serverPollingInterval);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // It is OK for us to break out the loop early because the Low Water Mark refresh is best effort.
+ break;
+ }
+ }
+ }
+
+ // Validate that the low water mark info polled from all the servers is correct. For now, return true.
+ // (TODO tingchen) figure out the right checks.
+ private boolean validate(Map<String, Map<Integer, Long>> latestLowWaterMarks) {
+ if (latestLowWaterMarks == null) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.LOW_WATER_MARK_QUERY_FAILURES, 1);
+ return false;
+ }
+ for(String tableName : latestLowWaterMarks.keySet()) {
+ Map<Integer, Long> partitionLWMs = latestLowWaterMarks.get(tableName);
+ _brokerMetrics.addValueToTableGauge(tableName, BrokerGauge.TABLE_MIN_LOW_WATER_MARK,
+ Collections.min(partitionLWMs.values()));
+ }
+ return true;
+ }
+
+ private URI getURI(String host, int port) throws URISyntaxException {
+ LOGGER.info("requesting host {} and port {}", host, port);
+ return new URI(PollingBasedLowWaterMarkService.HTTP, null, host, port, null
+ , null, null);
+ }
+ }
+
+ static class LwmMerger {
+ // Update an existing map currentLwmsMap of tableName->low_water_marks with a new map of the same type.
+ // If an entry in the new map does not exist in currentLwmsMap, insert it into currentLwmsMap;
+ // otherwise, merge the entry with the existing entry in currentLwmsMap using mergeTablePartitionLwms().
+ static void updateLowWaterMarks(Map<String, Map<Integer, Long>> currentLwmsMap,
+ final Map<String, Map<Integer, Long>> serverLwmsMap) {
+ for (Map.Entry<String, Map<Integer, Long>> serverMap : serverLwmsMap.entrySet()) {
+ String tableName = serverMap.getKey();
+ Map<Integer, Long> tableLwms = serverMap.getValue();
+ if (currentLwmsMap.containsKey(tableName)) {
+ currentLwmsMap.put(tableName,
+ LwmMerger.mergeTablePartitionLwms(Collections.unmodifiableMap(currentLwmsMap.get(tableName)),
+ tableLwms));
+ } else {
+ currentLwmsMap.put(tableName, tableLwms);
+ }
+ }
+ }
+
+ // Merge all the entries in the two input maps of partition_id->lwm.
+ // If an entry exists only in a map, put it in the combined map.
+ // If an entry exists in both maps, use the entry with the smaller low water marks.
+ static Map<Integer, Long> mergeTablePartitionLwms(final Map<Integer, Long> m1, final Map<Integer, Long> m2) {
+ if (m1 == null || m1.size() == 0) {
+ return m2;
+ }
+ if (m2 == null || m2.size() == 0) {
+ return m1;
+ }
+ Map<Integer, Long> result = new HashMap<>(m1);
+ for (Map.Entry<Integer, Long> entry : m2.entrySet()) {
+ Integer partitionNo = entry.getKey();
+ Long lwm = entry.getValue();
+ if (result.containsKey(partitionNo)) {
+ result.put(partitionNo, Math.min(lwm, result.get(partitionNo)));
+ } else {
+ result.put(partitionNo, lwm);
+ }
+ }
+ return result;
+ }
+ }
+}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
index 1ddc963..b7fcdb8 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
@@ -29,7 +29,7 @@ import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
import org.apache.pinot.core.io.reader.DataFileReader;
import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
import org.apache.pinot.core.segment.index.readers.Dictionary;
-import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet;
@@ -50,7 +50,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
private String _segmentName;
private int _totalDoc;
private long _minSourceOffset;
- private UpsertWatermarkManager _upsertWatermarkManager;
+ private UpsertWaterMarkManager _upsertWatermarkManager;
private UpdateLogStorageProvider _updateLogStorageProvider;
// use array for mapping bewteen offset to docId, where actual offset = min_offset + array_index
// use 4 bytes per record
@@ -67,7 +67,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
CommonConstants.Helix.TableType.REALTIME);
_segmentName = segmentMetadata.getName();
_totalDoc = segmentMetadata.getTotalDocs();
- _upsertWatermarkManager = UpsertWatermarkManager.getInstance();
+ _upsertWatermarkManager = UpsertWaterMarkManager.getInstance();
_updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
_virtualColumnsReaderWriter = new ArrayList<>();
for (DataFileReader reader: virtualColumnIndexReader.values()) {
@@ -80,7 +80,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
*/
@VisibleForTesting
protected void init(List<VirtualColumnLongValueReaderWriter> readerWriters,
- int totalDoc, UpsertWatermarkManager manager,
+ int totalDoc, UpsertWaterMarkManager manager,
UpdateLogStorageProvider updateLogStorageProvider,
long minSourceOffset, int[] offsetToDocId) {
_tableNameWithType = "testTable";
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
index eeda5a6..6eae5fb 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java
@@ -24,7 +24,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.core.io.reader.DataFileReader;
import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
-import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.grigio.common.messages.LogEventType;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
@@ -48,7 +48,7 @@ public class UpsertMutableIndexSegmentCallback implements IndexSegmentCallback {
private String _tableName;
private String _segmentName;
private Schema _schema;
- private UpsertWatermarkManager _upsertWatermarkManager;
+ private UpsertWaterMarkManager _upsertWatermarkManager;
private String _offsetColumnName;
private final List<VirtualColumnLongValueReaderWriter> _mutableSegmentReaderWriters = new ArrayList<>();
// use map for mapping between kafka offset and docId because we at-most have 1 mutable segment per consumer
@@ -75,7 +75,7 @@ public class UpsertMutableIndexSegmentCallback implements IndexSegmentCallback {
_mutableSegmentReaderWriters.add((VirtualColumnLongValueReaderWriter) reader);
}
}
- _upsertWatermarkManager = UpsertWatermarkManager.getInstance();
+ _upsertWatermarkManager = UpsertWaterMarkManager.getInstance();
LOGGER.info("starting upsert segment with {} reader writer", _mutableSegmentReaderWriters.size());
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
index 530cd0d..5f0aa43 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
@@ -91,7 +91,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
_topicPrefix = conf.getString(SegmentUpdaterConfig.INPUT_TOPIC_PREFIX);
_updateSleepMs = conf.getInt(SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS,
SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS_DEFAULT);
- UpsertWatermarkManager.init(metrics);
+ UpsertWaterMarkManager.init(metrics);
_consumer = provider.getConsumer();
_ingestionExecutorService = Executors.newFixedThreadPool(1);
_updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
@@ -179,7 +179,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
if (System.currentTimeMillis() - lastReportedTime > LOGGER_TIME_GAP_MS) {
lastReportedTime = System.currentTimeMillis();
LOGGER.info("processed {} messages in {} ms", eventCount, System.currentTimeMillis() - loopStartTime);
- LOGGER.info("latest high water mark is {}", UpsertWatermarkManager.getInstance().toString());
+ LOGGER.info("latest high water mark is {}", UpsertWaterMarkManager.getInstance().toString());
}
_consumer.ackOffset();
_metrics.addTimedValueMs(GrigioTimer.SEGMENT_UPDATER_LOOP_TIME, System.currentTimeMillis() - loopStartTime);
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java
new file mode 100644
index 0000000..20758f0
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.updater;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManager;
+import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManagerImpl;
+import org.apache.pinot.grigio.common.utils.IdealStateHelper;
+import org.apache.pinot.grigio.servers.GrigioServerMetrics;
+import org.apache.pinot.grigio.servers.KeyCoordinatorProvider;
+import org.apache.pinot.grigio.servers.SegmentUpdaterProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.pinot.common.utils.CommonConstants.Grigio.PINOT_UPSERT_SERVER_COMPONENT_PREFIX;
+
+public class UpsertComponentContainerImpl implements UpsertComponentContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerImpl.class);
+
+ // config keys
+ public static final String ENABLED_CONFIG_KEY = "enabled";
+ public static final String STORAGE_CONFIG_KEY = "storage";
+ public static final String HOST_NAME_CONFIG_KEY = "hostname";
+ public static final String KC_CONFIG_KEY = "kc";
+ public static final String UPDATER_CONFIG_KEY = "updater";
+
+ private volatile boolean _isUpsertEnabled = false;
+ private AtomicBoolean _inited = new AtomicBoolean(false);
+
+ // members of related upsert components
+ private Configuration _conf;
+ private String _hostName;
+ private GrigioMetrics _grigioMetrics;
+ private KeyCoordinatorProvider _keyCoordinatorProvider;
+ private SegmentUpdaterProvider _segmentUpdaterProvider;
+ private SegmentUpdater _segmentUpdater;
+ private UpdateLogRetentionManager _retentionManager;
+ private SegmentDeletionHandler _segmentDeletionHandler;
+ private WaterMarkManager _waterMarkManager;
+
+ @Override
+ public void registerMetrics(String prefix, MetricsRegistry registry) {
+ _grigioMetrics = new GrigioServerMetrics(prefix + PINOT_UPSERT_SERVER_COMPONENT_PREFIX, registry);
+ _grigioMetrics.initializeGlobalMeters();
+ }
+
+ @Override
+ public void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName) {
+ Preconditions.checkState(!_inited.getAndSet(true), "cannot initialize upsert component twice");
+ _isUpsertEnabled = config.getBoolean(ENABLED_CONFIG_KEY, false);
+ if (_isUpsertEnabled) {
+ LOGGER.info("initializing upsert components");
+ _conf = config;
+ _hostName = _conf.getString(HOST_NAME_CONFIG_KEY);
+ initVirtualColumnStorageProvider(config);
+ _keyCoordinatorProvider = buildKeyCoordinatorProvider(config, _hostName);
+ _segmentUpdaterProvider = buildSegmentUpdaterProvider(config, _hostName);
+ _retentionManager = new UpdateLogRetentionManagerImpl(
+ new IdealStateHelper(helixManager.getClusterManagmentTool(), clusterName), instanceName
+ );
+ _segmentUpdater = buildSegmentUpdater(config, _segmentUpdaterProvider, _retentionManager);
+ UpsertWaterMarkManager.init(_grigioMetrics);
+ _waterMarkManager = UpsertWaterMarkManager.getInstance();
+ _segmentDeletionHandler = new SegmentDeletionHandler(ImmutableList.of(_segmentUpdater));
+ } else {
+ _waterMarkManager = new DefaultWaterMarkManager();
+ _segmentDeletionHandler = new SegmentDeletionHandler();
+ }
+ _inited.set(true);
+ }
+
+ @Override
+ public SegmentDeletionHandler getSegmentDeletionHandler() {
+ Preconditions.checkState(_inited.get(), "upsert container is not initialized yet");
+ return _segmentDeletionHandler;
+ }
+
+ @Override
+ public WaterMarkManager getWatermarkManager() {
+ return _waterMarkManager;
+ }
+
+ @Override
+ public synchronized void startBackgroundThread() {
+ if (_segmentUpdater != null) {
+ _segmentUpdater.start();
+ }
+ }
+
+ @Override
+ public synchronized void stopBackgroundThread() {
+ if (_segmentUpdater != null) {
+ LOGGER.info("closing segment updater");
+ _segmentUpdater.shutdown();
+ }
+ }
+
+ @Override
+ public boolean isUpsertEnabled() {
+ return _isUpsertEnabled;
+ }
+
+ public synchronized void shutdown() {
+ if (_keyCoordinatorProvider != null) {
+ LOGGER.info("shutting down key coordinator provider");
+ _keyCoordinatorProvider.close();
+ }
+ }
+
+ private void initVirtualColumnStorageProvider(Configuration conf) {
+ UpdateLogStorageProvider.init(conf.subset(STORAGE_CONFIG_KEY));
+ }
+
+ public KeyCoordinatorProvider buildKeyCoordinatorProvider(Configuration conf, String hostname) {
+ return new KeyCoordinatorProvider(conf.subset(KC_CONFIG_KEY), hostname, _grigioMetrics);
+ }
+
+ public SegmentUpdaterProvider buildSegmentUpdaterProvider(Configuration conf, String hostname) {
+ return new SegmentUpdaterProvider(conf.subset(UPDATER_CONFIG_KEY), hostname, _grigioMetrics);
+ }
+
+ public SegmentUpdater buildSegmentUpdater(Configuration conf, SegmentUpdaterProvider updateProvider,
+ UpdateLogRetentionManager updateLogRetentionManager) {
+ return new SegmentUpdater(conf.subset(UPDATER_CONFIG_KEY), updateProvider, updateLogRetentionManager,
+ _grigioMetrics);
+ }
+}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
similarity index 90%
rename from pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
index faad27b..266f25d2 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
@@ -31,26 +31,26 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class UpsertWatermarkManager implements WatermarkManager {
+public class UpsertWaterMarkManager implements WaterMarkManager {
private final Map<String, Map<Integer, Long>> _highWaterMarkTablePartitionMap = new ConcurrentHashMap<>();
private final GrigioMetrics _metrics;
- private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWatermarkManager.class);
- private static volatile UpsertWatermarkManager _instance;
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWaterMarkManager.class);
+ private static volatile UpsertWaterMarkManager _instance;
- private UpsertWatermarkManager(GrigioMetrics metrics) {
+ private UpsertWaterMarkManager(GrigioMetrics metrics) {
_metrics = metrics;
}
public static void init(GrigioMetrics metrics) {
- synchronized (UpsertWatermarkManager.class) {
+ synchronized (UpsertWaterMarkManager.class) {
Preconditions.checkState(_instance == null, "upsert water mark manager is already init");
- _instance = new UpsertWatermarkManager(metrics);
+ _instance = new UpsertWaterMarkManager(metrics);
}
}
- public static UpsertWatermarkManager getInstance() {
+ public static UpsertWaterMarkManager getInstance() {
Preconditions.checkState(_instance != null, "upsert water mark manager is not yet init");
return _instance;
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java
new file mode 100644
index 0000000..611f257
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.upsert;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.restlet.resources.ResourceUtils;
+import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixAdmin;
+import org.testng.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
+
+public class PollingBasedLowWaterMarkServiceTest extends ControllerTest {
+ private PinotHelixResourceManager _pinotResourceManager;
+ private static final String HELIX_CLUSTER_NAME = "TestLowWaterMarksPolling";
+ private final Configuration _pinotHelixBrokerProperties = new PropertiesConfiguration();
+
+ private HelixAdmin _helixAdmin;
+ private HelixBrokerStarter _helixBrokerStarter;
+
+// @Test
+ public void testBrokerCallServersCorrectly()
+ throws Exception {
+ ZkStarter.startLocalZkServer();
+ final String instanceId = "localhost_helixController";
+ _pinotResourceManager =
+ new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, null, 10000L,
+ true, /*isUpdateStateModel=*/false, true);
+ HelixManager helixManager = registerAndConnectAsHelixParticipant(HELIX_CLUSTER_NAME, instanceId, ZkStarter.DEFAULT_ZK_STR);
+ _pinotResourceManager.start(helixManager);
+ _helixAdmin = _pinotResourceManager.getHelixAdmin();
+
+ // Set up a cluster with one controller and 2 servers.
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(2, true);
+
+
+ _pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
+ _pinotHelixBrokerProperties
+ .addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
+
+
+ // Set the two servers' lwms info.
+ Map<Integer, Long> table1Map = new HashMap<>();
+ table1Map.put(0, 10L);
+ table1Map.put(1, 20L);
+ Map<Integer, Long> table2Map = new HashMap<>();
+ table2Map.put(0, 11L);
+ Map<String, Map<Integer, Long>> server1LwmsMap = new ConcurrentHashMap<>();
+ server1LwmsMap.put("Table1", table1Map);
+ server1LwmsMap.put("Table2", table2Map);
+
+ Map<Integer, Long> newTable1Map = new HashMap<>();
+ newTable1Map.put(0, 15L);
+ newTable1Map.put(1, 18L);
+ Map<Integer, Long> table3Map = new HashMap<>();
+ table3Map.put(0, 17L);
+ Map<String, Map<Integer, Long>> server2LwmsMap = new HashMap<>();
+ server2LwmsMap.put("Table1", newTable1Map);
+ server2LwmsMap.put("Table3", table3Map);
+
+
+ WireMockServer mockServer1 = new WireMockServer(1);
+ mockServer1.start();
+ mockServer1.stubFor(WireMock.get(WireMock.urlEqualTo("/lwms")).willReturn(WireMock.aResponse()
+ .withBody(ResourceUtils.convertToJsonString(new TableLowWaterMarksInfo(server1LwmsMap)))
+ .withHeader("Content-Type", "application/json")
+ .withStatus(200)));
+ WireMockServer mockServer2 = new WireMockServer(2);
+ mockServer2.start();
+ mockServer2.stubFor(WireMock.get(WireMock.urlEqualTo("/lwms")).willReturn(WireMock.aResponse()
+ .withBody(ResourceUtils.convertToJsonString(new TableLowWaterMarksInfo(server2LwmsMap)))
+ .withHeader("Content-Type", "application/json")
+ .withStatus(200)));
+
+ _helixBrokerStarter =
+ new HelixBrokerStarter(_pinotHelixBrokerProperties, HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR);
+
+
+ while (_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size() == 0
+ || _helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size() == 0) {
+ Thread.sleep(100);
+ }
+
+ Thread.sleep(1000);
+
+ // Verify the low water mark service behaviors.
+ mockServer1.verify(1, WireMock.getRequestedFor(WireMock.urlEqualTo("/lwms")));
+ mockServer2.verify(1, WireMock.getRequestedFor(WireMock.urlEqualTo("/lwms")));
+
+ Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1"));
+ Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2"));
+ Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3"));
+
+ // Table 1 verification.
+ Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").size(), 2);
+ Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").get(Integer.parseInt("0")) == 10L);
+ Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").get(Integer.parseInt("1")) == 18L);
+ // Table 2 verification.
+ Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2").size(), 1);
+ Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2").get(Integer.parseInt("0")) == 11L);
+ // Table 3 verification.
+ Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3").size(), 1);
+ Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3").get(Integer.parseInt("0")) == 17L);
+ }
+
+// @Test
+ public void testLowWaterMarksMerge() {
+ Map<Integer, Long> table1Map = new HashMap<>();
+ table1Map.put(0, 10L);
+ table1Map.put(1, 20L);
+ Map<Integer, Long> table2Map = new HashMap<>();
+ table2Map.put(0, 11L);
+ Map<String, Map<Integer, Long>> currentLwmsMap = new ConcurrentHashMap<>();
+ currentLwmsMap.put("Table1", table1Map);
+ currentLwmsMap.put("Table2", table2Map);
+
+ Map<Integer, Long> newTable1Map = new HashMap<>();
+ newTable1Map.put(0, 15L);
+ newTable1Map.put(1, 18L);
+ Map<Integer, Long> table3Map = new HashMap<>();
+ table3Map.put(0, 17L);
+ Map<String, Map<Integer, Long>> serverLwms = new HashMap<>();
+ serverLwms.put("Table1", newTable1Map);
+ serverLwms.put("Table3", table3Map);
+
+ PollingBasedLowWaterMarkService.LwmMerger.updateLowWaterMarks(currentLwmsMap, serverLwms);
+
+ Assert.assertEquals(currentLwmsMap.size(), 3);
+
+ // Verify Table1 content.
+ Assert.assertTrue(currentLwmsMap.containsKey("Table1"));
+ Map<Integer, Long> lwmsMap1 = currentLwmsMap.get("Table1");
+ Assert.assertEquals(lwmsMap1.size(), 2);
+ // Verify that the lower LWM value is chosen in the combined results.
+ Assert.assertTrue(lwmsMap1.get(0) == 10L);
+ Assert.assertTrue(lwmsMap1.get(1) == 18L);
+
+ // Verify Table2 content.
+ Assert.assertTrue(currentLwmsMap.containsKey("Table2"));
+ Map<Integer, Long> lwmsMap2 = currentLwmsMap.get("Table2");
+ Assert.assertEquals(lwmsMap2.size(), 1);
+ // Table2 exists only in the current map, so its LWM is carried over unchanged (no merge occurs).
+ Assert.assertTrue(lwmsMap2.get(0) == 11L);
+
+ // Verify Table3 content.
+ Assert.assertTrue(currentLwmsMap.containsKey("Table3"));
+ Map<Integer, Long> lwmsMap3 = currentLwmsMap.get("Table3");
+ Assert.assertEquals(lwmsMap3.size(), 1);
+ // Table3 comes only from the server update, so its LWM is added as-is (no merge occurs).
+ Assert.assertTrue(lwmsMap3.get(0) == 17L);
+ }
+
+ /**
+ * Register and connect to Helix cluster as PARTICIPANT role.
+ */
+ private HelixManager registerAndConnectAsHelixParticipant(String helixClusterName, String instanceId, String helixZkURL) {
+ HelixManager helixManager =
+ HelixManagerFactory.getZKHelixManager(helixClusterName, instanceId, InstanceType.PARTICIPANT, helixZkURL);
+
+ // Registers Master-Slave state model to state machine engine, which is for calculating participant assignment in lead controller resource.
+ helixManager.getStateMachineEngine()
+ .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory());
+
+ try {
+ helixManager.connect();
+ return helixManager;
+ } catch (Exception e) {
+ String errorMsg =
+ String.format("Exception when connecting the instance %s as Participant to Helix.", instanceId);
+ throw new RuntimeException(errorMsg);
+ }
+ }
+
+}
diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
index 979e5c1..e2e270d 100644
--- a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.core.data.manager.upsert;
-import org.apache.pinot.core.segment.updater.UpsertWatermarkManager;
+import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
import org.apache.pinot.grigio.common.messages.LogEventType;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
@@ -40,13 +40,13 @@ import static org.mockito.Mockito.when;
public class UpsertImmutableIndexSegmentCallbackTest {
UpdateLogStorageProvider _mockProvider;
- UpsertWatermarkManager _mockUpsertWatermarkManager;
+ UpsertWaterMarkManager _mockUpsertWatermarkManager;
List<VirtualColumnLongValueReaderWriter> _readerWriters = new ArrayList<>();
@BeforeMethod
public void init() {
_mockProvider = mock(UpdateLogStorageProvider.class);
- _mockUpsertWatermarkManager = mock(UpsertWatermarkManager.class);
+ _mockUpsertWatermarkManager = mock(UpsertWaterMarkManager.class);
}
@Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 6a626cc..8f30ca8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -332,6 +332,8 @@ public class ClusterIntegrationTestUtils {
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
+ properties.put("max.request.size", "300000000");
+ properties.put("buffer.memory", "300000000");
StreamDataProducer producer =
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java
new file mode 100644
index 0000000..b346066
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import org.apache.pinot.common.restlet.resources.ResourceUtils;
+import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
+import org.apache.pinot.core.segment.updater.WaterMarkManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Api(tags = "LowWaterMarks")
+@Path("/")
+public class LowWatermarksResource {
+
+ @Inject
+ ServerInstance serverInstance;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/lwms")
+ @ApiOperation(value = "Show the lwms of tables ", notes = "Returns the lwms of all upsert enable tables in this server")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ })
+ public String getLowWaterMarks() {
+ WaterMarkManager watermarkManager = serverInstance.getWatermarkManager();
+
+ if (watermarkManager == null) {
+ throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return ResourceUtils.convertToJsonString(
+ new TableLowWaterMarksInfo(watermarkManager.getHighWaterMarkTablePartitionMap()));
+ }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
new file mode 100644
index 0000000..13a7612
--- /dev/null
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.SegmentDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Api(tags = "UpsertDebug")
+@Path("/")
+public class UpsertDebugResource {
+
+ @Inject
+ ServerInstance serverInstance;
+
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ @Path("/upsert/{tableName}/{segmentName}/{offset}")
+ @ApiOperation(value = "$validFrom and $validUntil value", notes = "")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success", response = String.class),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ })
+ public String getUpsertDataAtOffset(
+ @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME") @PathParam("tableName") String tableName,
+ @ApiParam(value = "segment name", required = true, example = "eats_supply_update__0__0__20190923T0700Z") @PathParam("segmentName") String segmentName,
+ @ApiParam(value = "offset", required = true, example = "100") @PathParam("offset") String offsetStr
+ ) {
+ if (!serverInstance.isUpsertEnabled()) {
+ return "not an upsert server";
+ }
+ InstanceDataManager instanceDataManager = serverInstance.getInstanceDataManager();
+ TableDataManager tableDataManager = instanceDataManager.getTableDataManager(tableName);
+ if (tableDataManager == null) {
+ return "no table for " + tableName;
+ }
+ SegmentDataManager segmentDataManager = null;
+ try {
+ segmentDataManager = tableDataManager.acquireSegment(segmentName);
+ if (segmentDataManager == null) {
+ return "cannot find associate segment for segment " + segmentName;
+ }
+ if (!(segmentDataManager instanceof UpsertSegmentDataManager)) {
+ return "it is not an upsert table";
+ } else {
+ return ((UpsertSegmentDataManager) segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr));
+ }
+ } finally {
+ if (segmentDataManager != null) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
+ }
+ }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index 5610a4a..d2adcb3 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -20,8 +20,8 @@ package org.apache.pinot.server.conf;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.server.upsert.DefaultUpsertComponentContainer;
/**
@@ -45,6 +45,11 @@ public class ServerConf {
private static final String PINOT_QUERY_SCHEDULER_PREFIX = "pinot.query.scheduler";
+ public static final String UPSERT_CONFIG_PARENT = "pinot.server.upsert";
+ public static final String UPSERT_COMPONENT_CONFIG_KEY = "pinot.server.upsertComponent.class";
+ public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName();
+
+
private Configuration _serverConf;
public ServerConf(Configuration serverConfig) {
@@ -92,6 +97,14 @@ public class ServerConf {
return _serverConf.subset(PINOT_QUERY_SCHEDULER_PREFIX);
}
+ public String getUpsertComponentContainerClassName() {
+ return _serverConf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT);
+ }
+
+ public Configuration getUpsertConfig() {
+ return _serverConf.subset(UPSERT_CONFIG_PARENT);
+ }
+
/**
* Returns an array of transform function names as defined in the config
* @return String array of transform functions
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 11e2326..845a90b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -26,14 +26,18 @@ import java.util.concurrent.atomic.LongAccumulator;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metrics.MetricsHelper;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.segment.updater.SegmentDeletionHandler;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory;
+import org.apache.pinot.core.segment.updater.WaterMarkManager;
import org.apache.pinot.core.transport.QueryServer;
import org.apache.pinot.server.conf.ServerConf;
+import org.apache.pinot.core.segment.updater.UpsertComponentContainer;
+import org.apache.pinot.server.upsert.UpsertComponentContainerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +49,7 @@ import org.slf4j.LoggerFactory;
public class ServerInstance {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerInstance.class);
+ private final ServerConf _serverConf;
private final ServerMetrics _serverMetrics;
private final InstanceDataManager _instanceDataManager;
private final QueryExecutor _queryExecutor;
@@ -52,12 +57,16 @@ public class ServerInstance {
private final QueryScheduler _queryScheduler;
private final QueryServer _queryServer;
+ // upsert related component, only initialize if necessary
+ private UpsertComponentContainer _upsertComponentContainer;
+
private boolean _started = false;
- public ServerInstance(ServerConf serverConf, HelixManager helixManager)
+ public ServerInstance(ServerConf serverConf, HelixManager helixManager, String clusterName, String instanceName)
throws Exception {
LOGGER.info("Initializing server instance");
+ _serverConf = serverConf;
LOGGER.info("Initializing server metrics");
MetricsHelper.initializeMetrics(serverConf.getMetricsConfig());
MetricsRegistry metricsRegistry = new MetricsRegistry();
@@ -97,12 +106,19 @@ public class ServerInstance {
}
TransformFunctionFactory.init(transformFunctionClasses);
+
+ final UpsertComponentContainerProvider upsertComponentContainerProvider = new UpsertComponentContainerProvider(serverConf);
+ _upsertComponentContainer = upsertComponentContainerProvider.getInstance();
+ _upsertComponentContainer.registerMetrics(_serverConf.getMetricsPrefix(), metricsRegistry);
+ _upsertComponentContainer.init(_serverConf.getUpsertConfig(), helixManager, clusterName, instanceName);
+
LOGGER.info("Finish initializing server instance");
}
public synchronized void start() {
// This method is called when Helix starts a new ZK session, and can be called multiple times. We only need to start
// the server instance once, and simply ignore the following invocations.
+ LOGGER.info("Starting server instance");
if (_started) {
LOGGER.info("Server instance is already running, skipping the start");
return;
@@ -127,12 +143,16 @@ public class ServerInstance {
Preconditions.checkState(_started, "Server instance is not running");
LOGGER.info("Shutting down server instance");
+ _upsertComponentContainer.stopBackgroundThread();
LOGGER.info("Shutting down query server");
_queryServer.shutDown();
LOGGER.info("Shutting down query scheduler");
_queryScheduler.stop();
LOGGER.info("Shutting down query executor");
_queryExecutor.shutDown();
+ LOGGER.info("Shutting down upsert components if necessary");
+ _upsertComponentContainer.shutdown();
+
LOGGER.info("Shutting down instance data manager");
_instanceDataManager.shutDown();
@@ -148,6 +168,23 @@ public class ServerInstance {
return _instanceDataManager;
}
+ public SegmentDeletionHandler getSegmentDeletionHandler() {
+ return _upsertComponentContainer.getSegmentDeletionHandler();
+ }
+
+ public WaterMarkManager getWatermarkManager() {
+ return _upsertComponentContainer.getWatermarkManager();
+ }
+
+ public void maybeStartUpsertBackgroundThread() {
+ LOGGER.info("starting upsert component background thread");
+ _upsertComponentContainer.startBackgroundThread();
+ }
+
+ public boolean isUpsertEnabled() {
+ return _upsertComponentContainer.isUpsertEnabled();
+ }
+
public long getLatestQueryTime() {
return _latestQueryTime.get();
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 920b422..710e22a 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -307,13 +307,6 @@ public class HelixInstanceDataManager implements InstanceDataManager {
}
}
- /*
- @Override
- public Map<String, Map<Integer, Long>> getLowWaterMarks() {
- return UpsertWaterMarkManager.getInstance().getHighWaterMarkTablePartitionMap();
- }
- */
-
@Override
public String getSegmentDataDirectory() {
return _instanceDataManagerConfig.getInstanceDataDir();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index c1ea182..402bb6b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -21,14 +21,6 @@ package org.apache.pinot.server.starter.helix;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
@@ -64,14 +56,57 @@ import org.apache.pinot.server.conf.ServerConf;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.server.starter.ServerInstance;
-import org.apache.pinot.server.upsert.UpsertComponentContainer;
-import org.apache.pinot.server.upsert.UpsertComponentContainerProvider;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
-import static org.apache.pinot.common.utils.CommonConstants.Server.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.Instance;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_ID;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_TIMEOUT_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol;
/**
@@ -138,11 +173,11 @@ public class HelixServerStarter {
ServerSegmentCompletionProtocolHandler
.init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf);
- _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager);
+ _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager, _helixClusterName, _instanceId);
InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager);
- StateModelFactory<?> stateModelFactory =
- new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader);
+ StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager,
+ fetcherAndLoader, _serverInstance.getSegmentDeletionHandler());
_helixManager.getStateMachineEngine()
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
// Start the server instance as a pre-connect callback so that it starts after connecting to the ZK in order to
@@ -181,6 +216,8 @@ public class HelixServerStarter {
long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_STARTUP_TIMEOUT_MS, DEFAULT_STARTUP_TIMEOUT_MS);
startupServiceStatusCheck(endTimeMs);
}
+ _serverInstance.maybeStartUpsertBackgroundThread();
+
setShuttingDownStatus(false);
LOGGER.info("Pinot server ready");
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 145e378..1eab7f5 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -19,8 +19,6 @@
package org.apache.pinot.server.starter.helix;
import com.google.common.base.Preconditions;
-import java.io.File;
-import java.util.concurrent.locks.Lock;
import org.apache.commons.io.FileUtils;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
@@ -39,10 +37,13 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
-import org.apache.pinot.server.upsert.SegmentDeletionHandler;
+import org.apache.pinot.core.segment.updater.SegmentDeletionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.concurrent.locks.Lock;
+
/**
* Data Server layer state model to take over how to operate on:
@@ -56,17 +57,18 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
private final SegmentFetcherAndLoader _fetcherAndLoader;
private final SegmentDeletionHandler _segmentDeletionHandler;
+
public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
SegmentFetcherAndLoader fetcherAndLoader) {
- this(instanceId, instanceDataManager, fetcherAndLoader, new SegmentDeletionHandler());
+ this(instanceId, instanceDataManager, fetcherAndLoader, new SegmentDeletionHandler());
}
public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
- SegmentFetcherAndLoader fetcherAndLoader, SegmentDeletionHandler segmentDeletionHandler) {
+ SegmentFetcherAndLoader fetcherAndLoader, SegmentDeletionHandler deletionHandler) {
_instanceId = instanceId;
_instanceDataManager = instanceDataManager;
_fetcherAndLoader = fetcherAndLoader;
- _segmentDeletionHandler = segmentDeletionHandler;
+ _segmentDeletionHandler = deletionHandler;
}
public static String getStateModelName() {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
index 63424ac..3841a2e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java
@@ -18,20 +18,25 @@
*/
package org.apache.pinot.server.upsert;
-import com.codahale.metrics.MetricRegistry;
+import com.yammer.metrics.core.MetricsRegistry;
import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.core.segment.updater.DefaultWaterMarkManager;
+import org.apache.pinot.core.segment.updater.SegmentDeletionHandler;
+import org.apache.pinot.core.segment.updater.UpsertComponentContainer;
+import org.apache.pinot.core.segment.updater.WaterMarkManager;
public class DefaultUpsertComponentContainer implements UpsertComponentContainer {
private final SegmentDeletionHandler deletionHandler = new SegmentDeletionHandler();
+ private final WaterMarkManager watermarkManager = new DefaultWaterMarkManager();
@Override
- public void registerMetrics(MetricRegistry registry) {
-
+ public void registerMetrics(String prefix, MetricsRegistry registry) {
}
@Override
- public void init(Configuration config) {
+ public void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName) {
}
@Override
@@ -40,10 +45,29 @@ public class DefaultUpsertComponentContainer implements UpsertComponentContainer
}
@Override
- public void start() {
+ public WaterMarkManager getWatermarkManager() {
+ return watermarkManager;
+ }
+
+ @Override
+ public boolean isUpsertEnabled() {
+ return false;
}
@Override
- public void stop() {
+ public void startBackgroundThread() {
+
}
+
+ @Override
+ public void stopBackgroundThread() {
+
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
index fe4af07..7ae27a6 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java
@@ -19,10 +19,10 @@
package org.apache.pinot.server.upsert;
import com.google.common.base.Preconditions;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.pinot.core.segment.updater.WatermarkManager;
-import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
+import org.apache.pinot.core.segment.updater.UpsertComponentContainer;
+import org.apache.pinot.core.segment.updater.WaterMarkManager;
+import org.apache.pinot.server.conf.ServerConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,22 +30,16 @@ public class UpsertComponentContainerProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerProvider.class);
- public static final String UPSERT_COMPONENT_CONFIG_KEY = "watermarkManager.class";
- public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName();
-
- private final Configuration _conf;
private UpsertComponentContainer _instance;
- public UpsertComponentContainerProvider(Configuration conf, GrigioMetrics metrics) {
- _conf = conf;
- String className = _conf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT);
+ public UpsertComponentContainerProvider(ServerConf serverConf) {
+ String className = serverConf.getUpsertComponentContainerClassName();
LOGGER.info("creating watermark manager with class {}", className);
try {
Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className);
- Preconditions.checkState(comonentContainerClass.isAssignableFrom(WatermarkManager.class),
+ Preconditions.checkState(comonentContainerClass.isAssignableFrom(WaterMarkManager.class),
"configured class not assignable from Callback class");
_instance = comonentContainerClass.newInstance();
- _instance.init(_conf);
} catch (Exception e) {
LOGGER.error("failed to load watermark manager class", className, e);
ExceptionUtils.rethrow(e);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java
similarity index 60%
rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
rename to pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java
index 01579e7..b007c90 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -16,18 +17,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.segment.updater;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
-import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
-
-import java.util.Map;
-
-public interface WatermarkManager {
+package org.apache.pinot.server.api.resources;
- void init(Configuration config, GrigioMetrics metrics);
+import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo;
+import org.apache.pinot.server.api.BaseResourceTest;
+import org.testng.Assert;
+import org.testng.annotations.Test;
- Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap();
+public class LowWatermarksResourceTest extends BaseResourceTest {
+ @Test
+ public void testHappyPath() {
+ TableLowWaterMarksInfo lwms = _webTarget.path("lwms").request().get(TableLowWaterMarksInfo.class);
+ Assert.assertEquals(lwms.getTableLowWaterMarks().size(), 0);
+ }
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index ac989bb..65cb9e7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -18,11 +18,6 @@
*/
package org.apache.pinot.tools.realtime.provisioning;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.config.TableConfig;
@@ -40,6 +35,12 @@ import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* Given a sample segment, this class can estimate how much memory would be used per host, for various combinations of numHostsToProvision and numHoursToConsume
@@ -126,7 +127,8 @@ public class MemoryEstimator {
// create a config
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
- new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
+ new RealtimeSegmentConfig.Builder().setTableName(_tableConfig.getTableName())
+ .setSegmentName(_segmentMetadata.getName())
.setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema())
.setCapacity(_segmentMetadata.getTotalDocs()).setAvgNumMultiValues(_avgMultiValues)
.setNoDictionaryColumns(_noDictionaryColumns)
@@ -228,7 +230,8 @@ public class MemoryEstimator {
RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs);
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
- new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
+ new RealtimeSegmentConfig.Builder().setTableName(_tableConfig.getTableName())
+ .setSegmentName(_segmentMetadata.getName())
.setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema())
.setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
.setNoDictionaryColumns(_noDictionaryColumns)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org