You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2022/07/20 22:49:43 UTC
[pinot] branch master updated: Support pause/resume consumption of realtime tables (#8986)
This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6903856d39 Support pause/resume consumption of realtime tables (#8986)
6903856d39 is described below
commit 6903856d399b4d7b09548a75a888382e1659e9e3
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Jul 20 15:49:38 2022 -0700
Support pause/resume consumption of realtime tables (#8986)
---
.../pinot/common/messages/ForceCommitMessage.java | 62 +++++++++++
.../protocols/SegmentCompletionProtocol.java | 2 +
.../pinot/common/utils/helix/HelixHelper.java | 21 ++--
.../pinot/controller/BaseControllerStarter.java | 1 +
.../controller/api/resources/PauseStatus.java | 53 +++++++++
.../resources/PeriodicTaskInvocationResponse.java | 42 +++++++
...PinotControllerPeriodicTaskRestletResource.java | 11 +-
.../api/resources/PinotRealtimeTableResource.java | 83 ++++++++++----
.../controller/helix/ControllerRequestClient.java | 34 ++++++
.../helix/core/PinotHelixResourceManager.java | 8 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 121 +++++++++++++++++++--
.../core/realtime/SegmentCompletionManager.java | 3 +-
.../core/data/manager/InstanceDataManager.java | 5 +
.../realtime/LLRealtimeSegmentDataManager.java | 17 ++-
.../starter/helix/HelixInstanceDataManager.java | 25 +++++
.../helix/SegmentMessageHandlerFactory.java | 32 ++++++
.../utils/builder/ControllerRequestURLBuilder.java | 12 ++
17 files changed, 483 insertions(+), 49 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
new file mode 100644
index 0000000000..4f131c9bac
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+
+
+/**
+ * Force commit Helix message. It is created on the controller and sent to servers to instruct them to stop
+ * consumption and immediately start committing the given segments.
+ *
+ * <p>The table name and the comma-separated segment names are stored as simple fields of the underlying
+ * {@link ZNRecord}.
+ */
+public class ForceCommitMessage extends Message {
+  public static final String FORCE_COMMIT_MSG_SUB_TYPE = "FORCE_COMMIT";
+  private static final String TABLE_NAME = "tableName";
+  private static final String SEGMENT_NAMES = "segmentNames";
+
+  /**
+   * Creates a new force commit message with a random message id and no execution timeout.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param segmentNames names of the segments to force commit; stored joined with ','
+   */
+  public ForceCommitMessage(String tableNameWithType, Set<String> segmentNames) {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(FORCE_COMMIT_MSG_SUB_TYPE);
+    setExecutionTimeout(-1); // no timeout
+    ZNRecord znRecord = getRecord();
+    znRecord.setSimpleField(TABLE_NAME, tableNameWithType);
+    znRecord.setSimpleField(SEGMENT_NAMES, String.join(",", segmentNames));
+  }
+
+  /**
+   * Wraps a received Helix message as a force commit message.
+   *
+   * @param message the received message
+   * @throws IllegalArgumentException if the message sub type is not {@link #FORCE_COMMIT_MSG_SUB_TYPE}
+   */
+  public ForceCommitMessage(Message message) {
+    super(message.getRecord());
+    String msgSubType = message.getMsgSubType();
+    // Constant-first comparison: a message without a sub type fails the precondition instead of throwing NPE.
+    Preconditions.checkArgument(FORCE_COMMIT_MSG_SUB_TYPE.equals(msgSubType),
+        "Invalid message sub type: %s for ForceCommitMessage", msgSubType);
+  }
+
+  /**
+   * @return the table name (with type) stored in this message
+   */
+  public String getTableName() {
+    return getRecord().getSimpleField(TABLE_NAME);
+  }
+
+  /**
+   * Decodes the segment names stored in this message.
+   *
+   * @return the segment names; empty if none were recorded. An empty input set is serialized as "", which
+   *     {@code split} would otherwise turn into a single empty segment name.
+   */
+  public Set<String> getSegmentNames() {
+    String segmentNames = getRecord().getSimpleField(SEGMENT_NAMES);
+    if (segmentNames == null) {
+      segmentNames = "";
+    }
+    return Arrays.stream(segmentNames.split(",")).filter(name -> !name.isEmpty()).collect(Collectors.toSet());
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 790e242009..8293bcf4d3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -148,6 +148,8 @@ public class SegmentCompletionProtocol {
public static final String REASON_TIME_LIMIT = "timeLimit";
// Stop reason sent by server as end of partitionGroup reached
public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";
+ // Stop reason sent by server as force commit message received
+ public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED = "forceCommitMessageReceived";
// Canned responses
public static final Response RESP_NOT_LEADER =
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index fe90644ed0..5a371a1fb1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -89,12 +89,12 @@ public class HelixHelper {
* @param helixManager The HelixManager used to interact with the Helix cluster
* @param resourceName The resource for which to update the ideal state
* @param updater A function that returns an updated ideal state given an input ideal state
+ * @return updated ideal state if successful, null if not
*/
- // TODO: since updater always update ideal state in place, it should return boolean indicating whether the ideal
- // state get changed.
- public static void updateIdealState(final HelixManager helixManager, final String resourceName,
+ public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName,
final Function<IdealState, IdealState> updater, RetryPolicy policy, final boolean noChangeOk) {
try {
+ IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
policy.attempt(new Callable<Boolean>() {
@Override
public Boolean call() {
@@ -139,6 +139,7 @@ public class HelixHelper {
if (dataAccessor.getBaseDataAccessor()
.set(idealStateKey.getPath(), updatedZNRecord, idealState.getRecord().getVersion(),
AccessOption.PERSISTENT)) {
+ idealStateWrapper._idealState = updatedIdealState;
return true;
} else {
LOGGER.warn("Failed to update ideal state for resource: {}", resourceName);
@@ -158,15 +159,21 @@ public class HelixHelper {
} else {
LOGGER.warn("Idempotent or null ideal state update for resource {}, skipping update.", resourceName);
}
+ idealStateWrapper._idealState = idealState;
return true;
}
}
});
+ return idealStateWrapper._idealState;
} catch (Exception e) {
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
}
}
+  /**
+   * Mutable holder that lets the retried {@code Callable} inside {@code updateIdealState} hand the resulting ideal
+   * state back to the enclosing method. Locals captured by the anonymous class must be effectively final, so the
+   * value is stored in a field instead.
+   */
+  private static final class IdealStateWrapper {
+    IdealState _idealState;
+  }
+
/**
* Exception to be thrown by updater function to exit from retry in {@link HelixHelper::updatedIdealState}
*/
@@ -181,14 +188,14 @@ public class HelixHelper {
}
}
- public static void updateIdealState(HelixManager helixManager, String resourceName,
+ public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater) {
- updateIdealState(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
+ return updateIdealState(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
}
- public static void updateIdealState(final HelixManager helixManager, final String resourceName,
+ public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName,
final Function<IdealState, IdealState> updater, RetryPolicy policy) {
- updateIdealState(helixManager, resourceName, updater, policy, false);
+ return updateIdealState(helixManager, resourceName, updater, policy, false);
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index fd9f00ef6d..27c8eb2a25 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -465,6 +465,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
bind(_leadControllerManager).to(LeadControllerManager.class);
bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
+ bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class);
}
});
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatus.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatus.java
new file mode 100644
index 0000000000..9542e70eba
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatus.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Set;
+
+
+/**
+ * Pause state of a realtime table, as returned by the controller's pause consumption, resume consumption and
+ * pause status APIs. Null properties are omitted from the JSON representation. Instances are immutable.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class PauseStatus {
+  private final boolean _pauseFlag;
+  private final Set<String> _consumingSegments;
+  private final String _description;
+
+  /**
+   * @param pauseFlag value of the table's pause flag
+   * @param consumingSegments names of the table's consuming segments; may be null (omitted from JSON)
+   * @param description description of the status; may be null (omitted from JSON)
+   */
+  @JsonCreator
+  public PauseStatus(@JsonProperty("pauseFlag") boolean pauseFlag,
+      @JsonProperty("consumingSegments") Set<String> consumingSegments,
+      @JsonProperty("description") String description) {
+    _pauseFlag = pauseFlag;
+    _consumingSegments = consumingSegments;
+    _description = description;
+  }
+
+  public boolean getPauseFlag() {
+    return _pauseFlag;
+  }
+
+  public Set<String> getConsumingSegments() {
+    return _consumingSegments;
+  }
+
+  public String getDescription() {
+    return _description;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PeriodicTaskInvocationResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PeriodicTaskInvocationResponse.java
new file mode 100644
index 0000000000..c383170b55
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PeriodicTaskInvocationResponse.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * Result of requesting a controller periodic task run. It carries the generated task id, which can be used to
+ * filter controller logs, and a success flag. For example, {@code invokeControllerPeriodicTask} sets the flag when
+ * at least one controller was notified. Instances are immutable.
+ */
+public class PeriodicTaskInvocationResponse {
+  private final String _taskId;
+  private final boolean _isSuccessful;
+
+  /**
+   * @param taskId id generated for this invocation
+   * @param isSuccessful whether the invocation succeeded
+   */
+  public PeriodicTaskInvocationResponse(String taskId, boolean isSuccessful) {
+    _taskId = taskId;
+    _isSuccessful = isSuccessful;
+  }
+
+  @JsonProperty("isSuccessful")
+  public boolean isSuccessful() {
+    return _isSuccessful;
+  }
+
+  @JsonProperty("taskId")
+  public String getTaskId() {
+    return _taskId;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
index 0ae3f1fcf8..50d4c35a9e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
@@ -35,7 +35,6 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
import org.slf4j.Logger;
@@ -62,7 +61,7 @@ public class PinotControllerPeriodicTaskRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Path("/run")
@ApiOperation(value = "Run periodic task against table. If table name is missing, task will run against all tables.")
- public String runPeriodicTask(
+ public Response runPeriodicTask(
@ApiParam(value = "Periodic task name", required = true) @QueryParam("taskname") String periodicTaskName,
@ApiParam(value = "Name of the table") @QueryParam("tableName") String tableName,
@ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String tableType) {
@@ -87,11 +86,9 @@ public class PinotControllerPeriodicTaskRestletResource {
tableName = matchingTableNamesWithType.get(0);
}
- Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
- .invokeControllerPeriodicTask(tableName, periodicTaskName, null);
-
- return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
- + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}";
+ return Response.ok()
+ .entity(_pinotHelixResourceManager.invokeControllerPeriodicTask(tableName, periodicTaskName, null))
+ .build();
}
@GET
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index b6310af3a4..ba03a6cc5c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.controller.api.resources;
-import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
@@ -26,20 +25,22 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
-import java.util.HashMap;
-import java.util.Map;
import javax.inject.Inject;
-import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
-import org.apache.commons.lang3.tuple.Pair;
+import javax.ws.rs.core.Response;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
+import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
@@ -49,27 +50,71 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
@Path("/")
public class PinotRealtimeTableResource {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeTableResource.class);
+
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
+ @Inject
+ PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+
+  /**
+   * REST endpoint that pauses consumption of a realtime table. The table must exist and be enabled.
+   * Any failure raised by the segment manager is reported as 500 with the cause attached.
+   */
+  @POST
+  @Path("/tables/{tableName}/pauseConsumption")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Pause consumption of a realtime table",
+      notes = "Pause the consumption of a realtime table")
+  public Response pauseConsumption(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(realtimeTableName);
+    try {
+      PauseStatus pauseStatus = _pinotLLCRealtimeSegmentManager.pauseConsumption(realtimeTableName);
+      return Response.ok(pauseStatus).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
@POST
@Path("/tables/{tableName}/resumeConsumption")
@Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Resume the consumption of a realtime table",
- notes = "Resume the consumption of a realtime table")
- public String resumeConsumption(
- @ApiParam(value = "Name of the table", required = true)
- @PathParam("tableName") String tableName) throws JsonProcessingException {
- // TODO: Add util method for invoking periodic tasks
+ @ApiOperation(value = "Resume consumption of a realtime table",
+ notes = "Resume the consumption for a realtime table")
+ public Response resumeConsumption(
+ @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
- Map<String, String> taskProperties = new HashMap<>();
- taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+ validate(tableNameWithType);
+ try {
+ return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
- Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
- .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
+  /**
+   * REST endpoint that reports the pause status of a realtime table. The table must exist and be enabled.
+   * Any failure raised by the segment manager is reported as 500 with the cause attached.
+   */
+  @GET
+  @Path("/tables/{tableName}/pauseStatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Return pause status of a realtime table",
+      notes = "Return pause status of a realtime table along with list of consuming segments.")
+  public Response getConsumptionStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(realtimeTableName);
+    try {
+      Response.ResponseBuilder builder = Response.ok();
+      builder = builder.entity(_pinotLLCRealtimeSegmentManager.getPauseStatus(realtimeTableName));
+      return builder.build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
- return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
- + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}";
+  /**
+   * Rejects requests for tables that have no ideal state (404) or whose ideal state is disabled (400).
+   *
+   * @param tableNameWithType realtime table name with type suffix
+   * @throws ControllerApplicationException if the table is missing or disabled
+   */
+  private void validate(String tableNameWithType) {
+    IdealState tableIdealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    if (tableIdealState == null) {
+      String notFoundMessage = String.format("Table %s not found!", tableNameWithType);
+      throw new ControllerApplicationException(LOGGER, notFoundMessage, Response.Status.NOT_FOUND);
+    }
+    if (tableIdealState.isEnabled()) {
+      return;
+    }
+    String disabledMessage = String.format("Table %s is disabled!", tableNameWithType);
+    throw new ControllerApplicationException(LOGGER, disabledMessage, Response.Status.BAD_REQUEST);
 }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index bb6002903b..a5ee4cbc4e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -24,6 +24,7 @@ import java.net.URL;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.api.resources.PauseStatus;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.tenant.Tenant;
@@ -169,6 +170,39 @@ public class ControllerRequestClient {
}
}
+  /**
+   * Calls the controller's pause consumption endpoint for the given table.
+   *
+   * @param tableName name of the table
+   * @return the pause status returned by the controller
+   * @throws IOException if the request fails, the controller responds with an error status, or the URL is invalid
+   */
+  public PauseStatus pauseConsumption(String tableName)
+      throws IOException {
+    try {
+      String requestUrl = _controllerRequestURLBuilder.forPauseConsumption(tableName);
+      SimpleHttpResponse httpResponse =
+          HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(requestUrl).toURI(), null));
+      String responseBody = httpResponse.getResponse();
+      return JsonUtils.stringToObject(responseBody, PauseStatus.class);
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Calls the controller's resume consumption endpoint for the given table.
+   *
+   * @param tableName name of the table
+   * @return the pause status returned by the controller
+   * @throws IOException if the request fails, the controller responds with an error status, or the URL is invalid
+   */
+  public PauseStatus resumeConsumption(String tableName)
+      throws IOException {
+    try {
+      String requestUrl = _controllerRequestURLBuilder.forResumeConsumption(tableName);
+      SimpleHttpResponse httpResponse =
+          HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(requestUrl).toURI(), null));
+      String responseBody = httpResponse.getResponse();
+      return JsonUtils.stringToObject(responseBody, PauseStatus.class);
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Fetches the pause status of the given table from the controller.
+   *
+   * @param tableName name of the table
+   * @return the pause status returned by the controller
+   * @throws IOException if the request fails, the controller responds with an error status, or the URL is invalid
+   */
+  public PauseStatus getPauseStatus(String tableName)
+      throws IOException {
+    try {
+      String requestUrl = _controllerRequestURLBuilder.forPauseStatus(tableName);
+      SimpleHttpResponse httpResponse =
+          HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URL(requestUrl).toURI()));
+      String responseBody = httpResponse.getResponse();
+      return JsonUtils.stringToObject(responseBody, PauseStatus.class);
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
public void createBrokerTenant(String tenantName, int numBrokers)
throws IOException {
try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b1143a9873..a563f3048f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -56,7 +56,6 @@ import javax.ws.rs.core.Response;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
@@ -115,6 +114,7 @@ import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.api.exception.UserAlreadyExistsException;
import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.api.resources.PeriodicTaskInvocationResponse;
import org.apache.pinot.controller.api.resources.StateType;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -3596,9 +3596,9 @@ public class PinotHelixResourceManager {
* @param tableName Name of table against which task is to be run
* @param periodicTaskName Task name
* @param taskProperties Extra properties to be passed along
- * @return Task id for filtering logs, along with the number of successfully sent messages
+ * @return Task id for filtering logs, along with success status (whether helix messeages were sent)
*/
- public Pair<String, Integer> invokeControllerPeriodicTask(String tableName, String periodicTaskName,
+ public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String tableName, String periodicTaskName,
Map<String, String> taskProperties) {
String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8);
@@ -3622,7 +3622,7 @@ public class PinotHelixResourceManager {
LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId,
messageCount);
- return Pair.of(periodicTaskRequestId, messageCount);
+ return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0);
}
/*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7cb579b83a..582c371ba6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -36,13 +37,16 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
+import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.messages.ForceCommitMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -56,6 +60,8 @@ import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.api.resources.PauseStatus;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -67,6 +73,7 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
@@ -122,6 +129,9 @@ public class PinotLLCRealtimeSegmentManager {
private static final int STARTING_SEQUENCE_NUMBER = 0; // Initial sequence number for new table segments
private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
+ // simple field in Ideal State representing pause status for the table
+ private static final String IS_TABLE_PAUSED = "isTablePaused";
+
// Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
@@ -552,12 +562,12 @@ public class PinotLLCRealtimeSegmentManager {
.collect(Collectors.toSet());
int numPartitionGroups = newPartitionGroupMetadataList.size();
- // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
- // segment metadata
String newConsumingSegmentName = null;
- String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
- long newSegmentCreationTimeMs = getCurrentTimeMs();
- if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+ if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+ // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
+ // segment metadata
+ String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+ long newSegmentCreationTimeMs = getCurrentTimeMs();
LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
@@ -854,7 +864,6 @@ public class PinotLLCRealtimeSegmentManager {
*
* If the controller fails after step-3, we are fine because the idealState has the new segments.
* If the controller fails before step-1, the server will see this as an upload failure, and will re-try.
- * @param tableConfig
*
* If the consuming segment is deleted by user intentionally or by mistake:
* Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status
@@ -870,7 +879,9 @@ public class PinotLLCRealtimeSegmentManager {
String realtimeTableName = tableConfig.getTableName();
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
- if (idealState.isEnabled()) {
+ boolean isTableEnabled = idealState.isEnabled();
+ boolean isTablePaused = isTablePaused(idealState);
+ if (isTableEnabled && !isTablePaused) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfig);
// Read the smallest offset when a new partition is detected
@@ -882,7 +893,8 @@ public class PinotLLCRealtimeSegmentManager {
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
recreateDeletedConsumingSegment);
} else {
- LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
+ LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
+ realtimeTableName, isTableEnabled, isTablePaused);
return idealState;
}
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
@@ -913,11 +925,15 @@ public class PinotLLCRealtimeSegmentManager {
"Exceeded max segment completion time for segment " + committingSegmentName);
}
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
- newSegmentName, segmentAssignment, instancePartitionsMap);
+ isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
}
+
+  /**
+   * Returns whether the given ideal state has its IS_TABLE_PAUSED simple field set to "true" (case-insensitive).
+   * A missing field means the table is not paused.
+   */
+  private boolean isTablePaused(IdealState idealState) {
+    String pausedFlag = idealState.getRecord().getSimpleField(IS_TABLE_PAUSED);
+    return "true".equalsIgnoreCase(pausedFlag);
+  }
+
@VisibleForTesting
void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
@Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
@@ -1403,4 +1419,91 @@ public class PinotLLCRealtimeSegmentManager {
}
}
}
+
+  /**
+   * Pauses consumption on a table in two steps:
+   * <ol>
+   *   <li>sets the IS_TABLE_PAUSED flag to "true" in the table's ideal state, and</li>
+   *   <li>sends a force commit message for the segments that are currently CONSUMING in the updated ideal state.</li>
+   * </ol>
+   *
+   * @param tableNameWithType name of the table, including its type suffix
+   * @return a paused status listing the consuming segments found; its description is null when no segment was
+   *         consuming
+   */
+  public PauseStatus pauseConsumption(String tableNameWithType) {
+    IdealState pausedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
+    Set<String> segmentsToCommit = findConsumingSegments(pausedIdealState);
+    sendForceCommitMessageToServers(tableNameWithType, segmentsToCommit);
+    String description = null;
+    if (!segmentsToCommit.isEmpty()) {
+      description = "Pause flag is set."
+          + " Consuming segments are being committed."
+          + " Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.";
+    }
+    return new PauseStatus(true, segmentsToCommit, description);
+  }
+
+  /**
+   * Resumes consumption on a table in two steps:
+   * <ol>
+   *   <li>sets the IS_TABLE_PAUSED flag to "false" in the table's ideal state, and</li>
+   *   <li>invokes the realtime segment validation periodic task for this table, with
+   *   RECREATE_DELETED_CONSUMING_SEGMENT_KEY set to "true", to create new consuming segments in the ideal state.</li>
+   * </ol>
+   *
+   * @param tableNameWithType name of the table, including its type suffix
+   * @return a not-paused status listing the segments that were CONSUMING in the updated ideal state
+   */
+  public PauseStatus resumeConsumption(String tableNameWithType) {
+    IdealState resumedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);
+
+    // Kick the validation job right away instead of waiting for its next scheduled run.
+    Map<String, String> validationTaskProperties = new HashMap<>();
+    validationTaskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+    _helixResourceManager.invokeControllerPeriodicTask(tableNameWithType,
+        Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, validationTaskProperties);
+
+    Set<String> consumingSegments = findConsumingSegments(resumedIdealState);
+    String description = "Pause flag is cleared. "
+        + "Consuming segments are being created. Use /pauseStatus endpoint in a few moments to double check.";
+    return new PauseStatus(false, consumingSegments, description);
+  }
+
+  /**
+   * Sets the IS_TABLE_PAUSED simple field of the table's ideal state to {@code pause} and returns the updated ideal
+   * state.
+   *
+   * <p>The update is attempted up to three times. Other paths in this class (segment commit, segment validation)
+   * also rewrite the same ideal state. With a single attempt, a concurrent write would make the pause/resume
+   * request fail even though retrying against the fresh ideal state would succeed.
+   *
+   * @param tableNameWithType name of the table, including its type suffix
+   * @param pause value to store in the flag
+   * @return the ideal state after the update
+   */
+  private IdealState updatePauseStatusInIdealState(String tableNameWithType, boolean pause) {
+    IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
+      ZNRecord znRecord = idealState.getRecord();
+      znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.toString(pause));
+      return new IdealState(znRecord);
+    }, RetryPolicies.noDelayRetryPolicy(3));
+    LOGGER.info("Set 'isTablePaused' to {} in the Ideal State for table {}.", pause, tableNameWithType);
+    return updatedIdealState;
+  }
+
+  /**
+   * Sends a session-specific {@link ForceCommitMessage} for the given segments to every participant instance of the
+   * table resource. Does nothing when {@code consumingSegments} is empty.
+   *
+   * @throws RuntimeException if the messaging service reports that no message was sent
+   */
+  private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) {
+    if (consumingSegments.isEmpty()) {
+      return;
+    }
+    Criteria criteria = new Criteria();
+    criteria.setInstanceName("%");
+    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    criteria.setResource(tableNameWithType);
+    criteria.setSessionSpecific(true);
+    ForceCommitMessage forceCommitMessage = new ForceCommitMessage(tableNameWithType, consumingSegments);
+    int sentCount = _helixManager.getMessagingService().send(criteria, forceCommitMessage, null, -1);
+    if (sentCount <= 0) {
+      throw new RuntimeException(String.format("No force commit message was sent for table: %s segments: %s",
+          tableNameWithType, consumingSegments));
+    }
+    LOGGER.info("Sent {} force commit messages for table: {} segments: {}", sentCount, tableNameWithType,
+        consumingSegments);
+  }
+
+  /**
+   * Returns, in sorted order, the names of segments that have at least one instance in CONSUMING state in the given
+   * ideal state.
+   */
+  private Set<String> findConsumingSegments(IdealState idealState) {
+    Set<String> result = new TreeSet<>();
+    for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
+      boolean hasConsumingReplica =
+          entry.getValue().values().stream().anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
+      if (hasConsumingReplica) {
+        result.add(entry.getKey());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Reads the current pause status of a table from its ideal state.
+   *
+   * @param tableNameWithType name of the table, including its type suffix
+   * @return a status holding the IS_TABLE_PAUSED flag and the segments currently in CONSUMING state; its
+   *         description is always null
+   */
+  public PauseStatus getPauseStatus(String tableNameWithType) {
+    IdealState idealState = getIdealState(tableNameWithType);
+    boolean paused = isTablePaused(idealState);
+    return new PauseStatus(paused, findConsumingSegments(idealState), null);
+  }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 7f5f7185a9..28ad064601 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -1170,7 +1170,8 @@ public class SegmentCompletionManager {
*/
private boolean isWinnerPicked(String preferredInstance, long now, final String stopReason) {
if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
- || SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
+ || SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason)
+ || SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED.equals(stopReason))
&& _commitStateMap.size() == 1) {
_winner = preferredInstance;
_winningOffset = _commitStateMap.get(preferredInstance);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index a87995325f..c7365dff93 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -163,4 +163,9 @@ public interface InstanceDataManager {
* uploaded segment file. Servers utilize segment uploader to upload llc segment to segment store.
*/
SegmentUploader getSegmentUploader();
+
+  /**
+   * Immediately stop consumption and start committing the consuming segments.
+   *
+   * @param tableNameWithType name of the table (including its type suffix) that owns the segments
+   * @param segmentNames names of the consuming segments to stop and commit
+   */
+  void forceCommit(String tableNameWithType, Set<String> segmentNames);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fb168602d7..fdfe31377e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -244,6 +244,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Segment end criteria
private volatile long _consumeEndTime = 0;
private volatile boolean _endOfPartitionGroup = false;
+ private volatile boolean _forceCommitMessageReceived = false;
private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one
private volatile boolean _shouldStop = false;
@@ -295,8 +296,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
switch (_state) {
case INITIAL_CONSUMING:
// The segment has been created, and we have not posted a segmentConsumed() message on the controller yet.
- // We need to consume as much data as available, until we have either reached the max number of rows or
- // the max time we are allowed to consume.
+ // We need to consume as much data as available, until we have encountered one of the following scenarios:
+ // - the max number of rows has been reached
+ // - the max time we are allowed to consume has passed;
+ // - partition group is ended
+ // - force commit message has been received
if (now >= _consumeEndTime) {
if (_realtimeSegment.getNumDocsIndexed() == 0) {
_segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
@@ -318,6 +322,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
+ "numRowsConsumed={}", _segmentMaxRowCount, _numRowsIndexed, _numRowsConsumed);
_stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
return true;
+ } else if (_forceCommitMessageReceived) {
+ _segmentLogger.info("Stopping consumption due to force commit - numRowsConsumed={} numRowsIndexed={}",
+ _numRowsConsumed, _numRowsIndexed);
+ _stopReason = SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
+ return true;
}
return false;
@@ -1535,4 +1544,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
public String getSegmentName() {
return _segmentNameStr;
}
+
+  /**
+   * Requests that this segment stop consuming and be committed.
+   *
+   * <p>This only raises the {@code _forceCommitMessageReceived} flag. The consumption stop check reads the flag in
+   * its INITIAL_CONSUMING branch and ends consumption with
+   * {@code SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED}. The flag is volatile, so a write from
+   * another thread (e.g. a message handler) is visible to the thread performing that check.
+   */
+  public void forceCommit() {
+    _forceCommitMessageReceived = true;
+  }
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 4ee010ee3b..18c5830eb8 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -49,6 +49,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
@@ -64,6 +65,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -462,4 +464,27 @@ public class HelixInstanceDataManager implements InstanceDataManager {
public SegmentUploader getSegmentUploader() {
return _segmentUploader;
}
+
+  /**
+   * Asks each listed segment of the given realtime table that is hosted here by an
+   * {@link LLRealtimeSegmentDataManager} to stop consuming and commit.
+   *
+   * <p>The following are skipped silently: a table that has no data manager on this instance, segments that cannot
+   * be acquired, and segments managed by another kind of data manager.
+   *
+   * @param tableNameWithType realtime table name, including its type suffix
+   * @param segmentNames names of the segments to force-commit
+   * @throws IllegalArgumentException if {@code tableNameWithType} is not a realtime table resource
+   */
+  @Override
+  public void forceCommit(String tableNameWithType, Set<String> segmentNames) {
+    // Template overload: the message is only formatted when the check fails, not on every call.
+    Preconditions.checkArgument(TableNameBuilder.isRealtimeTableResource(tableNameWithType),
+        "Force commit is only supported for segments of realtime tables - table name: %s segment names: %s",
+        tableNameWithType, segmentNames);
+    TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+    if (tableDataManager == null) {
+      return;
+    }
+    for (String segmentName : segmentNames) {
+      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager == null) {
+        continue;
+      }
+      try {
+        if (segmentDataManager instanceof LLRealtimeSegmentDataManager) {
+          ((LLRealtimeSegmentDataManager) segmentDataManager).forceCommit();
+        }
+      } finally {
+        // Always balance the acquireSegment() reference count.
+        tableDataManager.releaseSegment(segmentDataManager);
+      }
+    }
+  }
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index ed6e9b6dbe..9e4dab78fe 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -18,12 +18,14 @@
*/
package org.apache.pinot.server.starter.helix;
+import java.util.Set;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.messages.ForceCommitMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableDeletionMessage;
@@ -61,6 +63,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context);
case TableDeletionMessage.DELETE_TABLE_MSG_SUB_TYPE:
return new TableDeletionMessageHandler(new TableDeletionMessage(message), _metrics, context);
+ case ForceCommitMessage.FORCE_COMMIT_MSG_SUB_TYPE:
+ return new ForceCommitMessageHandler(new ForceCommitMessage(message), _metrics, context);
default:
LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
message.getPartitionName());
@@ -169,6 +173,34 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
}
}
+  /**
+   * Handles a {@link ForceCommitMessage} by asking the instance data manager to force-commit the listed segments of
+   * the table named in the message.
+   */
+  private class ForceCommitMessageHandler extends DefaultMessageHandler {
+
+    private final String _tableName;
+    private final Set<String> _segmentNames;
+
+    public ForceCommitMessageHandler(ForceCommitMessage forceCommitMessage, ServerMetrics metrics,
+        NotificationContext ctx) {
+      super(forceCommitMessage, metrics, ctx);
+      _tableName = forceCommitMessage.getTableName();
+      _segmentNames = forceCommitMessage.getSegmentNames();
+    }
+
+    /**
+     * Force-commits the segments carried by the message.
+     *
+     * @return a successful task result when the instance data manager accepted the request
+     * @throws InterruptedException declared by the Helix {@code MessageHandler} contract
+     */
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      _logger.info("Handling force commit message for table {} segments {}", _tableName, _segmentNames);
+      try {
+        _instanceDataManager.forceCommit(_tableName, _segmentNames);
+        helixTaskResult.setSuccess(true);
+      } catch (Exception e) {
+        // Previously metered as ServerMeter.DELETE_TABLE_FAILURES, which misreported force-commit failures as
+        // table deletion failures. Log instead. TODO: add a dedicated ServerMeter for force commit failures.
+        _logger.error("Failed to handle force commit message for table {} segments {}", _tableName, _segmentNames,
+            e);
+        Utils.rethrowException(e);
+      }
+      return helixTaskResult;
+    }
+  }
+
private static class DefaultMessageHandler extends MessageHandler {
final String _segmentName;
final String _tableNameWithType;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 882a4a802a..020ef35e05 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -459,6 +459,18 @@ public class ControllerRequestURLBuilder {
"?cardinality=" + cardinality + "&primaryKeySize=" + primaryKeySize + "&numPartitions=" + numPartitions);
}
+  /** URL of the endpoint that pauses consumption of {@code tableName}. */
+  public String forPauseConsumption(String tableName) {
+    return forTablePauseEndpoint(tableName, "pauseConsumption");
+  }
+
+  /** URL of the endpoint that resumes consumption of {@code tableName}. */
+  public String forResumeConsumption(String tableName) {
+    return forTablePauseEndpoint(tableName, "resumeConsumption");
+  }
+
+  /** URL of the endpoint that reports the pause status of {@code tableName}. */
+  public String forPauseStatus(String tableName) {
+    return forTablePauseEndpoint(tableName, "pauseStatus");
+  }
+
+  /** Builds {@code <baseUrl>/tables/<tableName>/<endpoint>}; the table name is not URL-encoded. */
+  private String forTablePauseEndpoint(String tableName, String endpoint) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, endpoint);
+  }
+
private static String encode(String s) {
try {
return URLEncoder.encode(s, "UTF-8");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org