You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/11/30 00:08:55 UTC

[GitHub] [hadoop] slfan1989 commented on a diff in pull request #5169: YARN-11349. [Federation] Router Support DelegationToken With SQL.

slfan1989 commented on code in PR #5169:
URL: https://github.com/apache/hadoop/pull/5169#discussion_r1035403875


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1353,45 +1386,560 @@ public Connection getConn() {
     return conn;
   }
 
+  /**
+   * SQLFederationStateStore Supports Store New MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey.
+   * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse the parameters and serialize the DelegationKey as a string.
+    DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+    int keyId = delegationKey.getKeyId();
+    String delegationKeyStr = encodeWritable(delegationKey);
+    CallableStatement cstmt = null;
+
+    // Step3. store data in database.
+    try {
+      // Call procedure
+      cstmt = getCallableStatement(CALL_SP_ADD_MASTERKEY);
+
+      // Set the parameters for the stored procedure
+      // 1)IN keyId_IN bigint
+      cstmt.setInt("keyId_IN", keyId);
+      // 2)IN masterKey_IN varbinary(1024)
+      cstmt.setString("masterKey_IN", delegationKeyStr);
+      // 3) OUT rowCount_OUT int
+      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      // Get rowCount
+      int rowCount = cstmt.getInt("rowCount_OUT");
+
+      // We hope that 1 record can be written to the database.
+      // If the number of records is not 1, it means that the data was written incorrectly.
+      if (rowCount != 1) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during the insertion of masterKey, keyId = %s. " +
+            "please check the records of the database.", String.valueOf(keyId));
+      }
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly masterKey, keyId = %s.", String.valueOf(keyId));
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    // Step4. Query Data from the database and return the result.
+    return getMasterKeyByDelegationKey(request);
   }
 
+  /**
+   * SQLFederationStateStore Supports Remove MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+    int paramKeyId = paramMasterKey.getKeyId();
+    CallableStatement cstmt = null;
+
+    // Step3. Clear data from database.
+    try {
+      // Defined the sp_deleteMasterKey procedure
+      // This procedure requires 1 input parameters, 1 output parameters
+      // Input parameters
+      // 1)IN keyId_IN int
+      // Output parameters
+      // 2)OUT rowCount_OUT int
+      cstmt = getCallableStatement(CALL_SP_DELETE_MASTERKEY);
+
+      // Set the parameters for the stored procedure
+      // 1)IN keyId_IN int
+      cstmt.setInt("keyId_IN", paramKeyId);
+      // 2)OUT rowCount_OUT int
+      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      int rowCount = cstmt.getInt("rowCount_OUT");
+
+      // if it is equal to 0 it means the call
+      // did not delete the reservation from FederationStateStore
+      if (rowCount == 0) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "masterKeyId = %s does not exist.", String.valueOf(paramKeyId));
+      } else if (rowCount != 1) {
+        // if it is different from 1 it means the call
+        // had a wrong behavior. Maybe the database is not set correctly.
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during deleting the keyId %s. " +
+            "The database is expected to delete 1 record, " +
+            "but the number of deleted records returned by the database is greater than 1, " +
+            "indicating that a duplicate masterKey occurred during the deletion process.",
+            paramKeyId);
+      }
+
+      LOG.info("Delete from the StateStore the keyId: {}.", String.valueOf(paramKeyId));
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
+      return RouterMasterKeyResponse.newInstance(paramMasterKey);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to delete the keyId %s.", paramKeyId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+    throw new YarnException("Unable to delete the masterKey, keyId = " + paramKeyId);
   }
 
+  /**
+   * SQLFederationStateStore Supports Remove MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+    int paramKeyId = paramMasterKey.getKeyId();
+    CallableStatement cstmt = null;
+
+    // Step3: Call the stored procedure to get the result.
+    try {
+      // Defined the sp_getMasterKey procedure
+      // this procedure requires 2 parameters
+      // Input parameters
+      // 1)IN keyId_IN int
+      // Output parameters
+      // 2)OUT masterKey_OUT varchar(1024)
+      cstmt = getCallableStatement(CALL_SP_GET_MASTERKEY);
+
+      // Set the parameters for the stored procedure
+      // 1)IN keyId_IN int
+      cstmt.setInt("keyId_IN", paramKeyId);
+      // 2)OUT masterKey_OUT varchar(1024)
+      cstmt.registerOutParameter("masterKey_OUT", java.sql.Types.VARCHAR);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.execute();
+      long stopTime = clock.getTime();
+
+      // Get Result
+      String resultMasterKey = cstmt.getString("masterKey_OUT");
+      DelegationKey key = new DelegationKey();
+      decodeWritable(key, resultMasterKey);
+
+      // Get RouterMasterKey
+      RouterMasterKey routerMasterKey = RouterMasterKey.newInstance(key.getKeyId(),
+          ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate());
+
+      LOG.info("Got the information about the specified masterKey = {} according to keyId = {}.",
+          routerMasterKey, paramKeyId);
+
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
+
+      // Return query result.
+      return RouterMasterKeyResponse.newInstance(routerMasterKey);
+
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the masterKey information according to %s.",
+          String.valueOf(paramKeyId));
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    // Throw exception information
+    throw new YarnException(
+        "Unable to obtain the masterKey information according to " + paramKeyId);
   }
 
+  /**
+   * SQLFederationStateStore Supports Store RMDelegationTokenIdentifier.
+   *
+   * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // Step1: Verify parameters to ensure that key fields are not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Step2: Parse parameters and get KeyId.
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
+    String tokenIdentifier = encodeWritable(identifier);
+    String tokenInfo = routerStoreToken.getTokenInfo();
+    long renewDate = routerStoreToken.getRenewDate();
+    int sequenceNum = identifier.getSequenceNumber();
+    CallableStatement cstmt = null;
+
+    // Step3. store data in database.
+    try {
+
+      // Defined the sp_addDelegationToken procedure
+      // This procedure requires 4 input parameters, 1 output parameters
+      // Input parameters
+      // 1)IN sequenceNum_IN int
+      // 2)IN tokenIdent_IN varchar(1024)
+      // 3)IN token_IN varchar(1024)
+      // 4) IN renewDate_IN bigint
+      // Output parameters
+      // 5)OUT rowCount_OUT int
+
+      cstmt = getCallableStatement(CALL_SP_ADD_DELEGATIONTOKEN);
+
+      // Set the parameters for the stored procedure
+      // 1)IN sequenceNum_IN int
+      cstmt.setInt("sequenceNum_IN", sequenceNum);
+      // 2)IN tokenIdent_IN varchar(1024)
+      cstmt.setString("tokenIdent_IN", tokenIdentifier);
+      // 3) IN token_IN varchar(1024)
+      cstmt.setString("token_IN", tokenInfo);
+      // 4) IN renewDate_IN long
+      cstmt.setLong("renewDate_IN", renewDate);
+      // 5) OUT rowCount_OUT int
+      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();

Review Comment:
   Thank you very much for your help in reviewing the code, I will refactor this part of the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org