Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/01/24 15:52:29 UTC

[hadoop] branch branch-3.2 updated: HDFS-15119. Allow expiration of cached locations in DFSInputStream. Contributed by Ahmed Hussein.

This is an automated email from the ASF dual-hosted git repository.

kihwal pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1bb9667  HDFS-15119. Allow expiration of cached locations in DFSInputStream. Contributed by Ahmed Hussein.
1bb9667 is described below

commit 1bb9667137bc80e63dea3e53bb2746be011a2a9a
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Fri Jan 24 09:21:09 2020 -0600

    HDFS-15119. Allow expiration of cached locations in DFSInputStream.
    Contributed by Ahmed Hussein.
    
    (cherry picked from commit d10f77e3c91225f86ed9c0f0e6a9adf2e1434674)
---
 .../java/org/apache/hadoop/hdfs/DFSClient.java     |   4 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java     |  92 ++++++-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   5 +
 .../hadoop/hdfs/client/impl/DfsClientConf.java     |  15 ++
 .../src/main/resources/hdfs-default.xml            |   8 +
 .../hdfs/TestDFSInputStreamBlockLocations.java     | 290 +++++++++++++++++++++
 6 files changed, 408 insertions(+), 6 deletions(-)
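
For context, the interval added by this change is a purely client-side setting. Below is a
minimal sketch, not part of this patch, of how a client could opt in programmatically; the
10-minute window is an arbitrary illustrative value, and the key and its zero-disables-it
default come from HdfsClientConfigKeys in the diff that follows.

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class RefreshReadBlockLocationsExample {
      public static void main(String[] args) {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Illustrative value: re-fetch cached block locations every 10 minutes.
        // The default of 0 leaves the periodic refresh disabled.
        conf.setLong(
            HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
            TimeUnit.MINUTES.toMillis(10));
        // Every DFSInputStream opened by a client created from this conf will,
        // once the interval elapses, clear its dead-node list and re-fetch
        // locatedBlocks from the NameNode on the next read or seek.
      }
    }

The same setting can also be supplied as dfs.client.refresh.read-block-locations.ms in the
client's hdfs-site.xml; the hdfs-default.xml entry added below documents the key and its
default of 0.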

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index fac0577..614fc68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -837,6 +837,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     namenode.reportBadBlocks(blocks);
   }
 
+  public long getRefreshReadBlkLocationsInterval() {
+    return dfsClientConf.getRefreshReadBlockLocationsMS();
+  }
+
   public LocatedBlocks getLocatedBlocks(String src, long start)
       throws IOException {
     return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index d211e47..a4bf454 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
 import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.htrace.core.SpanId;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -133,6 +134,10 @@ public class DFSInputStream extends FSInputStream
   //       (it's OK to acquire this lock when the lock on <this> is held)
   protected final Object infoLock = new Object();
 
+  // refresh locatedBlocks periodically
+  private final long refreshReadBlockIntervals;
+  /** timeStamp of the last time a block location was refreshed. */
+  private long locatedBlocksTimeStamp;
   /**
    * Track the ByteBuffers that we have handed out to readers.
    *
@@ -149,6 +154,10 @@ public class DFSInputStream extends FSInputStream
     return extendedReadBuffers;
   }
 
+  private boolean isPeriodicRefreshEnabled() {
+    return (refreshReadBlockIntervals > 0L);
+  }
+
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -162,7 +171,7 @@ public class DFSInputStream extends FSInputStream
    */
   protected int failures = 0;
 
-  /* XXX Use of CocurrentHashMap is temp fix. Need to fix
+  /* XXX Use of ConcurrentHashMap is temp fix. Need to fix
    * parallel accesses to DFSInputStream (through ptreads) properly */
   private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
              new ConcurrentHashMap<>();
@@ -176,6 +185,9 @@ public class DFSInputStream extends FSInputStream
   DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       LocatedBlocks locatedBlocks) throws IOException {
     this.dfsClient = dfsClient;
+    this.refreshReadBlockIntervals =
+        this.dfsClient.getRefreshReadBlkLocationsInterval();
+    setLocatedBlocksTimeStamp();
     this.verifyChecksum = verifyChecksum;
     this.src = src;
     synchronized (infoLock) {
@@ -186,10 +198,28 @@ public class DFSInputStream extends FSInputStream
   }
 
   @VisibleForTesting
-  public long getlastBlockBeingWrittenLengthForTesting() {
+  long getlastBlockBeingWrittenLengthForTesting() {
     return lastBlockBeingWrittenLength;
   }
 
+  @VisibleForTesting
+  boolean deadNodesContain(DatanodeInfo nodeInfo) {
+    return deadNodes.containsKey(nodeInfo);
+  }
+
+  @VisibleForTesting
+  void setReadTimeStampsForTesting(long timeStamp) {
+    setLocatedBlocksTimeStamp(timeStamp);
+  }
+
+  private void setLocatedBlocksTimeStamp() {
+    setLocatedBlocksTimeStamp(Time.monotonicNow());
+  }
+
+  private void setLocatedBlocksTimeStamp(long timeStamp) {
+    this.locatedBlocksTimeStamp = timeStamp;
+  }
+
   /**
    * Grab the open-file info from namenode
    * @param refreshLocatedBlocks whether to re-fetch locatedblocks
@@ -234,6 +264,48 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
+  /**
+   * Checks whether the cached block locations timestamp has expired.
+   * This method only performs the check; the actual refresh, i.e.
+   * clearing the list of deadNodes, calling openInfo(true) to re-fetch
+   * locatedBlocks, and updating locatedBlocksTimeStamp, is done by
+   * updateBlockLocationsStamp().
+   * It always returns false when the periodic refresh is disabled.
+   * @return true when the expiration feature is enabled and locatedBlocks
+   *         timestamp has expired.
+   */
+  private boolean isLocatedBlocksExpired() {
+    if (!isPeriodicRefreshEnabled()) {
+      return false;
+    }
+    long now = Time.monotonicNow();
+    long elapsed = now - locatedBlocksTimeStamp;
+    if (elapsed < refreshReadBlockIntervals) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Update the block locations timestamps if they have expired.
+   * In the case of expired timestamp:
+   *    - clear list of deadNodes
+   *    - call openInfo(true) which will re-fetch locatedblocks
+   *    - update locatedBlocksTimeStamp
+   * @return true when the locatedblocks list is re-fetched from the namenode.
+   * @throws IOException
+   */
+  private boolean updateBlockLocationsStamp() throws IOException {
+    if (!isLocatedBlocksExpired()) {
+      return false;
+    }
+    // clear dead nodes
+    deadNodes.clear();
+    openInfo(true);
+    setLocatedBlocksTimeStamp();
+    return true;
+  }
+
   private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
       throws IOException {
     LocatedBlocks newInfo = locatedBlocks;
@@ -246,7 +318,8 @@ public class DFSInputStream extends FSInputStream
     }
 
     if (locatedBlocks != null) {
-      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+      Iterator<LocatedBlock> oldIter =
+          locatedBlocks.getLocatedBlocks().iterator();
       Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
       while (oldIter.hasNext() && newIter.hasNext()) {
         if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
@@ -450,6 +523,7 @@ public class DFSInputStream extends FSInputStream
   private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
       throws IOException {
     synchronized(infoLock) {
+      updateBlockLocationsStamp();
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
@@ -549,7 +623,6 @@ public class DFSInputStream extends FSInputStream
     if (target >= getFileLength()) {
       throw new IOException("Attempted to read past end of file");
     }
-
     // Will be getting a new BlockReader.
     closeCurrentBlockReaders();
 
@@ -563,9 +636,13 @@ public class DFSInputStream extends FSInputStream
     boolean connectFailedOnce = false;
 
     while (true) {
+      // Re-fetch the locatedBlocks from NN if the timestamp has expired.
+      updateBlockLocationsStamp();
+
       //
       // Compute desired block
       //
+
       LocatedBlock targetBlock = getBlockAt(target);
 
       // update current position
@@ -765,7 +842,10 @@ public class DFSInputStream extends FSInputStream
         try {
           // currentNode can be left as null if previous read had a checksum
           // error on the same block. See HDFS-3067
-          if (pos > blockEnd || currentNode == null) {
+          // currentNode needs to be updated if the blockLocations timestamp has
+          // expired.
+          if (pos > blockEnd || currentNode == null
+              || updateBlockLocationsStamp()) {
             currentNode = blockSeekTo(pos);
           }
           int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
@@ -1504,7 +1584,7 @@ public class DFSInputStream extends FSInputStream
    * the current datanode and might connect to the same node.
    */
   private boolean seekToBlockSource(long targetPos)
-                                                 throws IOException {
+      throws IOException {
     currentNode = blockSeekTo(targetPos);
     return true;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 3d934b4..b596c40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -149,6 +149,11 @@ public interface HdfsClientConfigKeys {
   long    DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
               TimeUnit.DAYS.toMillis(10); // 10 days
 
+  // Interval in ms after which cached LocatedBlocks are refreshed; 0 disables the feature.
+  String  DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY =
+      "dfs.client.refresh.read-block-locations.ms";
+  long DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT = 0L;
+
   String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 8120998..49709f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -133,6 +133,9 @@ public class DfsClientConf {
   private final long datanodeRestartTimeout;
   private final long slowIoWarningThresholdMs;
 
+  /** Wait time window before refreshing the block locations of an input stream. */
+  private final long refreshReadBlockLocationsMS;
+
   private final ShortCircuitConf shortCircuitConf;
 
   private final long hedgedReadThresholdMillis;
@@ -249,6 +252,11 @@ public class DfsClientConf {
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
 
+    refreshReadBlockLocationsMS = conf.getLong(
+        HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
+        HdfsClientConfigKeys.
+            DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
+
     shortCircuitConf = new ShortCircuitConf(conf);
 
     hedgedReadThresholdMillis = conf.getLong(
@@ -596,6 +604,13 @@ public class DfsClientConf {
   }
 
   /**
+   * @return the refreshReadBlockLocationsMS
+   */
+  public long getRefreshReadBlockLocationsMS() {
+    return refreshReadBlockLocationsMS;
+  }
+
+  /**
    * @return the shortCircuitConf
    */
   public ShortCircuitConf getShortCircuitConf() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2e45503..2fe095d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2906,6 +2906,14 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.client.refresh.read-block-locations.ms</name>
+    <value>0</value>
+    <description>
+      Interval in milliseconds after which cached LocatedBlocks are refreshed; a value of 0 disables the feature.
+    </description>
+  </property>
+
 <property>
   <name>dfs.namenode.lease-recheck-interval-ms</name>
   <value>2000</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
new file mode 100644
index 0000000..9fed914
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test the expiration of cached block locations.
+ */
+@RunWith(Parameterized.class)
+public class TestDFSInputStreamBlockLocations {
+  private static final int BLOCK_SIZE = 1024 * 1024;
+  private static final String[] RACKS = new String[] {
+      "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
+  private static final int NUM_DATA_NODES = RACKS.length;
+  private static final short REPLICATION_FACTOR = (short) 4;
+  private final int staleInterval = 8000;
+  private final int numOfBlocks = 24;
+  private final int fileLength = numOfBlocks * BLOCK_SIZE;
+  private final int dfsClientPrefetchSize = fileLength / 2;
+  // locatedBlocks expiration set to 1 hour
+  private final long dfsInputLocationsTimeout = 60 * 60 * 1000L;
+
+  private HdfsConfiguration conf;
+  private MiniDFSCluster dfsCluster;
+  private DFSClient dfsClient;
+  private DistributedFileSystem fs;
+  private Path filePath;
+  private boolean enableBlkExpiration;
+
+  @Parameterized.Parameters(name = "{index}: CacheExpirationConfig(Enable {0})")
+  public static Collection<Object[]> getTestParameters() {
+    return Arrays.asList(new Object[][] {
+        {Boolean.TRUE},
+        {Boolean.FALSE}
+    });
+  }
+
+  public TestDFSInputStreamBlockLocations(Boolean enableExpiration) {
+    enableBlkExpiration = enableExpiration;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    // set the heartbeat intervals and stale considerations
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        staleInterval);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        staleInterval / 2);
+    // disable shortcircuit reading
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
+    // set replication factor
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
+    // set block size and other sizes
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
+        dfsClientPrefetchSize);
+    if (enableBlkExpiration) {
+      // set the refresh locations for every dfsInputLocationsTimeout
+      conf.setLong(
+          HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
+          dfsInputLocationsTimeout);
+    }
+    // start the cluster and create a DFSClient
+    dfsCluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
+    dfsCluster.waitActive();
+    assertEquals(NUM_DATA_NODES, dfsCluster.getDataNodes().size());
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+        dfsCluster.getNameNodePort());
+    dfsClient = new DFSClient(addr, conf);
+    fs = dfsCluster.getFileSystem();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (dfsClient != null) {
+      dfsClient.close();
+      dfsClient = null;
+    }
+    if (fs != null) {
+      fs.deleteOnExit(filePath);
+      fs.close();
+      fs = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    final String fileName = "/test_cache_locations";
+    filePath = new Path(fileName);
+    DFSInputStream fin = null;
+    FSDataOutputStream fout = null;
+    try {
+      // create a file and write for testing
+      fout = fs.create(filePath, REPLICATION_FACTOR);
+      fout.write(new byte[(fileLength)]);
+      // finalize the file by closing the output stream
+      fout.close();
+      fout = null;
+      // get the located blocks
+      LocatedBlocks referenceLocatedBlocks =
+          dfsClient.getLocatedBlocks(fileName, 0, fileLength);
+      assertEquals(numOfBlocks, referenceLocatedBlocks.locatedBlockCount());
+      String poolId = dfsCluster.getNamesystem().getBlockPoolId();
+      fin = dfsClient.open(fileName);
+      // get the located blocks from fin
+      LocatedBlocks finLocatedBlocks = fin.locatedBlocks;
+      assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
+          finLocatedBlocks.locatedBlockCount());
+      final int chunkReadSize = BLOCK_SIZE / 4;
+      byte[] readBuffer = new byte[chunkReadSize];
+      // read the first block
+      DatanodeInfo prevDNInfo = null;
+      DatanodeInfo currDNInfo = null;
+      int bytesRead = 0;
+      int firstBlockMark = BLOCK_SIZE;
+      // get the first block's locations
+      LocatedBlock firstLocatedBlk =
+          fin.locatedBlocks.getLocatedBlocks().get(0);
+      DatanodeInfo[] firstBlkDNInfos = firstLocatedBlk.getLocations();
+      while (fin.getPos() < firstBlockMark) {
+        bytesRead = fin.read(readBuffer);
+        Assert.assertTrue("Unexpected number of read bytes",
+            chunkReadSize >= bytesRead);
+        if (currDNInfo == null) {
+          currDNInfo = fin.getCurrentDatanode();
+          assertNotNull("current FIS datanode is null", currDNInfo);
+          continue;
+        }
+        prevDNInfo = currDNInfo;
+        currDNInfo = fin.getCurrentDatanode();
+        assertEquals("the DFSInput stream does not read from same node",
+            prevDNInfo, currDNInfo);
+      }
+
+      assertEquals("InputStream exceeds expected position",
+          firstBlockMark, fin.getPos());
+      // get the second block locations
+      LocatedBlock secondLocatedBlk =
+          fin.locatedBlocks.getLocatedBlocks().get(1);
+      // get the nodeinfo for that block
+      DatanodeInfo[] secondBlkDNInfos = secondLocatedBlk.getLocations();
+      DatanodeInfo deadNodeInfo = secondBlkDNInfos[0];
+      // find the datanode hosting the second block so it can be stopped
+      DataNode deadNode = getdataNodeFromHostName(dfsCluster,
+          deadNodeInfo.getHostName());
+      // Shutdown and wait for datanode to be marked dead
+      DatanodeRegistration reg = InternalDataNodeTestUtils.
+          getDNRegistrationForBP(dfsCluster.getDataNodes().get(0), poolId);
+      DataNodeProperties stoppedDNProps =
+          dfsCluster.stopDataNode(deadNodeInfo.getName());
+
+      List<DataNode> datanodesPostStoppage = dfsCluster.getDataNodes();
+      assertEquals(NUM_DATA_NODES - 1, datanodesPostStoppage.size());
+      // get the located blocks
+      LocatedBlocks afterStoppageLocatedBlocks =
+          dfsClient.getLocatedBlocks(fileName, 0, fileLength);
+      // read second block
+      int secondBlockMark =  (int) (1.5 * BLOCK_SIZE);
+      boolean firstIteration = true;
+      if (this.enableBlkExpiration) {
+        // set the time stamps to make sure that we do not refresh locations yet
+        fin.setReadTimeStampsForTesting(Time.monotonicNow());
+      }
+      while (fin.getPos() < secondBlockMark) {
+        bytesRead = fin.read(readBuffer);
+        assertTrue("dead node used to read at position: " + fin.getPos(),
+            fin.deadNodesContain(deadNodeInfo));
+        Assert.assertTrue("Unexpected number of read bytes",
+            chunkReadSize >= bytesRead);
+        prevDNInfo = currDNInfo;
+        currDNInfo = fin.getCurrentDatanode();
+        assertNotEquals(deadNodeInfo, currDNInfo);
+        if (firstIteration) {
+          // the DN must change if the first block was read from the now-dead node
+          assertFalse("FSInputStream should pick a different DN",
+              firstBlkDNInfos[0].equals(deadNodeInfo)
+                  && prevDNInfo.equals(currDNInfo));
+          firstIteration = false;
+        }
+      }
+      assertEquals("InputStream exceeds expected position",
+          secondBlockMark, fin.getPos());
+      // restart the dead node with the same port
+      assertTrue(dfsCluster.restartDataNode(stoppedDNProps, true));
+      dfsCluster.waitActive();
+      List<DataNode> datanodesPostRestart = dfsCluster.getDataNodes();
+      assertEquals(NUM_DATA_NODES, datanodesPostRestart.size());
+      // continue reading into block 2; with expiration enabled the restarted node becomes usable again
+      int thirdBlockMark =  2 * BLOCK_SIZE;
+      firstIteration = true;
+      while (fin.getPos() < thirdBlockMark) {
+        bytesRead = fin.read(readBuffer);
+        if (this.enableBlkExpiration) {
+          assertEquals("node is removed from deadNodes after 1st iteration",
+              firstIteration, fin.deadNodesContain(deadNodeInfo));
+        } else {
+          assertTrue(fin.deadNodesContain(deadNodeInfo));
+        }
+        Assert.assertTrue("Unexpected number of read bytes",
+            chunkReadSize >= bytesRead);
+        prevDNInfo = currDNInfo;
+        currDNInfo = fin.getCurrentDatanode();
+        if (!this.enableBlkExpiration) {
+          assertNotEquals(deadNodeInfo, currDNInfo);
+        }
+        if (firstIteration) {
+          assertEquals(prevDNInfo, currDNInfo);
+          firstIteration = false;
+          if (this.enableBlkExpiration) {
+            // reset the time stamps of located blocks to force cache expiration
+            fin.setReadTimeStampsForTesting(
+                Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
+          }
+        }
+      }
+      assertEquals("InputStream exceeds expected position",
+          thirdBlockMark, fin.getPos());
+    } finally {
+      if (fout != null) {
+        fout.close();
+      }
+      if (fin != null) {
+        fin.close();
+      }
+    }
+  }
+
+  private DataNode getdataNodeFromHostName(MiniDFSCluster cluster,
+      String hostName) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getHostName().equals(hostName)) {
+        return dn;
+      }
+    }
+    return null;
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org