Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2018/07/09 18:26:34 UTC
[42/50] [abbrv] hadoop git commit: HDFS-13121. NPE when request file descriptors when SC read. Contributed by Zsolt Venczel.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0247cb63
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0247cb63
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0247cb63
Branch: refs/heads/HDFS-12090
Commit: 0247cb6318507afe06816e337a19f396afc53efa
Parents: 061b168
Author: Wei-Chiu Chuang <we...@apache.org>
Authored: Fri Jul 6 14:59:49 2018 -0700
Committer: Wei-Chiu Chuang <we...@apache.org>
Committed: Fri Jul 6 14:59:49 2018 -0700
----------------------------------------------------------------------
.../hdfs/client/impl/BlockReaderFactory.java | 5 ++
.../shortcircuit/TestShortCircuitCache.java | 89 ++++++++++++++++++++
2 files changed, 94 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0247cb63/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
index 1003b95..ce43185 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
@@ -598,6 +598,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     sock.recvFileInputStreams(fis, buf, 0, buf.length);
     ShortCircuitReplica replica = null;
     try {
+      if (fis[0] == null || fis[1] == null) {
+        throw new IOException("the datanode " + datanode + " failed to " +
+            "pass a file descriptor (might have reached open file limit).");
+      }
+
       ExtendedBlockId key =
           new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
       if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
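
During a short-circuit read the client asks the datanode to pass the block and checksum file descriptors over a Unix domain socket. If the datanode cannot open the files (for example because it has hit its open file limit), recvFileInputStreams leaves null entries in the FileInputStream array, and dereferencing them later threw a NullPointerException. The hunk above turns that condition into a descriptive IOException instead, so the short-circuit attempt fails cleanly. A minimal sketch of the same guard outside of Hadoop, where receiveFds is a hypothetical stand-in for the domain-socket call:

import java.io.FileInputStream;
import java.io.IOException;

public class FdGuardSketch {
  // Hypothetical stand-in for DomainSocket.recvFileInputStreams: on
  // failure the peer attaches nothing and the array slots stay null.
  static void receiveFds(FileInputStream[] fis) throws IOException {
    // no-op here; a real implementation reads descriptors off a domain socket
  }

  static FileInputStream[] requestFds() throws IOException {
    FileInputStream[] fis = new FileInputStream[2];
    receiveFds(fis);
    // The guard the patch introduces: fail with a descriptive IOException
    // instead of letting a later dereference throw a NullPointerException.
    if (fis[0] == null || fis[1] == null) {
      throw new IOException("peer failed to pass a file descriptor " +
          "(might have reached its open file limit)");
    }
    return fis;
  }
}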
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0247cb63/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 4e2cede..ac29c3c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -42,6 +42,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.PeerCache;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSInputStream;
@@ -50,10 +54,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
@@ -66,9 +72,11 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
@@ -819,4 +827,85 @@ public class TestShortCircuitCache {
           .fetch(Mockito.eq(extendedBlockId), Mockito.any());
     }
   }
+
+  @Test
+  public void testRequestFileDescriptorsWhenULimit() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testRequestFileDescriptorsWhenULimit", sockDir);
+
+    final short replicas = 1;
+    final int fileSize = 3;
+    final String testFile = "/testfile";
+
+    try (MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(replicas).build()) {
+
+      cluster.waitActive();
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, new Path(testFile), fileSize, replicas, 0L);
+
+      LocatedBlock blk = new DFSClient(DFSUtilClient.getNNAddress(conf), conf)
+          .getLocatedBlocks(testFile, 0, fileSize).get(0);
+
+      ClientContext clientContext = Mockito.mock(ClientContext.class);
+      Mockito.when(clientContext.getPeerCache()).thenAnswer(
+          (Answer<PeerCache>) peerCacheCall -> {
+            PeerCache peerCache = new PeerCache(10, Long.MAX_VALUE);
+            DomainPeer peer = Mockito.spy(getDomainPeerToDn(conf));
+            peerCache.put(blk.getLocations()[0], peer);
+
+            Mockito.when(peer.getDomainSocket()).thenAnswer(
+                (Answer<DomainSocket>) domainSocketCall -> {
+                  DomainSocket domainSocket = Mockito.mock(DomainSocket.class);
+                  Mockito.when(domainSocket
+                      .recvFileInputStreams(
+                          Mockito.any(FileInputStream[].class),
+                          Mockito.any(byte[].class),
+                          Mockito.anyInt(),
+                          Mockito.anyInt())
+                  ).thenAnswer(
+                      // we are mocking the FileInputStream array with nulls
+                      (Answer<Void>) recvFileInputStreamsCall -> null
+                  );
+                  return domainSocket;
+                }
+            );
+
+            return peerCache;
+          });
+
+      Mockito.when(clientContext.getShortCircuitCache()).thenAnswer(
+          (Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
+            ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
+            Mockito.when(cache.allocShmSlot(
+                Mockito.any(DatanodeInfo.class),
+                Mockito.any(DomainPeer.class),
+                Mockito.any(MutableBoolean.class),
+                Mockito.any(ExtendedBlockId.class),
+                Mockito.anyString()))
+                .thenAnswer((Answer<Slot>) call -> null);
+
+            return cache;
+          }
+      );
+
+      DatanodeInfo[] nodes = blk.getLocations();
+
+      try {
+        Assert.assertNull(new BlockReaderFactory(new DfsClientConf(conf))
+            .setInetSocketAddress(NetUtils.createSocketAddr(nodes[0]
+                .getXferAddr()))
+            .setClientCacheContext(clientContext)
+            .setDatanodeInfo(blk.getLocations()[0])
+            .setBlock(blk.getBlock())
+            .setBlockToken(new Token())
+            .createShortCircuitReplicaInfo());
+      } catch (NullPointerException ex) {
+        Assert.fail("Should not throw NPE when the native library is unable " +
+            "to create new files!");
+      }
+    }
+  }
 }
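
The new test exercises this path by mocking ClientContext so that the cached DomainPeer hands out a mocked DomainSocket whose recvFileInputStreams never fills the FileInputStream array, simulating a datanode that has run out of descriptors; createShortCircuitReplicaInfo is then expected to return null rather than throw. The same stubbing pattern, reduced to a self-contained sketch (FdSource and firstFdOrNull are hypothetical names, not Hadoop APIs):

import static org.junit.Assert.assertNull;

import java.io.FileInputStream;

import org.junit.Test;
import org.mockito.Mockito;

public class NullDescriptorStubSketch {
  // Hypothetical collaborator that fills a caller-supplied array with
  // file descriptors received from a peer.
  interface FdSource {
    void recvFds(FileInputStream[] fis);
  }

  // Hypothetical caller that degrades gracefully when no fds arrive.
  static FileInputStream firstFdOrNull(FdSource source) {
    FileInputStream[] fis = new FileInputStream[2];
    source.recvFds(fis);
    if (fis[0] == null || fis[1] == null) {
      return null; // nothing was passed; report failure instead of an NPE
    }
    return fis[0];
  }

  @Test
  public void returnsNullWhenPeerPassesNoDescriptors() {
    // Mockito's default answer for the void recvFds is a no-op, so the
    // array stays null-filled, just like a peer out of file descriptors.
    FdSource source = Mockito.mock(FdSource.class);
    assertNull(firstFdOrNull(source));
  }
}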