You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by cp...@apache.org on 2021/07/07 17:33:03 UTC
[incubator-pinot] branch master updated: Adding ability to check
ingestion status for Offline Pinot table (#7070)
This is an automated email from the ASF dual-hosted git repository.
cpsoman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a251c42 Adding ability to check ingestion status for Offline Pinot table (#7070)
a251c42 is described below
commit a251c423183a0460dd4bf1e6dd644a935003725a
Author: icefury71 <ch...@gmail.com>
AuthorDate: Wed Jul 7 10:31:50 2021 -0700
Adding ability to check ingestion status for Offline Pinot table (#7070)
* Adding ability to check ingestion status for Offline Pinot table
* - Adding table ingestion status to debug endpoint
- Error handling for offline table
* Fixing unused import violation
* Fixing formatting violation (unused import)
* Addressing review feedback (TABLE_NAME constant)
---
.../pinot/controller/api/debug/TableDebugInfo.java | 11 ++-
.../api/resources/PinotTableRestletResource.java | 34 ++++++---
.../api/resources/PinotTaskRestletResource.java | 9 +++
.../api/resources/TableDebugResource.java | 28 ++++++-
.../core/minion/PinotHelixTaskResourceManager.java | 35 +++++++++
.../util/TableIngestionStatusHelper.java | 89 ++++++++++++++++++++++
.../apache/pinot/spi/config/table/TableStatus.java | 3 +-
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
8 files changed, 198 insertions(+), 13 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java
index 3242894..54d13da 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.spi.config.table.TableStatus;
/**
@@ -44,6 +45,9 @@ public class TableDebugInfo {
@JsonProperty("tableName")
private final String _tableName;
+ @JsonProperty("ingestionStatus")
+ private final TableStatus.IngestionStatus _ingestionStatus;
+
@JsonProperty("tableSize")
private final TableSizeSummary _tableSizeSummary;
@@ -66,10 +70,11 @@ public class TableDebugInfo {
private final List<BrokerDebugInfo> _brokerDebugInfos;
@JsonCreator
- public TableDebugInfo(String tableName, TableSizeSummary tableSizeSummary, int numBrokers, int numServers,
+ public TableDebugInfo(String tableName, TableStatus.IngestionStatus ingestionStatus, TableSizeSummary tableSizeSummary, int numBrokers, int numServers,
int numSegments, List<SegmentDebugInfo> segmentDebugInfos, List<ServerDebugInfo> serverDebugInfos,
List<BrokerDebugInfo> brokerDebugInfos) {
_tableName = tableName;
+ _ingestionStatus = ingestionStatus;
_tableSizeSummary = tableSizeSummary;
_numBrokers = numBrokers;
@@ -85,6 +90,10 @@ public class TableDebugInfo {
return _tableName;
}
+ public TableStatus.IngestionStatus getIngestionStatus() {
+ return _ingestionStatus;
+ }
+
public TableSizeSummary getTableSize() {
return _tableSizeSummary;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index a0bf714..bb47d77 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -62,11 +62,12 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.recommender.RecommenderDriver;
import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
-import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.controller.util.TableIngestionStatusHelper;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStats;
@@ -107,6 +108,9 @@ public class PinotTableRestletResource {
PinotHelixResourceManager _pinotHelixResourceManager;
@Inject
+ PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
+ @Inject
ControllerConf _controllerConf;
@Inject
@@ -606,17 +610,27 @@ public class PinotTableRestletResource {
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
try {
- TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == null) {
+ throw new ControllerApplicationException(LOGGER, "Table type should either be realtime|offline",
+ Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
+ if (!_pinotHelixResourceManager.hasTable(tableNameWithType)) {
+ throw new ControllerApplicationException(LOGGER,
+ "Specified table name: " + tableName + " of type: " + tableTypeStr + " does not exist.",
+ Response.Status.BAD_REQUEST);
+ }
+ TableStatus.IngestionStatus ingestionStatus = null;
if (TableType.OFFLINE == tableType) {
- // TODO: Support table status for offline table. Currently only supported for realtime.
- throw new UnsupportedOperationException(
- "Table status for OFFLINE table: " + tableName + " is currently unsupported");
+ ingestionStatus = TableIngestionStatusHelper
+ .getOfflineTableIngestionStatus(tableNameWithType, _pinotHelixResourceManager,
+ _pinotHelixTaskResourceManager);
+ } else {
+ ingestionStatus = TableIngestionStatusHelper.getRealtimeTableIngestionStatus(tableNameWithType,
+ _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000, _executor, _connectionManager,
+ _pinotHelixResourceManager);
}
- String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
- ConsumingSegmentInfoReader consumingSegmentInfoReader =
- new ConsumingSegmentInfoReader(_executor, _connectionManager, _pinotHelixResourceManager);
- TableStatus.IngestionStatus ingestionStatus = consumingSegmentInfoReader
- .getIngestionStatus(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
TableStatus tableStatus = new TableStatus(ingestionStatus);
return JsonUtils.objectToPrettyString(tableStatus);
} catch (Exception e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 096ab3b..e573be2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -122,6 +122,15 @@ public class PinotTaskRestletResource {
return _pinotHelixTaskResourceManager.getTasks(taskType);
}
+ @GET
+ @Path("/tasks/{taskType}/{tableNameWithType}/state")
+ @ApiOperation("List all tasks for the given task type")
+ public Map<String, TaskState> getTaskStatesByTable(
+ @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType,
+ @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") String tableNameWithType) {
+ return _pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableNameWithType);
+ }
+
@Deprecated
@GET
@Path("/tasks/tasks/{taskType}")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java
index f42afc2..8bbab53 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java
@@ -61,8 +61,11 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.debug.TableDebugInfo;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.controller.util.TableIngestionStatusHelper;
import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableStatus;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -87,7 +90,11 @@ public class TableDebugResource {
PinotHelixResourceManager _pinotHelixResourceManager;
@Inject
+ PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
+ @Inject
Executor _executor;
+
@Inject
HttpConnectionManager _connectionManager;
@@ -152,16 +159,35 @@ public class TableDebugResource {
// Table size summary.
TableDebugInfo.TableSizeSummary tableSizeSummary = getTableSize(tableNameWithType);
+ TableStatus.IngestionStatus ingestionStatus = getIngestionStatus(tableNameWithType, tableType);
+
// Number of segments in the table.
IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
int numSegments = (idealState != null) ? idealState.getPartitionSet().size() : 0;
- return new TableDebugInfo(tableNameWithType, tableSizeSummary,
+ return new TableDebugInfo(tableNameWithType, ingestionStatus, tableSizeSummary,
_pinotHelixResourceManager.getBrokerInstancesForTable(tableName, tableType).size(),
_pinotHelixResourceManager.getServerInstancesForTable(tableName, tableType).size(), numSegments,
segmentDebugInfos, serverDebugInfos, brokerDebugInfos);
}
+ private TableStatus.IngestionStatus getIngestionStatus(String tableNameWithType, TableType tableType) {
+ try {
+ switch (tableType) {
+ case OFFLINE:
+ return TableIngestionStatusHelper.getOfflineTableIngestionStatus(tableNameWithType, _pinotHelixResourceManager,
+ _pinotHelixTaskResourceManager);
+ case REALTIME:
+ return TableIngestionStatusHelper.getRealtimeTableIngestionStatus(tableNameWithType,
+ _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000, _executor, _connectionManager,
+ _pinotHelixResourceManager);
+ }
+ } catch (Exception e) {
+ return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNKNOWN, e.getMessage());
+ }
+ return null;
+ }
+
private TableDebugInfo.TableSizeSummary getTableSize(String tableNameWithType) {
TableSizeReader tableSizeReader =
new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 265d9f3..f687b36 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -42,6 +42,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.spi.utils.CommonConstants.TABLE_NAME;
+
/**
* The class <code>PinotHelixTaskResourceManager</code> manages all the task resources in Pinot cluster.
@@ -347,6 +349,39 @@ public class PinotHelixTaskResourceManager {
}
/**
+ * Helper method to return a map of task names to corresponding task state
+ * where the task corresponds to the given Pinot table name. This is used to
+ * check status of all tasks for a given table.
+ * @param taskType Task Name
+ * @param tableNameWithType table name with type to filter on
+ * @return Map of filtered task name to corresponding state
+ */
+ public synchronized Map<String, TaskState> getTaskStatesByTable(String taskType, String tableNameWithType) {
+ Map<String, TaskState> filteredTaskStateMap = new HashMap<>();
+ Map<String, TaskState> taskStateMap = getTaskStates(taskType);
+
+ for (Map.Entry<String, TaskState> taskState : taskStateMap.entrySet()) {
+ String taskName = taskState.getKey();
+
+ // Iterate through all task configs associated with this task name
+ for (PinotTaskConfig taskConfig: getTaskConfigs(taskName)) {
+ Map<String, String> pinotConfigs = taskConfig.getConfigs();
+
+ // Filter task configs that matches this table name
+ if (pinotConfigs != null) {
+ String tableNameConfig = pinotConfigs.get(TABLE_NAME);
+ if (tableNameConfig != null && tableNameConfig.equals(tableNameWithType)) {
+ // Found a match ! Track state for this particular task in the final result map
+ filteredTaskStateMap.put(taskName, taskStateMap.get(taskName));
+ break;
+ }
+ }
+ }
+ }
+ return filteredTaskStateMap;
+ }
+
+ /**
* Helper method to convert task type to Helix JobQueue name.
* <p>E.g. DummyTask -> TaskQueue_DummyTask
*
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java
new file mode 100644
index 0000000..3ca4625
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.ws.rs.core.Response;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableStatus;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class to fetch ingestion status for realtime and offline table
+ */
+public class TableIngestionStatusHelper {
+ public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableIngestionStatusHelper.class);
+
+ public static TableStatus.IngestionStatus getRealtimeTableIngestionStatus(String tableNameWithType, int timeoutMs,
+ Executor executor, HttpConnectionManager connectionManager, PinotHelixResourceManager pinotHelixResourceManager) {
+ ConsumingSegmentInfoReader consumingSegmentInfoReader =
+ new ConsumingSegmentInfoReader(executor, connectionManager, pinotHelixResourceManager);
+ return consumingSegmentInfoReader.getIngestionStatus(tableNameWithType, timeoutMs);
+ }
+
+ public static TableStatus.IngestionStatus getOfflineTableIngestionStatus(String tableNameWithType,
+ PinotHelixResourceManager pinotHelixResourceManager,
+ PinotHelixTaskResourceManager pinotHelixTaskResourceManager) {
+ // Check if this offline table uses the built-in segment generation and push task type
+ // Offline table ingestion status for ingestion via other task types is not supported.
+ TableConfig tableConfig = pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+ if (taskConfig == null
+ || taskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) == null) {
+ throw new ControllerApplicationException(LOGGER,
+ "Cannot retrieve ingestion status for Table : " + tableNameWithType
+ + " since it does not use the built-in SegmentGenerationAndPushTask task", Response.Status.BAD_REQUEST);
+ }
+
+ TableStatus.IngestionState ingestionState = TableStatus.IngestionState.HEALTHY;
+ String errorMessage = "";
+
+ // Retrieve all the Minion tasks and corresponding states for this table
+ Map<String, TaskState> taskStateMap = pinotHelixTaskResourceManager
+ .getTaskStatesByTable(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, tableNameWithType);
+ List<String> failedTasks = new ArrayList<>();
+
+ // Check if any of the tasks are in error state
+ for (Map.Entry<String, TaskState> taskStateEntry : taskStateMap.entrySet()) {
+ switch (taskStateEntry.getValue()) {
+ case FAILED:
+ case ABORTED:
+ failedTasks.add(taskStateEntry.getKey());
+ default:
+ continue;
+ }
+ }
+ if (failedTasks.size() > 0) {
+ ingestionState = TableStatus.IngestionState.UNHEALTHY;
+ errorMessage = "Follow ingestion tasks have failed: " + failedTasks.toString();
+ }
+ return new TableStatus.IngestionStatus(ingestionState.toString(), errorMessage);
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java
index 9c0ea16..8be22d5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java
@@ -33,7 +33,8 @@ public class TableStatus {
public enum IngestionState {
HEALTHY,
- UNHEALTHY
+ UNHEALTHY,
+ UNKNOWN
}
@JsonIgnoreProperties(ignoreUnknown = true)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7cec52d..7a43e06 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -33,6 +33,8 @@ public class CommonConstants {
public static final String KEY_OF_AUTH_TOKEN = "auth.token";
+ public static final String TABLE_NAME = "tableName";
+
/**
* The state of the consumer for a given segment
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org