You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2019/11/06 14:00:45 UTC
[hadoop] branch trunk updated: HDFS-14384. When the lastLocatedBlock
token expires,
it will take 1~3 seconds to refetch it. Contributed by Surendra Singh
Lilhore.
This is an automated email from the ASF dual-hosted git repository.
surendralilhore pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c360141 HDFS-14384. When the lastLocatedBlock token expires, it will take 1~3 seconds to refetch it. Contributed by Surendra Singh Lilhore.
c360141 is described below
commit c36014165c212b26d75268ee3659aa2cadcff349
Author: Surendra Singh Lilhore <su...@apache.org>
AuthorDate: Wed Nov 6 19:28:55 2019 +0530
HDFS-14384. When the lastLocatedBlock token expires, it will take 1~3 seconds to refetch it. Contributed by Surendra Singh Lilhore.
---
.../org/apache/hadoop/hdfs/DFSInputStream.java | 18 ++++++--
.../hdfs/security/token/block/TestBlockToken.java | 49 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index f9e7d6e..757924d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -257,6 +257,13 @@ public class DFSInputStream extends FSInputStream
}
}
locatedBlocks = newInfo;
+ long lastBlkBeingWrittenLength = getLastBlockLength();
+ fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
+
+ return lastBlkBeingWrittenLength;
+ }
+
+ private long getLastBlockLength() throws IOException{
long lastBlockBeingWrittenLength = 0;
if (!locatedBlocks.isLastBlockComplete()) {
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
@@ -275,8 +282,6 @@ public class DFSInputStream extends FSInputStream
}
}
- fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
-
return lastBlockBeingWrittenLength;
}
@@ -459,7 +464,14 @@ public class DFSInputStream extends FSInputStream
if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
throw new EOFException("Could not find target position " + offset);
}
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+ // Update the LastLocatedBlock, if offset is for last block.
+ if (offset >= locatedBlocks.getFileLength()) {
+ locatedBlocks = newBlocks;
+ lastBlockBeingWrittenLength = getLastBlockLength();
+ } else {
+ locatedBlocks.insertRange(targetBlockIdx,
+ newBlocks.getLocatedBlocks());
+ }
}
return locatedBlocks.get(targetBlockIdx);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index 59b28c1..4e22259 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -34,6 +34,7 @@ import java.io.File;
import java.io.IOException;
import java.io.DataOutput;
import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Calendar;
import java.util.EnumSet;
@@ -44,12 +45,14 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -64,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetRep
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.TestWritable;
@@ -91,6 +96,7 @@ import org.mockito.stubbing.Answer;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ServiceException;
+
import org.apache.hadoop.fs.StorageType;
/** Unit tests for block tokens */
@@ -891,4 +897,47 @@ public class TestBlockToken {
new DataInputStream(new ByteArrayInputStream(masterId.getBytes())));
assertArrayEquals(password, sm.retrievePassword(slaveId));
}
+
+ /** Test for last in-progress block token expiry.
+ * 1. Write file with one block which is in-progress.
+ * 2. Open input stream and close the output stream.
+ * 3. Wait for block token expiration and read the data.
+ * 4. Read should be success.
+ */
+ @Test
+ public void testLastLocatedBlockTokenExpiry()
+ throws IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build()) {
+ cluster.waitClusterUp();
+ final NameNode nn = cluster.getNameNode();
+ final BlockManager bm = nn.getNamesystem().getBlockManager();
+ final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();
+
+ // set a short token lifetime (1 second)
+ SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);
+
+ DistributedFileSystem fs = cluster.getFileSystem();
+ Path p = new Path("/tmp/abc.log");
+ FSDataOutputStream out = fs.create(p);
+ byte[] data = "hello\n".getBytes(StandardCharsets.UTF_8);
+ out.write(data);
+ out.hflush();
+ FSDataInputStream in = fs.open(p);
+ out.close();
+
+ // wait for last block token to expire
+ Thread.sleep(2000L);
+
+ byte[] readData = new byte[data.length];
+ long startTime = System.currentTimeMillis();
+ in.read(readData);
+ // DFSInputStream#refetchLocations() minimum wait for 1sec to refetch
+ // complete located blocks.
+ assertTrue("Should not wait for refetch complete located blocks",
+ 1000L > (System.currentTimeMillis() - startTime));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org