Posted to hdfs-commits@hadoop.apache.org by um...@apache.org on 2012/04/26 22:18:01 UTC

svn commit: r1331061 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src: main/java/org/apache/hadoop/hdfs/DFSInputStream.java test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java

Author: umamahesh
Date: Thu Apr 26 20:18:01 2012
New Revision: 1331061

URL: http://svn.apache.org/viewvc?rev=1331061&view=rev
Log:
HDFS-3222. DFSInputStream#openInfo should not silently report the length as 0 when the locations length is zero for the last partial block. Contributed by Uma Maheswara Rao G.

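In short: when the last block of a file is still under construction and the
restarted NameNode has no DataNode locations for it yet, openInfo() previously
fell through to readBlockLength(), which quietly returned 0, so an hsync'ed
file could appear shorter than it really is. The patch turns the no-locations
case into a -1 sentinel and retries before failing. What follows is a minimal
standalone sketch of that retry contract, not the committed code:
fetchLastBlockLength() is a hypothetical stand-in for
fetchLocatedBlocksAndGetLastBlockLength(), while the 3 retries and 4-second
wait mirror the diff below.

    import java.io.IOException;

    /**
     * Sketch of the retry contract added to DFSInputStream#openInfo.
     * fetchLastBlockLength() stands in for the real
     * fetchLocatedBlocksAndGetLastBlockLength(): it returns -1 while the
     * NN has no locations for the last partial block.
     */
    public class OpenInfoRetrySketch {
      private long fetchLastBlockLength() {
        return -1; // stubbed: DNs have not reported the last block yet
      }

      public long openInfo() throws IOException {
        long length = fetchLastBlockLength();
        int retries = 3; // mirrors retriesForLastBlockLength in the diff
        while (retries > 0 && length == -1) {
          try {
            Thread.sleep(4000); // mirrors waitFor(4000): let DNs re-register
          } catch (InterruptedException e) {
            throw new IOException(
                "Interrupted while getting the last block length.");
          }
          length = fetchLastBlockLength();
          retries--;
        }
        if (length == -1) {
          // Still unknown after all retries: fail loudly instead of
          // silently reporting 0, the pre-patch behavior.
          throw new IOException("Could not obtain the last block locations.");
        }
        return length;
      }
    }

If every retry fails, the client sees the same "Could not obtain the last
block locations." IOException that the new test below asserts.
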
Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1331061&r1=1331060&r2=1331061&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Apr 26 20:18:01 2012
@@ -118,6 +118,39 @@ public class DFSInputStream extends FSIn
    * Grab the open-file info from namenode
    */
   synchronized void openInfo() throws IOException, UnresolvedLinkException {
+    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+    int retriesForLastBlockLength = 3;
+    while (retriesForLastBlockLength > 0) {
+      // Getting the last block length as -1 is a special case: when the
+      // cluster restarts, DNs may not have reported in yet, so the NN has
+      // no locations for the last partial block and cannot supply its
+      // length. Retry up to 3 times to get it.
+      if (lastBlockBeingWrittenLength == -1) {
+        DFSClient.LOG.warn("Last block locations not available. "
+            + "Datanodes might not have reported blocks completely."
+            + " Will retry " + retriesForLastBlockLength + " more time(s).");
+        waitFor(4000);
+        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+      } else {
+        break;
+      }
+      retriesForLastBlockLength--;
+    }
+    if (lastBlockBeingWrittenLength == -1) {
+      throw new IOException("Could not obtain the last block locations.");
+    }
+  }
+
+  private void waitFor(int waitTime) throws IOException {
+    try {
+      Thread.sleep(waitTime);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status before translating to an IOException.
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          "Interrupted while getting the last block length.");
+    }
+  }
+
+  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
     LocatedBlocks newInfo = DFSClient.callGetBlockLocations(dfsClient.namenode, src, 0, prefetchSize);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
@@ -136,10 +169,13 @@ public class DFSInputStream extends FSIn
       }
     }
     locatedBlocks = newInfo;
-    lastBlockBeingWrittenLength = 0;
+    long lastBlockBeingWrittenLength = 0;
     if (!locatedBlocks.isLastBlockComplete()) {
       final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
       if (last != null) {
+        if (last.getLocations().length == 0) {
+          return -1;
+        }
         final long len = readBlockLength(last);
         last.getBlock().setNumBytes(len);
         lastBlockBeingWrittenLength = len; 
@@ -147,13 +183,12 @@ public class DFSInputStream extends FSIn
     }
 
     currentNode = null;
+    return lastBlockBeingWrittenLength;
   }
 
   /** Read the block length from one of the datanodes. */
   private long readBlockLength(LocatedBlock locatedblock) throws IOException {
-    if (locatedblock == null || locatedblock.getLocations().length == 0) {
-      return 0;
-    }
+    assert locatedblock != null : "LocatedBlock cannot be null";
     int replicaNotFoundCount = locatedblock.getLocations().length;
     
     for(DatanodeInfo datanode : locatedblock.getLocations()) {

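A note on the readBlockLength() change above: the null/empty-locations guard
could be demoted to an assert because the caller shown above now screens out
the zero-locations case itself (returning the -1 sentinel), so reaching
readBlockLength() with no locations would indicate a programming error rather
than a recoverable cluster state.
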
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java?rev=1331061&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java Thu Apr 26 20:18:01 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests the file length across cluster restarts. */
+public class TestFileLengthOnClusterRestart {
+  /**
+   * Tests the file length when we hsync the file, restart the cluster, and
+   * the Datanodes have not yet reported to the Namenode.
+   */
+  @Test(timeout = 60000)
+  public void testFileLengthWithHSyncAndClusterRestartWithOutDNsRegister()
+      throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // Small block size so the 1030-byte file ends in a partial last block.
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    HdfsDataInputStream in = null;
+    try {
+      Path path = new Path(MiniDFSCluster.getBaseDirectory(), "test");
+      DistributedFileSystem dfs = (DistributedFileSystem) cluster
+          .getFileSystem();
+      FSDataOutputStream out = dfs.create(path);
+      int fileLength = 1030;
+      out.write(new byte[fileLength]);
+      out.hsync();
+      cluster.restartNameNode();
+      cluster.waitActive();
+      in = (HdfsDataInputStream) dfs.open(path, 1024);
+      // Verify the length right after an NN-only restart: the DNs are still
+      // running and re-register immediately.
+      Assert.assertEquals(fileLength, in.getVisibleLength());
+      cluster.shutdownDataNodes();
+      cluster.restartNameNode(false);
+      // Ensure the NN has started; with no DNs it stays in safe mode.
+      verifyNNIsInSafeMode(dfs);
+
+      try {
+        in = (HdfsDataInputStream) dfs.open(path);
+        Assert.fail("Expected IOException");
+      } catch (IOException e) {
+        Assert.assertEquals("Could not obtain the last block locations.", e
+            .getLocalizedMessage());
+      }
+    } finally {
+      if (null != in) {
+        in.close();
+      }
+      cluster.shutdown();
+
+    }
+  }
+
+  private void verifyNNIsInSafeMode(DistributedFileSystem dfs)
+      throws IOException {
+    while (true) {
+      boolean inSafeMode;
+      try {
+        inSafeMode = dfs.isInSafeMode();
+      } catch (IOException e) {
+        // NN might not have started completely yet; ignore and retry.
+        continue;
+      }
+      if (inSafeMode) {
+        return;
+      }
+      throw new IOException("Expected the NN to be in safe mode");
+    }
+  }
+}
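
Assuming a standard Maven build of hadoop-hdfs (the module layout in the paths
above), the new test can be run on its own via Surefire's test filter:

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestFileLengthOnClusterRestart

The 60-second timeout on the @Test annotation bounds the safe-mode polling
loop in case the NameNode never comes up.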