Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2021/08/31 16:30:50 UTC

[GitHub] [hadoop] virajjasani commented on a change in pull request #3359: HDFS-16198. Short circuit read leaks Slot objects when InvalidToken exception is thrown

virajjasani commented on a change in pull request #3359:
URL: https://github.com/apache/hadoop/pull/3359#discussion_r699484858



##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithShortCircuitRead.java
##########
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShm;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
+import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockTokenWithShortCircuitRead {
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final int FILE_SIZE = 2 * BLOCK_SIZE;
+  private static final String FILE_TO_SHORT_CIRCUIT_READ = "/fileToSSR.dat";
+  private final byte[] rawData = new byte[FILE_SIZE];
+
+  {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
+    Random r = new Random();
+    r.nextBytes(rawData);
+  }
+
+  private void createFile(FileSystem fs, Path filename) throws IOException {
+    FSDataOutputStream out = fs.create(filename);
+    out.write(rawData);
+    out.close();
+  }
+
+  // read a file using blockSeekTo()
+  private boolean checkFile1(FSDataInputStream in) {
+    byte[] toRead = new byte[FILE_SIZE];
+    int totalRead = 0;
+    int nRead;
+    try {
+      while ((nRead = in.read(toRead, totalRead,
+        toRead.length - totalRead)) > 0) {
+        totalRead += nRead;
+      }
+    } catch (IOException e) {
+      return false;
+    }
+    assertEquals("Cannot read file.", toRead.length, totalRead);
+    return checkFile(toRead);
+  }
+
+  private boolean checkFile(byte[] fileToCheck) {
+    if (fileToCheck.length != rawData.length) {
+      return false;
+    }
+    for (int i = 0; i < fileToCheck.length; i++) {
+      if (fileToCheck[i] != rawData[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Test
+  public void testShortCircuitReadWithInvalidToken() throws Exception {
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 1;
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt("io.bytes.per.checksum", BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
+    conf.setInt("ipc.client.connect.max.retries", 0);

Review comment:
       nit: it would be good to use the `IPC_CLIENT_CONNECT_MAX_RETRIES_KEY` constant here, so that usages of this config are easy to track.
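       For illustration, a minimal sketch of the suggested change, assuming the constant defined in `org.apache.hadoop.fs.CommonConfigurationKeys` (it holds the same `"ipc.client.connect.max.retries"` value as the string literal):

       ```java
       import org.apache.hadoop.fs.CommonConfigurationKeys;

       // Same effect as the string literal, but usages of this config
       // can now be found by searching for the constant.
       conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
       ```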

##########
File path: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithShortCircuitRead.java
##########
@@ -0,0 +1,233 @@
+    conf.setInt("ipc.client.connect.max.retries", 0);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+      "testShortCircuitReadWithInvalidToken").getAbsolutePath());
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+    // avoid caching
+    conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY, 0);
+    DomainSocket.disableBindPathValidation();
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes).format(true).build();
+      cluster.waitActive();
+
+      final NameNode nn = cluster.getNameNode();
+      final NamenodeProtocols nnProto = nn.getRpcServer();
+      final BlockManager bm = nn.getNamesystem().getBlockManager();
+      final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
+      // set a short token lifetime (1 second) initially
+      SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
+
+      Path fileToRead = new Path(FILE_TO_SHORT_CIRCUIT_READ);
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      final ShortCircuitCache cache =
+        fs.getClient().getClientContext().getShortCircuitCache();
+      final DatanodeInfo datanode =
+        new DatanodeInfo.DatanodeInfoBuilder()
+          .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
+          .build();
+
+      cache.getDfsClientShmManager().visit(new Visitor() {
+        @Override
+        public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) {
+          // The ClientShmManager starts off empty
+          Assert.assertEquals(0, info.size());
+        }
+      });
+
+      // create file to read
+      createFile(fs, fileToRead);
+
+      // acquire access token
+      FSDataInputStream in = fs.open(fileToRead);
+      assertTrue(checkFile1(in));
+
+      // verify token is not expired
+      List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations(
+        FILE_TO_SHORT_CIRCUIT_READ, 0, FILE_SIZE).getLocatedBlocks();
+      LocatedBlock lblock = locatedBlocks.get(0); // first block
+      Token<BlockTokenIdentifier> myToken = lblock.getBlockToken();
+      assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken));
+
+      // wait token expiration
+      while (!SecurityTestUtil.isBlockTokenExpired(myToken)) {
+        try {
+          Thread.sleep(10);

Review comment:
       10 ms seems too short a sleep interval here. We should either increase it or, better, avoid the hand-rolled wait loop altogether.
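       For illustration, a minimal sketch of a polling alternative using `GenericTestUtils.waitFor` from hadoop-common's test utilities, assuming `isBlockTokenExpired` declares a checked `IOException` that the lambda has to wrap:

       ```java
       import java.io.IOException;
       import java.io.UncheckedIOException;
       import org.apache.hadoop.test.GenericTestUtils;

       // Poll for token expiration instead of a fixed-interval sleep loop:
       // re-check every 100 ms and time out after 10 s.
       GenericTestUtils.waitFor(() -> {
         try {
           return SecurityTestUtil.isBlockTokenExpired(myToken);
         } catch (IOException e) {
           throw new UncheckedIOException(e);
         }
       }, 100, 10000);
       ```

       `waitFor` throws `TimeoutException`/`InterruptedException`, which the test method's `throws Exception` already covers.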




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.




---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org