You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2022/07/20 22:49:43 UTC

[pinot] branch master updated: Support pause/resume consumption of realtime tables (#8986)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6903856d39 Support pause/resume consumption of realtime tables (#8986)
6903856d39 is described below

commit 6903856d399b4d7b09548a75a888382e1659e9e3
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Wed Jul 20 15:49:38 2022 -0700

    Support pause/resume consumption of realtime tables (#8986)
---
 .../pinot/common/messages/ForceCommitMessage.java  |  62 +++++++++++
 .../protocols/SegmentCompletionProtocol.java       |   2 +
 .../pinot/common/utils/helix/HelixHelper.java      |  21 ++--
 .../pinot/controller/BaseControllerStarter.java    |   1 +
 .../controller/api/resources/PauseStatus.java      |  53 +++++++++
 .../resources/PeriodicTaskInvocationResponse.java  |  42 +++++++
 ...PinotControllerPeriodicTaskRestletResource.java |  11 +-
 .../api/resources/PinotRealtimeTableResource.java  |  83 ++++++++++----
 .../controller/helix/ControllerRequestClient.java  |  34 ++++++
 .../helix/core/PinotHelixResourceManager.java      |   8 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 121 +++++++++++++++++++--
 .../core/realtime/SegmentCompletionManager.java    |   3 +-
 .../core/data/manager/InstanceDataManager.java     |   5 +
 .../realtime/LLRealtimeSegmentDataManager.java     |  17 ++-
 .../starter/helix/HelixInstanceDataManager.java    |  25 +++++
 .../helix/SegmentMessageHandlerFactory.java        |  32 ++++++
 .../utils/builder/ControllerRequestURLBuilder.java |  12 ++
 17 files changed, 483 insertions(+), 49 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
new file mode 100644
index 0000000000..4f131c9bac
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.Message;
+
+
+/**
+ * Force commit helix message is created on controller and get sent to servers to instruct them to stop consumption and
+ * immediately start committing the segment.
+ */
+public class ForceCommitMessage extends Message {
+  public static final String FORCE_COMMIT_MSG_SUB_TYPE = "FORCE_COMMIT";
+  private static final String TABLE_NAME = "tableName";
+  private static final String SEGMENT_NAMES = "segmentNames";
+
+  public ForceCommitMessage(String tableNameWithType, Set<String> segmentNames) {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setMsgSubType(FORCE_COMMIT_MSG_SUB_TYPE);
+    setExecutionTimeout(-1); // no timeout
+    ZNRecord znRecord = getRecord();
+    znRecord.setSimpleField(TABLE_NAME, tableNameWithType);
+    znRecord.setSimpleField(SEGMENT_NAMES, String.join(",", segmentNames));
+  }
+
+  public ForceCommitMessage(Message message) {
+    super(message.getRecord());
+    String msgSubType = message.getMsgSubType();
+    Preconditions.checkArgument(msgSubType.equals(FORCE_COMMIT_MSG_SUB_TYPE),
+        "Invalid message sub type: " + msgSubType + " for SegmentReloadMessage");
+  }
+
+  public String getTableName() {
+    return getRecord().getSimpleField(TABLE_NAME);
+  }
+
+  public Set<String> getSegmentNames() {
+    return Arrays.stream(getRecord().getSimpleField(SEGMENT_NAMES).split(",")).collect(Collectors.toSet());
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 790e242009..8293bcf4d3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -148,6 +148,8 @@ public class SegmentCompletionProtocol {
   public static final String REASON_TIME_LIMIT = "timeLimit";
   // Stop reason sent by server as end of partitionGroup reached
   public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";
+  // Stop reason sent by server as force commit message received
+  public static final String REASON_FORCE_COMMIT_MESSAGE_RECEIVED = "forceCommitMessageReceived";
 
   // Canned responses
   public static final Response RESP_NOT_LEADER =
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index fe90644ed0..5a371a1fb1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -89,12 +89,12 @@ public class HelixHelper {
    * @param helixManager The HelixManager used to interact with the Helix cluster
    * @param resourceName The resource for which to update the ideal state
    * @param updater A function that returns an updated ideal state given an input ideal state
+   * @return updated ideal state if successful, null if not
    */
-  // TODO: since updater always update ideal state in place, it should return boolean indicating whether the ideal
-  //  state get changed.
-  public static void updateIdealState(final HelixManager helixManager, final String resourceName,
+  public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName,
       final Function<IdealState, IdealState> updater, RetryPolicy policy, final boolean noChangeOk) {
     try {
+      IdealStateWrapper idealStateWrapper = new IdealStateWrapper();
       policy.attempt(new Callable<Boolean>() {
         @Override
         public Boolean call() {
@@ -139,6 +139,7 @@ public class HelixHelper {
               if (dataAccessor.getBaseDataAccessor()
                   .set(idealStateKey.getPath(), updatedZNRecord, idealState.getRecord().getVersion(),
                       AccessOption.PERSISTENT)) {
+                idealStateWrapper._idealState = updatedIdealState;
                 return true;
               } else {
                 LOGGER.warn("Failed to update ideal state for resource: {}", resourceName);
@@ -158,15 +159,21 @@ public class HelixHelper {
             } else {
               LOGGER.warn("Idempotent or null ideal state update for resource {}, skipping update.", resourceName);
             }
+            idealStateWrapper._idealState = idealState;
             return true;
           }
         }
       });
+      return idealStateWrapper._idealState;
     } catch (Exception e) {
       throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e);
     }
   }
 
+  private static class IdealStateWrapper {
+    IdealState _idealState;
+  }
+
   /**
    * Exception to be thrown by updater function to exit from retry in {@link HelixHelper::updatedIdealState}
    */
@@ -181,14 +188,14 @@ public class HelixHelper {
     }
   }
 
-  public static void updateIdealState(HelixManager helixManager, String resourceName,
+  public static IdealState updateIdealState(HelixManager helixManager, String resourceName,
       Function<IdealState, IdealState> updater) {
-    updateIdealState(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
+    return updateIdealState(helixManager, resourceName, updater, DEFAULT_TABLE_IDEALSTATES_UPDATE_RETRY_POLICY, false);
   }
 
-  public static void updateIdealState(final HelixManager helixManager, final String resourceName,
+  public static IdealState updateIdealState(final HelixManager helixManager, final String resourceName,
       final Function<IdealState, IdealState> updater, RetryPolicy policy) {
-    updateIdealState(helixManager, resourceName, updater, policy, false);
+    return updateIdealState(helixManager, resourceName, updater, policy, false);
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index fd9f00ef6d..27c8eb2a25 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -465,6 +465,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
         bind(_leadControllerManager).to(LeadControllerManager.class);
         bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
         bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
+        bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class);
       }
     });
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatus.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatus.java
new file mode 100644
index 0000000000..9542e70eba
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PauseStatus.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Set;
+
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class PauseStatus {
+  private boolean _pauseFlag;
+  private Set<String> _consumingSegments;
+  private String _description;
+
+  @JsonCreator
+  public PauseStatus(@JsonProperty("pauseFlag") boolean pauseFlag,
+      @JsonProperty("consumingSegments") Set<String> consumingSegments,
+      @JsonProperty("description") String description) {
+    _pauseFlag = pauseFlag;
+    _consumingSegments = consumingSegments;
+    _description = description;
+  }
+
+  public boolean getPauseFlag() {
+    return _pauseFlag;
+  }
+
+  public Set<String> getConsumingSegments() {
+    return _consumingSegments;
+  }
+
+  public String getDescription() {
+    return _description;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PeriodicTaskInvocationResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PeriodicTaskInvocationResponse.java
new file mode 100644
index 0000000000..c383170b55
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PeriodicTaskInvocationResponse.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+public class PeriodicTaskInvocationResponse {
+  String _taskId;
+  boolean _isSuccessful;
+
+  public PeriodicTaskInvocationResponse(String taskId, boolean isSuccessful) {
+    _taskId = taskId;
+    _isSuccessful = isSuccessful;
+  }
+
+  @JsonProperty("isSuccessful")
+  public boolean isSuccessful() {
+    return _isSuccessful;
+  }
+
+  @JsonProperty("taskId")
+  public String getTaskId() {
+    return _taskId;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
index 0ae3f1fcf8..50d4c35a9e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java
@@ -35,7 +35,6 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
 import org.slf4j.Logger;
@@ -62,7 +61,7 @@ public class PinotControllerPeriodicTaskRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/run")
   @ApiOperation(value = "Run periodic task against table. If table name is missing, task will run against all tables.")
-  public String runPeriodicTask(
+  public Response runPeriodicTask(
       @ApiParam(value = "Periodic task name", required = true) @QueryParam("taskname") String periodicTaskName,
       @ApiParam(value = "Name of the table") @QueryParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String tableType) {
@@ -87,11 +86,9 @@ public class PinotControllerPeriodicTaskRestletResource {
       tableName = matchingTableNamesWithType.get(0);
     }
 
-    Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
-        .invokeControllerPeriodicTask(tableName, periodicTaskName, null);
-
-    return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
-        + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}";
+    return Response.ok()
+        .entity(_pinotHelixResourceManager.invokeControllerPeriodicTask(tableName, periodicTaskName, null))
+        .build();
   }
 
   @GET
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index b6310af3a4..ba03a6cc5c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.controller.api.resources;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
 import io.swagger.annotations.ApiOperation;
@@ -26,20 +25,22 @@ import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
-import java.util.HashMap;
-import java.util.Map;
 import javax.inject.Inject;
-import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
-import org.apache.commons.lang3.tuple.Pair;
+import javax.ws.rs.core.Response;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
+import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
 
@@ -49,27 +50,71 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
     HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
 @Path("/")
 public class PinotRealtimeTableResource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeTableResource.class);
+
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
 
+  @Inject
+  PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+
+  @POST
+  @Path("/tables/{tableName}/pauseConsumption")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Pause consumption of a realtime table",
+      notes = "Pause the consumption of a realtime table")
+  public Response pauseConsumption(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    try {
+      return Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   @POST
   @Path("/tables/{tableName}/resumeConsumption")
   @Produces(MediaType.APPLICATION_JSON)
-  @Consumes(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Resume the consumption of a realtime table",
-      notes = "Resume the consumption of a realtime table")
-  public String resumeConsumption(
-      @ApiParam(value = "Name of the table", required = true)
-      @PathParam("tableName") String tableName) throws JsonProcessingException {
-    // TODO: Add util method for invoking periodic tasks
+  @ApiOperation(value = "Resume consumption of a realtime table",
+      notes = "Resume the consumption for a realtime table")
+  public Response resumeConsumption(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
-    Map<String, String> taskProperties = new HashMap<>();
-    taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+    validate(tableNameWithType);
+    try {
+      return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
 
-    Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
-        .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
+  @GET
+  @Path("/tables/{tableName}/pauseStatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Return pause status of a realtime table",
+      notes = "Return pause status of a realtime table along with list of consuming segments.")
+  public Response getConsumptionStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    try {
+      return Response.ok().entity(_pinotLLCRealtimeSegmentManager.getPauseStatus(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
 
-    return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
-        + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}";
+  private void validate(String tableNameWithType) {
+    IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    if (idealState == null) {
+      throw new ControllerApplicationException(LOGGER, String.format("Table %s not found!", tableNameWithType),
+          Response.Status.NOT_FOUND);
+    }
+    if (!idealState.isEnabled()) {
+      throw new ControllerApplicationException(LOGGER, String.format("Table %s is disabled!", tableNameWithType),
+          Response.Status.BAD_REQUEST);
+    }
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index bb6002903b..a5ee4cbc4e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -24,6 +24,7 @@ import java.net.URL;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.api.resources.PauseStatus;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.tenant.Tenant;
@@ -169,6 +170,39 @@ public class ControllerRequestClient {
     }
   }
 
+  public PauseStatus pauseConsumption(String tableName)
+      throws IOException {
+    try {
+      SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(
+          _controllerRequestURLBuilder.forPauseConsumption(tableName)).toURI(), null));
+      return JsonUtils.stringToObject(response.getResponse(), PauseStatus.class);
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public PauseStatus resumeConsumption(String tableName)
+      throws IOException {
+    try {
+      SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(
+          _controllerRequestURLBuilder.forResumeConsumption(tableName)).toURI(), null));
+      return JsonUtils.stringToObject(response.getResponse(), PauseStatus.class);
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public PauseStatus getPauseStatus(String tableName)
+      throws IOException {
+    try {
+      SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URL(
+          _controllerRequestURLBuilder.forPauseStatus(tableName)).toURI()));
+      return JsonUtils.stringToObject(response.getResponse(), PauseStatus.class);
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void createBrokerTenant(String tenantName, int numBrokers)
       throws IOException {
     try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b1143a9873..a563f3048f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -56,7 +56,6 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.Criteria;
@@ -115,6 +114,7 @@ import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
 import org.apache.pinot.controller.api.exception.UserAlreadyExistsException;
 import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.api.resources.PeriodicTaskInvocationResponse;
 import org.apache.pinot.controller.api.resources.StateType;
 import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -3596,9 +3596,9 @@ public class PinotHelixResourceManager {
    * @param tableName Name of table against which task is to be run
    * @param periodicTaskName Task name
    * @param taskProperties Extra properties to be passed along
-   * @return Task id for filtering logs, along with the number of successfully sent messages
+   * @return Task id for filtering logs, along with success status (whether helix messeages were sent)
    */
-  public Pair<String, Integer> invokeControllerPeriodicTask(String tableName, String periodicTaskName,
+  public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String tableName, String periodicTaskName,
       Map<String, String> taskProperties) {
     String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8);
 
@@ -3622,7 +3622,7 @@ public class PinotHelixResourceManager {
 
     LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId,
         messageCount);
-    return Pair.of(periodicTaskRequestId, messageCount);
+    return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0);
   }
 
   /*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7cb579b83a..582c371ba6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -36,13 +37,16 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
+import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.messages.ForceCommitMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -56,6 +60,8 @@ import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.api.resources.PauseStatus;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -67,6 +73,7 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpd
 import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
 import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
@@ -122,6 +129,9 @@ public class PinotLLCRealtimeSegmentManager {
   private static final int STARTING_SEQUENCE_NUMBER = 0; // Initial sequence number for new table segments
   private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
 
+  // simple field in Ideal State representing pause status for the table
+  private static final String IS_TABLE_PAUSED = "isTablePaused";
+
   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
 
@@ -552,12 +562,12 @@ public class PinotLLCRealtimeSegmentManager {
             .collect(Collectors.toSet());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
 
-    // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
-    // segment metadata
     String newConsumingSegmentName = null;
-    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-    long newSegmentCreationTimeMs = getCurrentTimeMs();
-    if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+    if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+      // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
+      // segment metadata
+      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+      long newSegmentCreationTimeMs = getCurrentTimeMs();
       LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
           committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
       createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
@@ -854,7 +864,6 @@ public class PinotLLCRealtimeSegmentManager {
    *
    * If the controller fails after step-3, we are fine because the idealState has the new segments.
    * If the controller fails before step-1, the server will see this as an upload failure, and will re-try.
-   * @param tableConfig
    *
    * If the consuming segment is deleted by user intentionally or by mistake:
    * Check whether there are segments in the PROPERTYSTORE with status DONE, but no new segment in status
@@ -870,7 +879,9 @@ public class PinotLLCRealtimeSegmentManager {
     String realtimeTableName = tableConfig.getTableName();
     HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
       assert idealState != null;
-      if (idealState.isEnabled()) {
+      boolean isTableEnabled = idealState.isEnabled();
+      boolean isTablePaused = isTablePaused(idealState);
+      if (isTableEnabled && !isTablePaused) {
         List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
             getPartitionGroupConsumptionStatusList(idealState, streamConfig);
         // Read the smallest offset when a new partition is detected
@@ -882,7 +893,8 @@ public class PinotLLCRealtimeSegmentManager {
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
             recreateDeletedConsumingSegment);
       } else {
-        LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
+        LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
+            realtimeTableName, isTableEnabled, isTablePaused);
         return idealState;
       }
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
@@ -913,11 +925,15 @@ public class PinotLLCRealtimeSegmentManager {
             "Exceeded max segment completion time for segment " + committingSegmentName);
       }
       updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
-          newSegmentName, segmentAssignment, instancePartitionsMap);
+          isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap);
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
   }
 
+  private boolean isTablePaused(IdealState idealState) {
+    return Boolean.parseBoolean(idealState.getRecord().getSimpleField(IS_TABLE_PAUSED));
+  }
+
   @VisibleForTesting
   void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap,
       @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment,
@@ -1403,4 +1419,91 @@ public class PinotLLCRealtimeSegmentManager {
       }
     }
   }
+
+  /**
+   * Pause consumption on a table by
+   *   1) setting "isTablePaused" in ideal states to true and
+   *   2) sending force commit messages to servers
+   */
+  public PauseStatus pauseConsumption(String tableNameWithType) {
+    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, true);
+    Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
+    sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+    return new PauseStatus(true, consumingSegments, consumingSegments.isEmpty() ? null : "Pause flag is set."
+        + " Consuming segments are being committed."
+        + " Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.");
+  }
+
+  /**
+   * Resume consumption on a table by
+   *   1) setting "isTablePaused" in ideal states to false and
+   *   2) triggering segment validation job to create new consuming segments in ideal states
+   */
+  public PauseStatus resumeConsumption(String tableNameWithType) {
+    IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);
+
+    // trigger realtime segment validation job to resume consumption
+    Map<String, String> taskProperties = new HashMap<>();
+    taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+    _helixResourceManager
+        .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
+
+    return new PauseStatus(false, findConsumingSegments(updatedIdealState), "Pause flag is cleared. "
+        + "Consuming segments are being created. Use /pauseStatus endpoint in a few moments to double check.");
+  }
+
+  private IdealState updatePauseStatusInIdealState(String tableNameWithType, boolean pause) {
+    IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
+      ZNRecord znRecord = idealState.getRecord();
+      znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString());
+      return new IdealState(znRecord);
+    }, RetryPolicies.noDelayRetryPolicy(1));
+    LOGGER.info("Set 'isTablePaused' to {} in the Ideal State for table {}.", pause, tableNameWithType);
+    return updatedIdealState;
+  }
+
+  private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) {
+    if (!consumingSegments.isEmpty()) {
+      Criteria recipientCriteria = new Criteria();
+      recipientCriteria.setInstanceName("%");
+      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      recipientCriteria.setResource(tableNameWithType);
+      recipientCriteria.setSessionSpecific(true);
+      ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, consumingSegments);
+      int numMessagesSent = _helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
+      if (numMessagesSent > 0) {
+        LOGGER.info("Sent {} force commit messages for table: {} segments: {}", numMessagesSent, tableNameWithType,
+            consumingSegments);
+      } else {
+        throw new RuntimeException(String
+            .format("No force commit message was sent for table: %s segments: %s", tableNameWithType,
+                consumingSegments));
+      }
+    }
+  }
+
+  private Set<String> findConsumingSegments(IdealState idealState) {
+    Set<String> consumingSegments = new TreeSet<>();
+    idealState.getRecord().getMapFields().forEach((segmentName, instanceToStateMap) -> {
+      for (String state : instanceToStateMap.values()) {
+        if (state.equals(SegmentStateModel.CONSUMING)) {
+          consumingSegments.add(segmentName);
+          break;
+        }
+      }
+    });
+    return consumingSegments;
+  }
+
+  /**
+   * Return pause status:
+   *   - the value of `isTablePaused` flag in ideal state
+   *   - list of consuming segments
+   */
+  public PauseStatus getPauseStatus(String tableNameWithType) {
+    IdealState idealState = getIdealState(tableNameWithType);
+    String isTablePausedStr = idealState.getRecord().getSimpleField(IS_TABLE_PAUSED);
+    Set<String> consumingSegments = findConsumingSegments(idealState);
+    return new PauseStatus(Boolean.parseBoolean(isTablePausedStr), consumingSegments, null);
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 7f5f7185a9..28ad064601 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -1170,7 +1170,8 @@ public class SegmentCompletionManager {
      */
     private boolean isWinnerPicked(String preferredInstance, long now, final String stopReason) {
       if ((SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(stopReason)
-          || SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason))
+          || SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(stopReason)
+          || SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED.equals(stopReason))
           && _commitStateMap.size() == 1) {
         _winner = preferredInstance;
         _winningOffset = _commitStateMap.get(preferredInstance);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index a87995325f..c7365dff93 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -163,4 +163,9 @@ public interface InstanceDataManager {
    * uploaded segment file. Servers utilize segment uploader to upload llc segment to segment store.
    */
   SegmentUploader getSegmentUploader();
+
+  /**
+   * Immediately stop consumption and start committing the consuming segments.
+   */
+  void forceCommit(String tableNameWithType, Set<String> segmentNames);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fb168602d7..fdfe31377e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -244,6 +244,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
   private volatile boolean _endOfPartitionGroup = false;
+  private volatile boolean _forceCommitMessageReceived = false;
   private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one
   private volatile boolean _shouldStop = false;
 
@@ -295,8 +296,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     switch (_state) {
       case INITIAL_CONSUMING:
         // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet.
-        // We need to consume as much data as available, until we have either reached the max number of rows or
-        // the max time we are allowed to consume.
+        // We need to consume as much data as available, until we have encountered one of the following scenarios:
+        //   - the max number of rows has been reached
+        //   - the max time we are allowed to consume has passed;
+        //   - partition group is ended
+        //   - force commit message has been received
         if (now >= _consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
             _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
@@ -318,6 +322,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               + "numRowsConsumed={}", _segmentMaxRowCount, _numRowsIndexed, _numRowsConsumed);
           _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
           return true;
+        } else if (_forceCommitMessageReceived) {
+          _segmentLogger.info("Stopping consumption due to force commit - numRowsConsumed={} numRowsIndexed={}",
+              _numRowsConsumed, _numRowsIndexed);
+          _stopReason = SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
+          return true;
         }
         return false;
 
@@ -1535,4 +1544,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   public String getSegmentName() {
     return _segmentNameStr;
   }
+
+  public void forceCommit() {
+    _forceCommitMessageReceived = true;
+  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 4ee010ee3b..18c5830eb8 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -49,6 +49,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
 import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
@@ -64,6 +65,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -462,4 +464,27 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   public SegmentUploader getSegmentUploader() {
     return _segmentUploader;
   }
+
+  @Override
+  public void forceCommit(String tableNameWithType, Set<String> segmentNames) {
+    Preconditions.checkArgument(TableNameBuilder.isRealtimeTableResource(tableNameWithType), String
+        .format("Force commit is only supported for segments of realtime tables - table name: %s segment names: %s",
+            tableNameWithType, segmentNames));
+    TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+    if (tableDataManager != null) {
+      segmentNames.forEach(segName -> {
+        SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segName);
+        if (segmentDataManager != null) {
+          try {
+            if (segmentDataManager instanceof LLRealtimeSegmentDataManager) {
+              LLRealtimeSegmentDataManager llSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
+              llSegmentDataManager.forceCommit();
+            }
+          } finally {
+            tableDataManager.releaseSegment(segmentDataManager);
+          }
+        }
+      });
+    }
+  }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index ed6e9b6dbe..9e4dab78fe 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pinot.server.starter.helix;
 
+import java.util.Set;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.messages.ForceCommitMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableDeletionMessage;
@@ -61,6 +63,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context);
       case TableDeletionMessage.DELETE_TABLE_MSG_SUB_TYPE:
         return new TableDeletionMessageHandler(new TableDeletionMessage(message), _metrics, context);
+      case ForceCommitMessage.FORCE_COMMIT_MSG_SUB_TYPE:
+        return new ForceCommitMessageHandler(new ForceCommitMessage(message), _metrics, context);
       default:
         LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
             message.getPartitionName());
@@ -169,6 +173,34 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
     }
   }
 
+  private class ForceCommitMessageHandler extends DefaultMessageHandler {
+
+    private String _tableName;
+    private Set<String> _segmentNames;
+
+    public ForceCommitMessageHandler(ForceCommitMessage forceCommitMessage, ServerMetrics metrics,
+        NotificationContext ctx) {
+      super(forceCommitMessage, metrics, ctx);
+      _tableName = forceCommitMessage.getTableName();
+      _segmentNames = forceCommitMessage.getSegmentNames();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      _logger.info("Handling force commit message for table {} segments {}", _tableName, _segmentNames);
+      try {
+        _instanceDataManager.forceCommit(_tableName, _segmentNames);
+        helixTaskResult.setSuccess(true);
+      } catch (Exception e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1);
+        Utils.rethrowException(e);
+      }
+      return helixTaskResult;
+    }
+  }
+
   private static class DefaultMessageHandler extends MessageHandler {
     final String _segmentName;
     final String _tableNameWithType;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 882a4a802a..020ef35e05 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -459,6 +459,18 @@ public class ControllerRequestURLBuilder {
         "?cardinality=" + cardinality + "&primaryKeySize=" + primaryKeySize + "&numPartitions=" + numPartitions);
   }
 
+  public String forPauseConsumption(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "pauseConsumption");
+  }
+
+  public String forResumeConsumption(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "resumeConsumption");
+  }
+
+  public String forPauseStatus(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, "pauseStatus");
+  }
+
   private static String encode(String s) {
     try {
       return URLEncoder.encode(s, "UTF-8");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org