You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2021/12/03 14:36:20 UTC

[hadoop] branch branch-3.3 updated: HDFS-16332. Handle invalid token exception in sasl handshake (#3677)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 2cdecd4  HDFS-16332. Handle invalid token exception in sasl handshake (#3677)
2cdecd4 is described below

commit 2cdecd420a12d2af94aaf3a2b5a44dad1a429f76
Author: bitterfox <bi...@gmail.com>
AuthorDate: Fri Dec 3 23:30:13 2021 +0900

    HDFS-16332. Handle invalid token exception in sasl handshake (#3677)
    
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
    (cherry picked from commit dd6b987c93e8319560b633360f30ac84fc48e403)
    
     Conflicts:
    	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
---
 .../datatransfer/sasl/DataTransferSaslUtil.java    |  72 ++++----
 .../datatransfer/sasl/SaslDataTransferClient.java  |  19 ++-
 .../src/main/proto/datatransfer.proto              |   1 +
 .../datatransfer/sasl/SaslDataTransferServer.java  |  22 +++
 .../TestSaslDataTransferExpiredBlockToken.java     | 181 +++++++++++++++++++++
 5 files changed, 259 insertions(+), 36 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index 94ae400..a517188 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -34,6 +34,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import javax.security.sasl.Sasl;
 
 import org.apache.commons.codec.binary.Base64;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncr
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.HandshakeSecretProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
 import org.slf4j.Logger;
@@ -204,6 +206,26 @@ public final class DataTransferSaslUtil {
     return resolver;
   }
 
+  private static <T> T readSaslMessage(InputStream in,
+      Function<DataTransferEncryptorMessageProto, ? extends T> handler) throws IOException {
+    DataTransferEncryptorMessageProto proto =
+        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+    switch (proto.getStatus()) {
+    case ERROR_UNKNOWN_KEY:
+      throw new InvalidEncryptionKeyException(proto.getMessage());
+    case ERROR:
+      if (proto.hasAccessTokenError() && proto.getAccessTokenError()) {
+        throw new InvalidBlockTokenException(proto.getMessage());
+      }
+      throw new IOException(proto.getMessage());
+    case SUCCESS:
+      return handler.apply(proto);
+    default:
+      throw new IOException(
+          "Unknown status: " + proto.getStatus() + ", message: " + proto.getMessage());
+    }
+  }
+
   /**
    * Reads a SASL negotiation message.
    *
@@ -212,15 +234,7 @@ public final class DataTransferSaslUtil {
    * @throws IOException for any error
    */
   public static byte[] readSaslMessage(InputStream in) throws IOException {
-    DataTransferEncryptorMessageProto proto =
-        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
-    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-      throw new InvalidEncryptionKeyException(proto.getMessage());
-    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-      throw new IOException(proto.getMessage());
-    } else {
-      return proto.getPayload().toByteArray();
-    }
+    return readSaslMessage(in, proto -> proto.getPayload().toByteArray());
   }
 
   /**
@@ -233,13 +247,7 @@ public final class DataTransferSaslUtil {
    */
   public static byte[] readSaslMessageAndNegotiationCipherOptions(
       InputStream in, List<CipherOption> cipherOptions) throws IOException {
-    DataTransferEncryptorMessageProto proto =
-        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
-    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-      throw new InvalidEncryptionKeyException(proto.getMessage());
-    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-      throw new IOException(proto.getMessage());
-    } else {
+    return readSaslMessage(in, proto -> {
       List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
       if (optionProtos != null) {
         for (CipherOptionProto optionProto : optionProtos) {
@@ -247,7 +255,7 @@ public final class DataTransferSaslUtil {
         }
       }
       return proto.getPayload().toByteArray();
-    }
+    });
   }
 
   static class SaslMessageWithHandshake {
@@ -276,13 +284,7 @@ public final class DataTransferSaslUtil {
 
   public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret(
       InputStream in) throws IOException {
-    DataTransferEncryptorMessageProto proto =
-        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
-    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-      throw new InvalidEncryptionKeyException(proto.getMessage());
-    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-      throw new IOException(proto.getMessage());
-    } else {
+    return readSaslMessage(in, proto -> {
       byte[] payload = proto.getPayload().toByteArray();
       byte[] secret = null;
       String bpid = null;
@@ -292,7 +294,7 @@ public final class DataTransferSaslUtil {
         bpid = handshakeSecret.getBpid();
       }
       return new SaslMessageWithHandshake(payload, secret, bpid);
-    }
+    });
   }
 
   /**
@@ -465,13 +467,7 @@ public final class DataTransferSaslUtil {
   public static SaslResponseWithNegotiatedCipherOption
       readSaslMessageAndNegotiatedCipherOption(InputStream in)
       throws IOException {
-    DataTransferEncryptorMessageProto proto =
-        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
-    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-      throw new InvalidEncryptionKeyException(proto.getMessage());
-    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-      throw new IOException(proto.getMessage());
-    } else {
+    return readSaslMessage(in, proto -> {
       byte[] response = proto.getPayload().toByteArray();
       List<CipherOption> options = PBHelperClient.convertCipherOptionProtos(
           proto.getCipherOptionList());
@@ -480,7 +476,7 @@ public final class DataTransferSaslUtil {
         option = options.get(0);
       }
       return new SaslResponseWithNegotiatedCipherOption(response, option);
-    }
+    });
   }
 
   /**
@@ -556,6 +552,13 @@ public final class DataTransferSaslUtil {
       DataTransferEncryptorStatus status, byte[] payload, String message,
       HandshakeSecretProto handshakeSecret)
       throws IOException {
+    sendSaslMessage(out, status, payload, message, handshakeSecret, false);
+  }
+
+  public static void sendSaslMessage(OutputStream out,
+      DataTransferEncryptorStatus status, byte[] payload, String message,
+      HandshakeSecretProto handshakeSecret, boolean accessTokenError)
+      throws IOException {
     DataTransferEncryptorMessageProto.Builder builder =
         DataTransferEncryptorMessageProto.newBuilder();
 
@@ -569,6 +572,9 @@ public final class DataTransferSaslUtil {
     if (handshakeSecret != null) {
       builder.setHandshakeSecret(handshakeSecret);
     }
+    if (accessTokenError) {
+      builder.setAccessTokenError(true);
+    }
 
     DataTransferEncryptorMessageProto proto = builder.build();
     proto.writeDelimitedTo(out);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
index 86053ee..7f6e474 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -583,11 +583,11 @@ public class SaslDataTransferClient {
               // the client accepts some cipher suites, but the server does not.
               LOG.debug("Client accepts cipher suites {}, "
                       + "but server {} does not accept any of them",
-                  cipherSuites, addr.toString());
+                  cipherSuites, addr);
             }
           } else {
             LOG.debug("Client using cipher suite {} with server {}",
-                cipherOption.getCipherSuite().getName(), addr.toString());
+                cipherOption.getCipherSuite().getName(), addr);
           }
         }
       }
@@ -598,7 +598,20 @@ public class SaslDataTransferClient {
           conf, cipherOption, underlyingOut, underlyingIn, false) :
           sasl.createStreamPair(out, in);
     } catch (IOException ioe) {
-      sendGenericSaslErrorMessage(out, ioe.getMessage());
+      String message = ioe.getMessage();
+      try {
+        sendGenericSaslErrorMessage(out, message);
+      } catch (Exception e) {
+        // If ioe is caused by error response from server, server will close peer connection.
+        // So sendGenericSaslErrorMessage might cause IOException due to "Broken pipe".
+        // We suppress IOException from sendGenericSaslErrorMessage
+        // and always throw `ioe` as top level.
+        // `ioe` can be InvalidEncryptionKeyException or InvalidBlockTokenException
+        // that indicates refresh key or token and are important for caller.
+        LOG.debug("Failed to send generic sasl error to server {} (message: {}), "
+                + "suppress exception", addr, message, e);
+        ioe.addSuppressed(e);
+      }
       throw ioe;
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 28a292e..d2f72f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -44,6 +44,7 @@ message DataTransferEncryptorMessageProto {
   optional string message = 3;
   repeated CipherOptionProto cipherOption = 4;
   optional HandshakeSecretProto handshakeSecret = 5;
+  optional bool accessTokenError = 6;
 }
 
 message HandshakeSecretProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index 0e2dc71..059c920 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -52,10 +52,12 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyExceptio
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -441,6 +443,14 @@ public class SaslDataTransferServer {
         // error, the client will get a new encryption key from the NN and retry
         // connecting to this DN.
         sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
+      } else if (ioe instanceof SaslException &&
+          ioe.getCause() != null &&
+          (ioe.getCause() instanceof InvalidBlockTokenException ||
+              ioe.getCause() instanceof SecretManager.InvalidToken)) {
+        // This could be because the client is long-lived and block token is expired
+        // The client will get new block token from the NN, upon receiving this error
+        // and retry connecting to this DN
+        sendInvalidTokenSaslErrorMessage(out, ioe.getCause().getMessage());
       } else {
         sendGenericSaslErrorMessage(out, ioe.getMessage());
       }
@@ -460,4 +470,16 @@ public class SaslDataTransferServer {
     sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
         message);
   }
+
+  /**
+   * Sends a SASL negotiation message indicating an invalid token error.
+   *
+   * @param out     stream to receive message
+   * @param message to send
+   * @throws IOException for any error
+   */
+  private static void sendInvalidTokenSaslErrorMessage(DataOutputStream out,
+      String message) throws IOException {
+    sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message, null, true);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransferExpiredBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransferExpiredBlockToken.java
new file mode 100644
index 0000000..838ad7c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransferExpiredBlockToken.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+public class TestSaslDataTransferExpiredBlockToken extends SaslDataTransferTestCase {
+  private static final int BLOCK_SIZE = 4096;
+  private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+  private static final Path PATH = new Path("/file1");
+
+  private final byte[] rawData = new byte[FILE_SIZE];
+  private MiniDFSCluster cluster;
+
+  @Rule
+  public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
+
+  @Before
+  public void before() throws Exception {
+    Random r = new Random();
+    r.nextBytes(rawData);
+
+    HdfsConfiguration conf = createSecureConfig("authentication,integrity,privacy");
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+
+    try (FileSystem fs = cluster.getFileSystem()) {
+      createFile(fs);
+    }
+
+    // set a short token lifetime (1 second) initially
+    SecurityTestUtil.setBlockTokenLifetime(
+        cluster.getNameNode().getNamesystem().getBlockManager().getBlockTokenSecretManager(),
+        1000L);
+  }
+
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFile(FileSystem fs) throws IOException {
+    try (FSDataOutputStream out = fs.create(PATH)) {
+      out.write(rawData);
+    }
+  }
+
+  // read a file using blockSeekTo()
+  private boolean checkFile1(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    int totalRead = 0;
+    int nRead = 0;
+    try {
+      while ((nRead = in.read(toRead, totalRead, toRead.length - totalRead)) > 0) {
+        totalRead += nRead;
+      }
+    } catch (IOException e) {
+      return false;
+    }
+    assertEquals("Cannot read file.", toRead.length, totalRead);
+    return checkFile(toRead);
+  }
+
+  // read a file using fetchBlockByteRange()/hedgedFetchBlockByteRange()
+  private boolean checkFile2(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    try {
+      assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0, toRead.length));
+    } catch (IOException e) {
+      return false;
+    }
+    return checkFile(toRead);
+  }
+
+  private boolean checkFile(byte[] fileToCheck) {
+    if (fileToCheck.length != rawData.length) {
+      return false;
+    }
+    for (int i = 0; i < fileToCheck.length; i++) {
+      if (fileToCheck[i] != rawData[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private FileSystem newFileSystem() throws IOException {
+    Configuration clientConf = new Configuration(cluster.getConfiguration(0));
+
+    clientConf.setInt(Retry.WINDOW_BASE_KEY, Integer.MAX_VALUE);
+
+    return FileSystem.newInstance(cluster.getURI(), clientConf);
+  }
+
+  private FileSystem newFileSystemHedgedRead() throws IOException {
+    Configuration clientConf = new Configuration(cluster.getConfiguration(0));
+
+    clientConf.setInt(Retry.WINDOW_BASE_KEY, 3000);
+    clientConf.setInt(HedgedRead.THREADPOOL_SIZE_KEY, 5);
+
+    return FileSystem.newInstance(cluster.getURI(), clientConf);
+  }
+
+  @Test
+  public void testBlockSeekToWithExpiredToken() throws Exception {
+    // read using blockSeekTo(). Acquired tokens are cached in in
+    try (FileSystem fs = newFileSystem(); FSDataInputStream in = fs.open(PATH)) {
+      waitBlockTokenExpired(in);
+      assertTrue(checkFile1(in));
+    }
+  }
+
+  @Test
+  public void testFetchBlockByteRangeWithExpiredToken() throws Exception {
+    // read using fetchBlockByteRange(). Acquired tokens are cached in in
+    try (FileSystem fs = newFileSystem(); FSDataInputStream in = fs.open(PATH)) {
+      waitBlockTokenExpired(in);
+      assertTrue(checkFile2(in));
+    }
+  }
+
+  @Test
+  public void testHedgedFetchBlockByteRangeWithExpiredToken() throws Exception {
+    // read using hedgedFetchBlockByteRange(). Acquired tokens are cached in in
+    try (FileSystem fs = newFileSystemHedgedRead(); FSDataInputStream in = fs.open(PATH)) {
+      waitBlockTokenExpired(in);
+      assertTrue(checkFile2(in));
+    }
+  }
+
+  private void waitBlockTokenExpired(FSDataInputStream in1) throws Exception {
+    DFSInputStream innerStream = (DFSInputStream) in1.getWrappedStream();
+    for (LocatedBlock block : innerStream.getAllBlocks()) {
+      while (!SecurityTestUtil.isBlockTokenExpired(block.getBlockToken())) {
+        Thread.sleep(100);
+      }
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org