You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by cp...@apache.org on 2021/07/07 17:33:03 UTC

[incubator-pinot] branch master updated: Adding ability to check ingestion status for Offline Pinot table (#7070)

This is an automated email from the ASF dual-hosted git repository.

cpsoman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a251c42  Adding ability to check ingestion status for Offline Pinot table (#7070)
a251c42 is described below

commit a251c423183a0460dd4bf1e6dd644a935003725a
Author: icefury71 <ch...@gmail.com>
AuthorDate: Wed Jul 7 10:31:50 2021 -0700

    Adding ability to check ingestion status for Offline Pinot table (#7070)
    
    * Adding ability to check ingestion status for Offline Pinot table
    
    * - Adding table ingestion status to debug endpoint
     - Error handling for offline table
    
    * Fixing unused import violation
    
    * Fixing formatting violation (unused import)
    
    * Addressing review feedback (TABLE_NAME constant)
---
 .../pinot/controller/api/debug/TableDebugInfo.java | 11 ++-
 .../api/resources/PinotTableRestletResource.java   | 34 ++++++---
 .../api/resources/PinotTaskRestletResource.java    |  9 +++
 .../api/resources/TableDebugResource.java          | 28 ++++++-
 .../core/minion/PinotHelixTaskResourceManager.java | 35 +++++++++
 .../util/TableIngestionStatusHelper.java           | 89 ++++++++++++++++++++++
 .../apache/pinot/spi/config/table/TableStatus.java |  3 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  2 +
 8 files changed, 198 insertions(+), 13 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java
index 3242894..54d13da 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/debug/TableDebugInfo.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.spi.config.table.TableStatus;
 
 
 /**
@@ -44,6 +45,9 @@ public class TableDebugInfo {
   @JsonProperty("tableName")
   private final String _tableName;
 
+  @JsonProperty("ingestionStatus")
+  private final TableStatus.IngestionStatus _ingestionStatus;
+
   @JsonProperty("tableSize")
   private final TableSizeSummary _tableSizeSummary;
 
@@ -66,10 +70,11 @@ public class TableDebugInfo {
   private final List<BrokerDebugInfo> _brokerDebugInfos;
 
   @JsonCreator
-  public TableDebugInfo(String tableName, TableSizeSummary tableSizeSummary, int numBrokers, int numServers,
+  public TableDebugInfo(String tableName, TableStatus.IngestionStatus ingestionStatus, TableSizeSummary tableSizeSummary, int numBrokers, int numServers,
       int numSegments, List<SegmentDebugInfo> segmentDebugInfos, List<ServerDebugInfo> serverDebugInfos,
       List<BrokerDebugInfo> brokerDebugInfos) {
     _tableName = tableName;
+    _ingestionStatus = ingestionStatus;
     _tableSizeSummary = tableSizeSummary;
 
     _numBrokers = numBrokers;
@@ -85,6 +90,10 @@ public class TableDebugInfo {
     return _tableName;
   }
 
+  public TableStatus.IngestionStatus getIngestionStatus() {
+    return _ingestionStatus;
+  }
+
   public TableSizeSummary getTableSize() {
     return _tableSizeSummary;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index a0bf714..bb47d77 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -62,11 +62,12 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfigConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.recommender.RecommenderDriver;
 import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
-import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.controller.util.TableIngestionStatusHelper;
 import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableStats;
@@ -107,6 +108,9 @@ public class PinotTableRestletResource {
   PinotHelixResourceManager _pinotHelixResourceManager;
 
   @Inject
+  PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
+  @Inject
   ControllerConf _controllerConf;
 
   @Inject
@@ -606,17 +610,27 @@ public class PinotTableRestletResource {
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
     try {
-      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+      TableType tableType = Constants.validateTableType(tableTypeStr);
+      if (tableType == null) {
+        throw new ControllerApplicationException(LOGGER, "Table type should either be realtime|offline",
+            Response.Status.BAD_REQUEST);
+      }
+      String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName);
+      if (!_pinotHelixResourceManager.hasTable(tableNameWithType)) {
+        throw new ControllerApplicationException(LOGGER,
+            "Specified table name: " + tableName + " of type: " + tableTypeStr + " does not exist.",
+            Response.Status.BAD_REQUEST);
+      }
+      TableStatus.IngestionStatus ingestionStatus = null;
       if (TableType.OFFLINE == tableType) {
-        // TODO: Support table status for offline table. Currently only supported for realtime.
-        throw new UnsupportedOperationException(
-            "Table status for OFFLINE table: " + tableName + " is currently unsupported");
+        ingestionStatus = TableIngestionStatusHelper
+            .getOfflineTableIngestionStatus(tableNameWithType, _pinotHelixResourceManager,
+                _pinotHelixTaskResourceManager);
+      } else {
+        ingestionStatus = TableIngestionStatusHelper.getRealtimeTableIngestionStatus(tableNameWithType,
+            _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000, _executor, _connectionManager,
+            _pinotHelixResourceManager);
       }
-      String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
-      ConsumingSegmentInfoReader consumingSegmentInfoReader =
-          new ConsumingSegmentInfoReader(_executor, _connectionManager, _pinotHelixResourceManager);
-      TableStatus.IngestionStatus ingestionStatus = consumingSegmentInfoReader
-          .getIngestionStatus(tableNameWithType, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
       TableStatus tableStatus = new TableStatus(ingestionStatus);
       return JsonUtils.objectToPrettyString(tableStatus);
     } catch (Exception e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 096ab3b..e573be2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -122,6 +122,15 @@ public class PinotTaskRestletResource {
     return _pinotHelixTaskResourceManager.getTasks(taskType);
   }
 
+  @GET
+  @Path("/tasks/{taskType}/{tableNameWithType}/state")
+  @ApiOperation("List all tasks for the given task type")
+  public Map<String, TaskState> getTaskStatesByTable(
+      @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType,
+      @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") String tableNameWithType) {
+    return _pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, tableNameWithType);
+  }
+
   @Deprecated
   @GET
   @Path("/tasks/tasks/{taskType}")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java
index f42afc2..8bbab53 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableDebugResource.java
@@ -61,8 +61,11 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.debug.TableDebugInfo;
 import org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.controller.util.TableIngestionStatusHelper;
 import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableStatus;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -87,7 +90,11 @@ public class TableDebugResource {
   PinotHelixResourceManager _pinotHelixResourceManager;
 
   @Inject
+  PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+
+  @Inject
   Executor _executor;
+
   @Inject
   HttpConnectionManager _connectionManager;
 
@@ -152,16 +159,35 @@ public class TableDebugResource {
     // Table size summary.
     TableDebugInfo.TableSizeSummary tableSizeSummary = getTableSize(tableNameWithType);
 
+    TableStatus.IngestionStatus ingestionStatus = getIngestionStatus(tableNameWithType, tableType);
+
     // Number of segments in the table.
     IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
     int numSegments = (idealState != null) ? idealState.getPartitionSet().size() : 0;
 
-    return new TableDebugInfo(tableNameWithType, tableSizeSummary,
+    return new TableDebugInfo(tableNameWithType, ingestionStatus, tableSizeSummary,
         _pinotHelixResourceManager.getBrokerInstancesForTable(tableName, tableType).size(),
         _pinotHelixResourceManager.getServerInstancesForTable(tableName, tableType).size(), numSegments,
         segmentDebugInfos, serverDebugInfos, brokerDebugInfos);
   }
 
+  private TableStatus.IngestionStatus getIngestionStatus(String tableNameWithType, TableType tableType) {
+    try {
+      switch (tableType) {
+        case OFFLINE:
+          return TableIngestionStatusHelper.getOfflineTableIngestionStatus(tableNameWithType, _pinotHelixResourceManager,
+              _pinotHelixTaskResourceManager);
+        case REALTIME:
+          return TableIngestionStatusHelper.getRealtimeTableIngestionStatus(tableNameWithType,
+              _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000, _executor, _connectionManager,
+              _pinotHelixResourceManager);
+      }
+    } catch (Exception e) {
+      return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNKNOWN, e.getMessage());
+    }
+    return null;
+  }
+
   private TableDebugInfo.TableSizeSummary getTableSize(String tableNameWithType) {
     TableSizeReader tableSizeReader =
         new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 265d9f3..f687b36 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -42,6 +42,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.spi.utils.CommonConstants.TABLE_NAME;
+
 
 /**
  * The class <code>PinotHelixTaskResourceManager</code> manages all the task resources in Pinot cluster.
@@ -347,6 +349,39 @@ public class PinotHelixTaskResourceManager {
   }
 
   /**
+   * Helper method to return a map of task names to corresponding task state
+   * where the task corresponds to the given Pinot table name. This is used to
+   * check status of all tasks for a given table.
+   * @param taskType Task Name
+   * @param tableNameWithType table name with type to filter on
+   * @return Map of filtered task name to corresponding state
+   */
+  public synchronized Map<String, TaskState> getTaskStatesByTable(String taskType, String tableNameWithType) {
+    Map<String, TaskState> filteredTaskStateMap = new HashMap<>();
+    Map<String, TaskState> taskStateMap = getTaskStates(taskType);
+
+    for (Map.Entry<String, TaskState> taskState : taskStateMap.entrySet()) {
+      String taskName = taskState.getKey();
+
+      // Iterate through all task configs associated with this task name
+      for (PinotTaskConfig taskConfig: getTaskConfigs(taskName)) {
+        Map<String, String> pinotConfigs = taskConfig.getConfigs();
+
+        // Filter task configs that matches this table name
+        if (pinotConfigs != null) {
+          String tableNameConfig = pinotConfigs.get(TABLE_NAME);
+          if (tableNameConfig != null && tableNameConfig.equals(tableNameWithType)) {
+            // Found a match ! Track state for this particular task in the final result map
+            filteredTaskStateMap.put(taskName, taskStateMap.get(taskName));
+            break;
+          }
+        }
+      }
+    }
+    return filteredTaskStateMap;
+  }
+
+  /**
    * Helper method to convert task type to Helix JobQueue name.
    * <p>E.g. DummyTask -> TaskQueue_DummyTask
    *
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java
new file mode 100644
index 0000000..3ca4625
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableIngestionStatusHelper.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.ws.rs.core.Response;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableStatus;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class to fetch ingestion status for realtime and offline table
+ */
+public class TableIngestionStatusHelper {
+  public static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(TableIngestionStatusHelper.class);
+
+  public static TableStatus.IngestionStatus getRealtimeTableIngestionStatus(String tableNameWithType, int timeoutMs,
+      Executor executor, HttpConnectionManager connectionManager, PinotHelixResourceManager pinotHelixResourceManager) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader =
+        new ConsumingSegmentInfoReader(executor, connectionManager, pinotHelixResourceManager);
+    return consumingSegmentInfoReader.getIngestionStatus(tableNameWithType, timeoutMs);
+  }
+
+  public static TableStatus.IngestionStatus getOfflineTableIngestionStatus(String tableNameWithType,
+      PinotHelixResourceManager pinotHelixResourceManager,
+      PinotHelixTaskResourceManager pinotHelixTaskResourceManager) {
+    // Check if this offline table uses the built-in segment generation and push task type
+    // Offline table ingestion status for ingestion via other task types is not supported.
+    TableConfig tableConfig = pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+    if (taskConfig == null
+        || taskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE) == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "Cannot retrieve ingestion status for Table : " + tableNameWithType
+              + " since it does not use the built-in SegmentGenerationAndPushTask task", Response.Status.BAD_REQUEST);
+    }
+
+    TableStatus.IngestionState ingestionState = TableStatus.IngestionState.HEALTHY;
+    String errorMessage = "";
+
+    // Retrieve all the Minion tasks and corresponding states for this table
+    Map<String, TaskState> taskStateMap = pinotHelixTaskResourceManager
+        .getTaskStatesByTable(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, tableNameWithType);
+    List<String> failedTasks = new ArrayList<>();
+
+    // Check if any of the tasks are in error state
+    for (Map.Entry<String, TaskState> taskStateEntry : taskStateMap.entrySet()) {
+      switch (taskStateEntry.getValue()) {
+        case FAILED:
+        case ABORTED:
+          failedTasks.add(taskStateEntry.getKey());
+        default:
+          continue;
+      }
+    }
+    if (failedTasks.size() > 0) {
+      ingestionState = TableStatus.IngestionState.UNHEALTHY;
+      errorMessage = "Follow ingestion tasks have failed: " + failedTasks.toString();
+    }
+    return new TableStatus.IngestionStatus(ingestionState.toString(), errorMessage);
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java
index 9c0ea16..8be22d5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableStatus.java
@@ -33,7 +33,8 @@ public class TableStatus {
 
   public enum IngestionState {
     HEALTHY,
-    UNHEALTHY
+    UNHEALTHY,
+    UNKNOWN
   }
 
   @JsonIgnoreProperties(ignoreUnknown = true)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7cec52d..7a43e06 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -33,6 +33,8 @@ public class CommonConstants {
 
   public static final String KEY_OF_AUTH_TOKEN = "auth.token";
 
+  public static final String TABLE_NAME = "tableName";
+
   /**
    * The state of the consumer for a given segment
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org