Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/01/24 15:52:29 UTC
[hadoop] branch branch-3.2 updated: HDFS-15119. Allow expiration of cached locations in DFSInputStream. Contributed by Ahmed Hussein.
This is an automated email from the ASF dual-hosted git repository.
kihwal pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1bb9667 HDFS-15119. Allow expiration of cached locations in DFSInputStream. Contributed by Ahmed Hussein.
1bb9667 is described below
commit 1bb9667137bc80e63dea3e53bb2746be011a2a9a
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Fri Jan 24 09:21:09 2020 -0600
HDFS-15119. Allow expiration of cached locations in DFSInputStream.
Contributed by Ahmed Hussein.
(cherry picked from commit d10f77e3c91225f86ed9c0f0e6a9adf2e1434674)
---
.../java/org/apache/hadoop/hdfs/DFSClient.java | 4 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 92 ++++++-
.../hadoop/hdfs/client/HdfsClientConfigKeys.java | 5 +
.../hadoop/hdfs/client/impl/DfsClientConf.java | 15 ++
.../src/main/resources/hdfs-default.xml | 8 +
.../hdfs/TestDFSInputStreamBlockLocations.java | 290 +++++++++++++++++++++
6 files changed, 408 insertions(+), 6 deletions(-)
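For context, a minimal sketch (not part of the patch) of how a client could opt in to the new behaviour; the 10-minute interval and the file path are illustrative assumptions only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class RefreshReadBlockLocationsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical example value: expire cached block locations after 10
    // minutes. The default of 0 keeps the old behaviour (never refresh).
    conf.setLong(
        HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
        10 * 60 * 1000L);
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) {
      byte[] buf = new byte[8192];
      while (in.read(buf) > 0) {
        // Long-lived readers benefit most: once the interval elapses the
        // stream re-fetches block locations from the NameNode and clears
        // its dead-node list.
      }
    }
  }
}

The same key can also be set as dfs.client.refresh.read-block-locations.ms in the client's hdfs-site.xml; see the hdfs-default.xml change below.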
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index fac0577..614fc68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -837,6 +837,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
namenode.reportBadBlocks(blocks);
}
+ public long getRefreshReadBlkLocationsInterval() {
+ return dfsClientConf.getRefreshReadBlockLocationsMS();
+ }
+
public LocatedBlocks getLocatedBlocks(String src, long start)
throws IOException {
return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index d211e47..a4bf454 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import org.apache.htrace.core.SpanId;
import com.google.common.annotations.VisibleForTesting;
@@ -133,6 +134,10 @@ public class DFSInputStream extends FSInputStream
// (it's OK to acquire this lock when the lock on <this> is held)
protected final Object infoLock = new Object();
+ // refresh locatedBlocks periodically
+ private final long refreshReadBlockIntervals;
+ /** timeStamp of the last time a block location was refreshed. */
+ private long locatedBlocksTimeStamp;
/**
* Track the ByteBuffers that we have handed out to readers.
*
@@ -149,6 +154,10 @@ public class DFSInputStream extends FSInputStream
return extendedReadBuffers;
}
+ private boolean isPeriodicRefreshEnabled() {
+ return (refreshReadBlockIntervals > 0L);
+ }
+
/**
* This variable tracks the number of failures since the start of the
* most recent user-facing operation. That is to say, it should be reset
@@ -162,7 +171,7 @@ public class DFSInputStream extends FSInputStream
*/
protected int failures = 0;
- /* XXX Use of CocurrentHashMap is temp fix. Need to fix
+ /* XXX Use of ConcurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
new ConcurrentHashMap<>();
@@ -176,6 +185,9 @@ public class DFSInputStream extends FSInputStream
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
LocatedBlocks locatedBlocks) throws IOException {
this.dfsClient = dfsClient;
+ this.refreshReadBlockIntervals =
+ this.dfsClient.getRefreshReadBlkLocationsInterval();
+ setLocatedBlocksTimeStamp();
this.verifyChecksum = verifyChecksum;
this.src = src;
synchronized (infoLock) {
@@ -186,10 +198,28 @@ public class DFSInputStream extends FSInputStream
}
@VisibleForTesting
- public long getlastBlockBeingWrittenLengthForTesting() {
+ long getlastBlockBeingWrittenLengthForTesting() {
return lastBlockBeingWrittenLength;
}
+ @VisibleForTesting
+ boolean deadNodesContain(DatanodeInfo nodeInfo) {
+ return deadNodes.containsKey(nodeInfo);
+ }
+
+ @VisibleForTesting
+ void setReadTimeStampsForTesting(long timeStamp) {
+ setLocatedBlocksTimeStamp(timeStamp);
+ }
+
+ private void setLocatedBlocksTimeStamp() {
+ setLocatedBlocksTimeStamp(Time.monotonicNow());
+ }
+
+ private void setLocatedBlocksTimeStamp(long timeStamp) {
+ this.locatedBlocksTimeStamp = timeStamp;
+ }
+
/**
* Grab the open-file info from namenode
* @param refreshLocatedBlocks whether to re-fetch locatedblocks
@@ -234,6 +264,48 @@ public class DFSInputStream extends FSInputStream
}
}
+ /**
+ * Checks whether the cached block locations have expired.
+ * @return true when the expiration feature is enabled and the
+ * locatedBlocks timestamp has expired.
+ */
+ private boolean isLocatedBlocksExpired() {
+ if (!isPeriodicRefreshEnabled()) {
+ return false;
+ }
+ long now = Time.monotonicNow();
+ long elapsed = now - locatedBlocksTimeStamp;
+ if (elapsed < refreshReadBlockIntervals) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Update the block locations timestamps if they have expired.
+ * In the case of expired timestamp:
+ * - clear list of deadNodes
+ * - call openInfo(true) which will re-fetch locatedblocks
+ * - update locatedBlocksTimeStamp
+ * @return true when the locatedblocks list is re-fetched from the namenode.
+ * @throws IOException
+ */
+ private boolean updateBlockLocationsStamp() throws IOException {
+ if (!isLocatedBlocksExpired()) {
+ return false;
+ }
+ // clear dead nodes
+ deadNodes.clear();
+ openInfo(true);
+ setLocatedBlocksTimeStamp();
+ return true;
+ }
+
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
throws IOException {
LocatedBlocks newInfo = locatedBlocks;
@@ -246,7 +318,8 @@ public class DFSInputStream extends FSInputStream
}
if (locatedBlocks != null) {
- Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+ Iterator<LocatedBlock> oldIter =
+ locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
@@ -450,6 +523,7 @@ public class DFSInputStream extends FSInputStream
private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
throws IOException {
synchronized(infoLock) {
+ updateBlockLocationsStamp();
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
@@ -549,7 +623,6 @@ public class DFSInputStream extends FSInputStream
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
-
// Will be getting a new BlockReader.
closeCurrentBlockReaders();
@@ -563,9 +636,13 @@ public class DFSInputStream extends FSInputStream
boolean connectFailedOnce = false;
while (true) {
+ // Re-fetch the locatedBlocks from NN if the timestamp has expired.
+ updateBlockLocationsStamp();
+
//
// Compute desired block
//
+
LocatedBlock targetBlock = getBlockAt(target);
// update current position
@@ -765,7 +842,10 @@ public class DFSInputStream extends FSInputStream
try {
// currentNode can be left as null if previous read had a checksum
// error on the same block. See HDFS-3067
- if (pos > blockEnd || currentNode == null) {
+ // currentNode needs to be updated if the blockLocations timestamp has
+ // expired.
+ if (pos > blockEnd || currentNode == null
+ || updateBlockLocationsStamp()) {
currentNode = blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
@@ -1504,7 +1584,7 @@ public class DFSInputStream extends FSInputStream
* the current datanode and might connect to the same node.
*/
private boolean seekToBlockSource(long targetPos)
- throws IOException {
+ throws IOException {
currentNode = blockSeekTo(targetPos);
return true;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 3d934b4..b596c40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -149,6 +149,11 @@ public interface HdfsClientConfigKeys {
long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
TimeUnit.DAYS.toMillis(10); // 10 days
+ // refreshing LocatedBlocks period. A value of 0 disables the feature.
+ String DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY =
+ "dfs.client.refresh.read-block-locations.ms";
+ long DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT = 0L;
+
String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
"dfs.datanode.kerberos.principal";
String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 8120998..49709f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -133,6 +133,9 @@ public class DfsClientConf {
private final long datanodeRestartTimeout;
private final long slowIoWarningThresholdMs;
+ /** Wait time window before refreshing block locations for an input stream. */
+ private final long refreshReadBlockLocationsMS;
+
private final ShortCircuitConf shortCircuitConf;
private final long hedgedReadThresholdMillis;
@@ -249,6 +252,11 @@ public class DfsClientConf {
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+ refreshReadBlockLocationsMS = conf.getLong(
+ HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
+ HdfsClientConfigKeys.
+ DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
+
shortCircuitConf = new ShortCircuitConf(conf);
hedgedReadThresholdMillis = conf.getLong(
@@ -596,6 +604,13 @@ public class DfsClientConf {
}
/**
+ * @return the refreshReadBlockLocationsMS interval
+ */
+ public long getRefreshReadBlockLocationsMS() {
+ return refreshReadBlockLocationsMS;
+ }
+
+ /**
* @return the shortCircuitConf
*/
public ShortCircuitConf getShortCircuitConf() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2e45503..2fe095d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2906,6 +2906,14 @@
</description>
</property>
+ <property>
+ <name>dfs.client.refresh.read-block-locations.ms</name>
+ <value>0</value>
+ <description>
+ Refreshing LocatedBlocks period. A value of 0 disables the feature.
+ </description>
+ </property>
+
<property>
<name>dfs.namenode.lease-recheck-interval-ms</name>
<value>2000</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
new file mode 100644
index 0000000..9fed914
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test the caches expiration of the block locations.
+ */
+@RunWith(Parameterized.class)
+public class TestDFSInputStreamBlockLocations {
+ private static final int BLOCK_SIZE = 1024 * 1024;
+ private static final String[] RACKS = new String[] {
+ "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
+ private static final int NUM_DATA_NODES = RACKS.length;
+ private static final short REPLICATION_FACTOR = (short) 4;
+ private final int staleInterval = 8000;
+ private final int numOfBlocks = 24;
+ private final int fileLength = numOfBlocks * BLOCK_SIZE;
+ private final int dfsClientPrefetchSize = fileLength / 2;
+ // locatedBlocks expiration set to 1 hour
+ private final long dfsInputLocationsTimeout = 60 * 60 * 1000L;
+
+ private HdfsConfiguration conf;
+ private MiniDFSCluster dfsCluster;
+ private DFSClient dfsClient;
+ private DistributedFileSystem fs;
+ private Path filePath;
+ private boolean enableBlkExpiration;
+
+ @Parameterized.Parameters(name = "{index}: CacheExpirationConfig(Enable {0})")
+ public static Collection<Object[]> getTestParameters() {
+ return Arrays.asList(new Object[][] {
+ {Boolean.TRUE},
+ {Boolean.FALSE}
+ });
+ }
+
+ public TestDFSInputStreamBlockLocations(Boolean enableExpiration) {
+ enableBlkExpiration = enableExpiration;
+ }
+
+ @Before
+ public void setup() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setBoolean(
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+ // set the heartbeat intervals and stale considerations
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+ staleInterval);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ staleInterval / 2);
+ // disable shortcircuit reading
+ conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
+ // set replication factor
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
+ // set block size and other sizes
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
+ dfsClientPrefetchSize);
+ if (enableBlkExpiration) {
+ // set the refresh locations for every dfsInputLocationsTimeout
+ conf.setLong(
+ HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
+ dfsInputLocationsTimeout);
+ }
+ // start the cluster and create a DFSClient
+ dfsCluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
+ dfsCluster.waitActive();
+ assertEquals(NUM_DATA_NODES, dfsCluster.getDataNodes().size());
+ InetSocketAddress addr = new InetSocketAddress("localhost",
+ dfsCluster.getNameNodePort());
+ dfsClient = new DFSClient(addr, conf);
+ fs = dfsCluster.getFileSystem();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ if (dfsClient != null) {
+ dfsClient.close();
+ dfsClient = null;
+ }
+ if (fs != null) {
+ fs.deleteOnExit(filePath);
+ fs.close();
+ fs = null;
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ final String fileName = "/test_cache_locations";
+ filePath = new Path(fileName);
+ DFSInputStream fin = null;
+ FSDataOutputStream fout = null;
+ try {
+ // create a file and write for testing
+ fout = fs.create(filePath, REPLICATION_FACTOR);
+ fout.write(new byte[(fileLength)]);
+ // finalize the file by closing the output stream
+ fout.close();
+ fout = null;
+ // get the located blocks
+ LocatedBlocks referenceLocatedBlocks =
+ dfsClient.getLocatedBlocks(fileName, 0, fileLength);
+ assertEquals(numOfBlocks, referenceLocatedBlocks.locatedBlockCount());
+ String poolId = dfsCluster.getNamesystem().getBlockPoolId();
+ fin = dfsClient.open(fileName);
+ // get the located blocks from fin
+ LocatedBlocks finLocatedBlocks = fin.locatedBlocks;
+ assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
+ finLocatedBlocks.locatedBlockCount());
+ final int chunkReadSize = BLOCK_SIZE / 4;
+ byte[] readBuffer = new byte[chunkReadSize];
+ // read the first block
+ DatanodeInfo prevDNInfo = null;
+ DatanodeInfo currDNInfo = null;
+ int bytesRead = 0;
+ int firstBlockMark = BLOCK_SIZE;
+ // get the first block locations
+ LocatedBlock firstLocatedBlk =
+ fin.locatedBlocks.getLocatedBlocks().get(0);
+ DatanodeInfo[] firstBlkDNInfos = firstLocatedBlk.getLocations();
+ while (fin.getPos() < firstBlockMark) {
+ bytesRead = fin.read(readBuffer);
+ Assert.assertTrue("Unexpected number of read bytes",
+ chunkReadSize >= bytesRead);
+ if (currDNInfo == null) {
+ currDNInfo = fin.getCurrentDatanode();
+ assertNotNull("current FIS datanode is null", currDNInfo);
+ continue;
+ }
+ prevDNInfo = currDNInfo;
+ currDNInfo = fin.getCurrentDatanode();
+ assertEquals("the DFSInput stream does not read from same node",
+ prevDNInfo, currDNInfo);
+ }
+
+ assertEquals("InputStream exceeds expected position",
+ firstBlockMark, fin.getPos());
+ // get the second block locations
+ LocatedBlock secondLocatedBlk =
+ fin.locatedBlocks.getLocatedBlocks().get(1);
+ // get the nodeinfo for that block
+ DatanodeInfo[] secondBlkDNInfos = secondLocatedBlk.getLocations();
+ DatanodeInfo deadNodeInfo = secondBlkDNInfos[0];
+ // find the DataNode hosting the first replica of the second block
+ DataNode deadNode = getdataNodeFromHostName(dfsCluster,
+ deadNodeInfo.getHostName());
+ // Shutdown and wait for datanode to be marked dead
+ DatanodeRegistration reg = InternalDataNodeTestUtils.
+ getDNRegistrationForBP(dfsCluster.getDataNodes().get(0), poolId);
+ DataNodeProperties stoppedDNProps =
+ dfsCluster.stopDataNode(deadNodeInfo.getName());
+
+ List<DataNode> datanodesPostStoppage = dfsCluster.getDataNodes();
+ assertEquals(NUM_DATA_NODES - 1, datanodesPostStoppage.size());
+ // get the located blocks
+ LocatedBlocks afterStoppageLocatedBlocks =
+ dfsClient.getLocatedBlocks(fileName, 0, fileLength);
+ // read second block
+ int secondBlockMark = (int) (1.5 * BLOCK_SIZE);
+ boolean firstIteration = true;
+ if (this.enableBlkExpiration) {
+ // set the time stamps to make sure that we do not refresh locations yet
+ fin.setReadTimeStampsForTesting(Time.monotonicNow());
+ }
+ while (fin.getPos() < secondBlockMark) {
+ bytesRead = fin.read(readBuffer);
+ assertTrue("dead node used to read at position: " + fin.getPos(),
+ fin.deadNodesContain(deadNodeInfo));
+ Assert.assertTrue("Unexpected number of read bytes",
+ chunkReadSize >= bytesRead);
+ prevDNInfo = currDNInfo;
+ currDNInfo = fin.getCurrentDatanode();
+ assertNotEquals(deadNodeInfo, currDNInfo);
+ if (firstIteration) {
+ // if the dead node served the first block, currDNInfo must have changed
+ assertFalse("FSInputStream should pick a different DN",
+ firstBlkDNInfos[0].equals(deadNodeInfo)
+ && prevDNInfo.equals(currDNInfo));
+ firstIteration = false;
+ }
+ }
+ assertEquals("InputStream exceeds expected position",
+ secondBlockMark, fin.getPos());
+ // restart the dead node with the same port
+ assertTrue(dfsCluster.restartDataNode(stoppedDNProps, true));
+ dfsCluster.waitActive();
+ List<DataNode> datanodesPostRestart = dfsCluster.getDataNodes();
+ assertEquals(NUM_DATA_NODES, datanodesPostRestart.size());
+ // continue reading from block 2 again. We should read from deadNode
+ int thirdBlockMark = 2 * BLOCK_SIZE;
+ firstIteration = true;
+ while (fin.getPos() < thirdBlockMark) {
+ bytesRead = fin.read(readBuffer);
+ if (this.enableBlkExpiration) {
+ assertEquals("node is removed from deadNodes after 1st iteration",
+ firstIteration, fin.deadNodesContain(deadNodeInfo));
+ } else {
+ assertTrue(fin.deadNodesContain(deadNodeInfo));
+ }
+ Assert.assertTrue("Unexpected number of read bytes",
+ chunkReadSize >= bytesRead);
+ prevDNInfo = currDNInfo;
+ currDNInfo = fin.getCurrentDatanode();
+ if (!this.enableBlkExpiration) {
+ assertNotEquals(deadNodeInfo, currDNInfo);
+ }
+ if (firstIteration) {
+ assertEquals(prevDNInfo, currDNInfo);
+ firstIteration = false;
+ if (this.enableBlkExpiration) {
+ // reset the time stamps of located blocks to force cache expiration
+ fin.setReadTimeStampsForTesting(
+ Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
+ }
+ }
+ }
+ assertEquals("InputStream exceeds expected position",
+ thirdBlockMark, fin.getPos());
+ } finally {
+ if (fout != null) {
+ fout.close();
+ }
+ if (fin != null) {
+ fin.close();
+ }
+ }
+ }
+
+ private DataNode getdataNodeFromHostName(MiniDFSCluster cluster,
+ String hostName) {
+ for (DataNode dn : cluster.getDataNodes()) {
+ if (dn.getDatanodeId().getHostName().equals(hostName)) {
+ return dn;
+ }
+ }
+ return null;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org