Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/27 09:11:06 UTC

[GitHub] [iotdb] HTHou commented on a diff in pull request #7750: [IOTDB-4743] OperatePipeProcedure support rollback

HTHou commented on code in PR #7750:
URL: https://github.com/apache/iotdb/pull/7750#discussion_r1006608309


##########
confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java:
##########
@@ -180,41 +187,91 @@ public TGetAllPipeInfoResp getAllPipeInfo() {
    * @param pipeName name of PIPE
    * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
    *     and {@link SyncOperation#DROP_PIPE}
-   * @return list of TSStatus
+   * @return Map key is DataNodeId and value is TSStatus
    */
-  public List<TSStatus> operatePipeOnDataNodes(String pipeName, SyncOperation operation) {
+  public Map<Integer, TSStatus> operatePipeOnDataNodes(String pipeName, SyncOperation operation) {
     NodeManager nodeManager = configManager.getNodeManager();
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        nodeManager.getRegisteredDataNodeLocations();
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>();
+    nodeManager
+        .filterDataNodeThroughStatus(NodeStatus.Running)
+        .forEach(
+            dataNodeConfiguration ->
+                dataNodeLocationMap.put(
+                    dataNodeConfiguration.getLocation().getDataNodeId(),
+                    dataNodeConfiguration.getLocation()));
     final TOperatePipeOnDataNodeReq request =
         new TOperatePipeOnDataNodeReq(pipeName, (byte) operation.ordinal());
 
     AsyncClientHandler<TOperatePipeOnDataNodeReq, TSStatus> clientHandler =
         new AsyncClientHandler<>(DataNodeRequestType.OPERATE_PIPE, request, dataNodeLocationMap);
     AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
 
-    return clientHandler.getResponseList();
+    return clientHandler.getResponseMap();
+  }
+
+  /**
+   * Broadcast DataNodes to operate PIPE operation for roll back procedure.
+   *
+   * @param pipeName name of PIPE
+   * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
+   *     and {@link SyncOperation#DROP_PIPE}
+   * @param dataNodeIds target DataNodeId set
+   */
+  public void operatePipeOnDataNodesForRollback(
+      String pipeName, long createTime, SyncOperation operation, Set<Integer> dataNodeIds) {

Review Comment:
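   Add `@param createTime` to the Javadoc. The `operatePipeOnDataNodesForRollback` signature takes `createTime`, but the comment documents only `pipeName`, `operation`, and `dataNodeIds`.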
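
   A minimal sketch of what the requested Javadoc could look like. The description given for `createTime` is an assumption, since the diff does not say what the value means. `SyncOperation` is stubbed with only the three constants named in the diff so the snippet compiles on its own, and the method body and `String` return type are placeholders (the method in the PR returns `void`).

```java
import java.util.Set;

class RollbackJavadocSketch {

  /** Stand-in for the real SyncOperation enum; only the constants named in the diff. */
  public enum SyncOperation {
    START_PIPE,
    STOP_PIPE,
    DROP_PIPE
  }

  /**
   * Broadcast a PIPE operation to the given DataNodes as part of a rollback procedure.
   *
   * @param pipeName name of PIPE
   * @param createTime creation time of the PIPE (assumed meaning; not stated in the diff)
   * @param operation only support {@link SyncOperation#START_PIPE}, {@link SyncOperation#STOP_PIPE}
   *     and {@link SyncOperation#DROP_PIPE}
   * @param dataNodeIds target DataNodeId set
   * @return a human-readable summary of the call (placeholder; the PR method returns void)
   */
  public static String operatePipeOnDataNodesForRollback(
      String pipeName, long createTime, SyncOperation operation, Set<Integer> dataNodeIds) {
    // Placeholder body: the real method sends requests to DataNodes.
    return operation + " " + pipeName + "@" + createTime + " on " + dataNodeIds;
  }
}
```

   Listing the `@param` tags in the same order as the signature keeps the comment easy to check against the declaration.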


