Posted to yarn-issues@hadoop.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/12/01 01:04:00 UTC

[jira] [Commented] (YARN-11350) [Federation] Router Support DelegationToken With ZK

    [ https://issues.apache.org/jira/browse/YARN-11350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641605#comment-17641605 ] 

ASF GitHub Bot commented on YARN-11350:
---------------------------------------

goiri commented on code in PR #5131:
URL: https://github.com/apache/hadoop/pull/5131#discussion_r1036592621


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java:
##########
@@ -886,45 +1000,608 @@ public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
     return UpdateReservationHomeSubClusterResponse.newInstance();
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Store NewMasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Parse the delegationKey from the request and get the ZK storage path.
+    DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+    String nodeCreatePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
+    LOG.debug("Storing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(),
+        nodeCreatePath);
+
+    // Write master key data to zk.
+    try(ByteArrayOutputStream os = new ByteArrayOutputStream();
+        DataOutputStream fsOut = new DataOutputStream(os)) {
+      delegationKey.write(fsOut);
+      put(nodeCreatePath, os.toByteArray(), false);
+    }
+
+    // Get the stored masterKey from zk.
+    RouterMasterKey masterKeyFromZK = getRouterMasterKeyFromZK(nodeCreatePath);
+    long end = clock.getTime();
+    opDurations.addStoreNewMasterKeyDuration(start, end);
+    return RouterMasterKeyResponse.newInstance(masterKeyFromZK);
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Remove MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+      // Parse the delegationKey from the request and get the ZK storage path.
+      RouterMasterKey masterKey = request.getRouterMasterKey();
+      DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+      String nodeRemovePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
+      LOG.debug("Removing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(),
+          nodeRemovePath);
+
+      // Check if the path exists, Throws an exception if the path does not exist.
+      if (!exists(nodeRemovePath)) {
+        throw new YarnException("ZkNodePath = " + nodeRemovePath + " not exists!");
+      }
+
+      // try to remove masterKey.
+      zkManager.delete(nodeRemovePath);
+      long end = clock.getTime();
+      opDurations.removeStoredMasterKeyDuration(start, end);
+      return RouterMasterKeyResponse.newInstance(masterKey);
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Remove MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // Parse the delegationKey from the request and get the ZK storage path.
+      DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+      String nodePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
+
+      // Check if the path exists, Throws an exception if the path does not exist.
+      if (!exists(nodePath)) {
+        throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
+      }
+
+      // Get the stored masterKey from zk.
+      RouterMasterKey routerMasterKey = getRouterMasterKeyFromZK(nodePath);
+      long end = clock.getTime();
+      opDurations.getMasterKeyByDelegationKeyDuration(start, end);
+      return RouterMasterKeyResponse.newInstance(routerMasterKey);
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
+  }
+
+  /**
+   * Get MasterKeyZNodePath based on DelegationKey.
+   *
+   * @param delegationKey delegationKey.
+   * @return masterKey ZNodePath.
+   */
+  private String getMasterKeyZNodePathByDelegationKey(DelegationKey delegationKey) {
+    return getMasterKeyZNodePathByKeyId(delegationKey.getKeyId());
   }
 
+  /**
+   * Get MasterKeyZNodePath based on KeyId.
+   *
+   * @param keyId master key id.
+   * @return masterKey ZNodePath.
+   */
+  private String getMasterKeyZNodePathByKeyId(int keyId) {
+    String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + keyId;
+    return getNodePath(routerRMDTMasterKeysRootPath, nodeName);
+  }
+
+  /**
+   * Get RouterMasterKey from ZK.
+   *
+   * @param nodePath The path where masterKey is stored in zk.
+   *
+   * @return RouterMasterKey.
+   * @throws IOException An IO Error occurred.
+   */
+  private RouterMasterKey getRouterMasterKeyFromZK(String nodePath)
+      throws IOException {
+    try {
+      byte[] data = get(nodePath);
+      if ((data == null) || (data.length == 0)) {
+        return null;
+      }
+
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      DataInputStream din = new DataInputStream(bin);
+      DelegationKey key = new DelegationKey();
+      key.readFields(din);
+
+      return RouterMasterKey.newInstance(key.getKeyId(),
+          ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate());
+    } catch (Exception ex) {
+      LOG.error("No node in path {}.", nodePath);
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * ZookeeperFederationStateStore Supports Store RMDelegationTokenIdentifier.
+   *
+   * The stored token method is a synchronized method
+   * used to ensure that storeNewToken is a thread-safe method.
+   *
+   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // We verify the RouterRMTokenRequest to ensure that the request is not empty,
+    // and that the internal RouterStoreToken is not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // add delegationToken
+      storeOrUpdateRouterRMDT(request, false);
+
+      // Get the stored delegationToken from ZK and return.
+      RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false);
+      long end = clock.getTime();
+      opDurations.getStoreNewTokenDuration(start, end);
+      return RouterRMTokenResponse.newInstance(resultStoreToken);
+    } catch (YarnException | IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Update RMDelegationTokenIdentifier.
+   *
+   * The update stored token method is a synchronized method
+   * used to ensure that storeNewToken is a thread-safe method.
+   *
+   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // We verify the RouterRMTokenRequest to ensure that the request is not empty,
+    // and that the internal RouterStoreToken is not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // get the Token storage path
+      String nodePath = getStoreTokenZNodePathByTokenRequest(request);
+
+      // updateStoredToken needs to determine whether the zkNode exists.
+      // If it exists, update the token data.
+      // If it does not exist, write the new token data directly.
+      boolean pathExists = true;
+      if (!exists(nodePath)) {
+        pathExists = false;
+      }
+
+      if (pathExists) {
+        // update delegationToken
+        storeOrUpdateRouterRMDT(request, true);
+      } else {
+        // add new delegationToken
+        storeNewToken(request);
+      }
+
+      // Get the stored delegationToken from ZK and return.
+      RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false);
+      long end = clock.getTime();
+      opDurations.updateStoredTokenDuration(start, end);
+      return RouterRMTokenResponse.newInstance(resultStoreToken);
+    } catch (YarnException | IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Remove RMDelegationTokenIdentifier.
+   *
+   * The remove stored token method is a synchronized method
+   * used to ensure that storeNewToken is a thread-safe method.
+   *
+   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // We verify the RouterRMTokenRequest to ensure that the request is not empty,
+    // and that the internal RouterStoreToken is not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // get the Token storage path
+      String nodePath = getStoreTokenZNodePathByTokenRequest(request);
+
+      // If the path to be deleted does not exist, throw an exception directly.
+      if (!exists(nodePath)) {
+        throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
+      }
+
+      // Check again, first get the data from ZK,
+      // if the data is not empty, then delete it
+      RouterStoreToken storeToken = getStoreTokenFromZK(request, false);
+      if (storeToken != null) {
+        zkManager.delete(nodePath);
+      }
+
+      // return deleted token data.
+      long end = clock.getTime();
+      opDurations.removeStoredTokenDuration(start, end);
+      return RouterRMTokenResponse.newInstance(storeToken);
+    } catch (YarnException | IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * The Router Supports GetTokenByRouterStoreToken.
+   *
+   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
+   * @return RouterRMTokenResponse.
+   * @throws YarnException if the call to the state store is unsuccessful
+   * @throws IOException An IO Error occurred
+   */
   @Override
   public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // We verify the RouterRMTokenRequest to ensure that the request is not empty,
+    // and that the internal RouterStoreToken is not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // Before get the token,
+      // we need to determine whether the path where the token is stored exists.
+      // If it doesn't exist, we will throw an exception.
+      String nodePath = getStoreTokenZNodePathByTokenRequest(request);
+      if (!exists(nodePath)) {
+        throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
+      }
+
+      // Get the stored delegationToken from ZK and return.
+      RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false);
+      // return deleted token data.
+      long end = clock.getTime();
+      opDurations.getTokenByRouterStoreTokenDuration(start, end);
+      return RouterRMTokenResponse.newInstance(resultStoreToken);
+    } catch (YarnException | IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
+  }
+
+  /**
+   * Convert MasterKey to DelegationKey.
+   *
+   * Before using this function,
+   * please use FederationRouterRMTokenInputValidator to verify the request.
+   * By default, the request is not empty, and the internal object is not empty.
+   *
+   * @param request RouterMasterKeyRequest
+   * @return DelegationKey.
+   */
+  private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest request) {
+    RouterMasterKey masterKey = request.getRouterMasterKey();
+    return convertMasterKeyToDelegationKey(masterKey);
+  }
+
+  /**
+   * Convert MasterKey to DelegationKey.
+   *
+   * @param masterKey masterKey.
+   * @return DelegationKey.
+   */
+  private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey masterKey) {
+    ByteBuffer keyByteBuf = masterKey.getKeyBytes();
+    byte[] keyBytes = new byte[keyByteBuf.remaining()];
+    keyByteBuf.get(keyBytes);
+    return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);
+  }
+
+  /**
+   * Check if a path exists in zk.
+   *
+   * @param path Path to be checked.
+   * @return Returns true if the path exists, false if the path does not exist.
+   * @throws Exception When an exception to access zk occurs.
+   */
+  @VisibleForTesting
+  boolean exists(final String path) throws Exception {
+    return zkManager.exists(path);
+  }
+
+  /**
+   * Add or update delegationToken.
+   *
+   * Before using this function,
+   * please use FederationRouterRMTokenInputValidator to verify the request.
+   * By default, the request is not empty, and the internal object is not empty.
+   *
+   * @param request storeToken
+   * @param isUpdate true, update the token; false, create a new token.
+   * @throws Exception exception occurs.
+   */
+  private void storeOrUpdateRouterRMDT(RouterRMTokenRequest request,  boolean isUpdate)
+      throws Exception {
+
+    RouterStoreToken routerStoreToken  = request.getRouterStoreToken();
+    String nodeCreatePath = getStoreTokenZNodePathByTokenRequest(request);
+    LOG.debug("nodeCreatePath = {}, isUpdate = {}", nodeCreatePath, isUpdate);
+    put(nodeCreatePath, routerStoreToken.toByteArray(), isUpdate);
+  }
+
+  /**
+   * Get ZNode Path of StoreToken.
+   *
+   * Before using this method, we should use FederationRouterRMTokenInputValidator
+   * to verify the request,ensure that the request is not empty,
+   * and ensure that the object in the request is not empty.
+   *
+   * @param request RouterMasterKeyRequest.
+   * @return RouterRMToken ZNode Path.
+   * @throws IOException io exception occurs.
+   */
+  private String getStoreTokenZNodePathByTokenRequest(RouterRMTokenRequest request)
+      throws IOException {
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
+    return getStoreTokenZNodePathByIdentifier(identifier);
+  }
+
+  /**
+   * Get ZNode Path of StoreToken.
+   *
+   * @param identifier YARNDelegationTokenIdentifier
+   * @return RouterRMToken ZNode Path.
+   */
+  private String getStoreTokenZNodePathByIdentifier(YARNDelegationTokenIdentifier identifier) {
+    String nodePath = getNodePath(routerRMDelegationTokensRootPath,
+        ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
+    return nodePath;
+  }
+
+  /**
+   * Get RouterStoreToken from ZK.
+   *
+   * @param request RouterMasterKeyRequest.
+   * @param quiet If true is silent mode, no error message is printed at this time,
+   * if false is non-silent mode, error message is printed at this time.
+   * @return RouterStoreToken.
+   * @throws IOException io exception occurs.
+   */
+  private RouterStoreToken getStoreTokenFromZK(RouterRMTokenRequest request,
+      boolean quiet) throws IOException {
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
+    return getStoreTokenFromZK(identifier, quiet);
+  }
+
+  /**
+   * Get RouterStoreToken from ZK.
+   *
+   * @param identifier YARN DelegationToken Identifier
+   * @param quiet Whether it is in quiet mode,
+   *        if it is in quiet mode, no exception information will be output.
+   * @return RouterStoreToken.
+   * @throws IOException io exception occurs.
+   */
+  private RouterStoreToken getStoreTokenFromZK(YARNDelegationTokenIdentifier identifier,
+      boolean quiet) throws IOException {
+    // get the Token storage path
+    String nodePath = getStoreTokenZNodePathByIdentifier(identifier);
+    return getStoreTokenFromZK(nodePath, quiet);
+  }
+
+  /**
+   * Get RouterStoreToken from ZK.
+   *
+   * @param nodePath Znode location where data is stored.
+   * @param quiet Whether it is in quiet mode,
+   *        if it is in quiet mode, no exception information will be output.
+   * @return RouterStoreToken.
+   * @throws IOException io exception occurs.
+   */
+  private RouterStoreToken getStoreTokenFromZK(String nodePath, boolean quiet)
+      throws IOException {
+    try {
+      byte[] data = get(nodePath);
+      if ((data == null) || (data.length == 0)) {
+        return null;
+      }
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      DataInputStream din = new DataInputStream(bin);
+      RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class);
+      storeToken.readFields(din);
+      return storeToken;
+    } catch (Exception ex) {
+      if (!quiet) {
+        LOG.error("No node in path [" + nodePath + "]");

Review Comment:
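   Use a {} placeholder here instead of string concatenation, as the other LOG calls in this file already do (for example, LOG.error("No node in path {}.", nodePath) in getRouterMasterKeyFromZK).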
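
   For context, the file's other LOG calls use SLF4J-style parameterized logging, where the message pattern and its arguments are passed separately and the string is only assembled if the log level is enabled. In the PR itself the change would presumably be a one-liner along the lines of LOG.error("No node in path [{}].", nodePath). The sketch below is only an illustration of the idea: `format` and `error` are hypothetical stand-ins written so the example runs without SLF4J on the classpath, and the node path is made up.

```java
public class PlaceholderLoggingSketch {

  /**
   * Hypothetical stand-in for "{}" substitution. This is NOT SLF4J's
   * implementation; it only mimics the visible behaviour for simple cases.
   */
  static String format(String pattern, Object... args) {
    StringBuilder sb = new StringBuilder();
    int from = 0;
    for (Object arg : args) {
      int idx = pattern.indexOf("{}", from);
      if (idx < 0) {
        break; // more arguments than placeholders: ignore the extras
      }
      // Copy the literal text before the placeholder, then the argument.
      sb.append(pattern, from, idx).append(arg);
      from = idx + 2;
    }
    // Copy whatever literal text follows the last placeholder used.
    return sb.append(pattern.substring(from)).toString();
  }

  /**
   * Hypothetical logger method: the message is built only when the level
   * is enabled, which is the main benefit over eager concatenation.
   */
  static String error(boolean enabled, String pattern, Object... args) {
    return enabled ? format(pattern, args) : null;
  }

  public static void main(String[] args) {
    // Assumed example path, not taken from the PR.
    String nodePath = "/federationstore/tokens/token_1";

    // Eager concatenation: the string is always built, even if unused.
    String eager = "No node in path [" + nodePath + "]";

    // Placeholder style: pattern and argument stay separate until needed.
    String deferred = error(true, "No node in path [{}]", nodePath);

    System.out.println(eager.equals(deferred));
  }
}
```

   With concatenation the message is built on every call, whereas the placeholder form defers that work to the logging framework and keeps the call consistent with the rest of the class.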





> [Federation] Router Support DelegationToken With ZK
> ---------------------------------------------------
>
>                 Key: YARN-11350
>                 URL: https://issues.apache.org/jira/browse/YARN-11350
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: federation, router
>    Affects Versions: 3.4.0
>            Reporter: Shilun Fan
>            Assignee: Shilun Fan
>            Priority: Major
>              Labels: pull-request-available
>
> [Federation] Router Support DelegationToken With ZookeeperFederationStateStore.


