Posted to hdfs-commits@hadoop.apache.org by um...@apache.org on 2012/04/26 22:18:01 UTC
svn commit: r1331061 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src:
main/java/org/apache/hadoop/hdfs/DFSInputStream.java
test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
Author: umamahesh
Date: Thu Apr 26 20:18:01 2012
New Revision: 1331061
URL: http://svn.apache.org/viewvc?rev=1331061&view=rev
Log:
HDFS-3222. DFSInputStream#openInfo should not silently get the length as 0 when locations length is zero for last partial block. Contributed by Uma Maheswara Rao G.
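Put differently: before this change, a client that opened a file right after a cluster restart, while the file's last block was still under construction and no Datanode had yet reported its replicas, would silently see that block as 0 bytes long. A minimal sketch of the client-side symptom, assuming a file that was written and hsync'ed before the restart (the path here is a placeholder, not from this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

    public class VisibleLengthCheck {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // "/tmp/synced-file" is a placeholder for a file hsync'ed before
        // the cluster restarted with its last block still incomplete.
        HdfsDataInputStream in =
            (HdfsDataInputStream) fs.open(new Path("/tmp/synced-file"));
        try {
          // Before this fix: with no DN locations for the last partial
          // block, its under-construction bytes were counted as 0 here.
          // After it: open() retries and eventually fails with
          // "Could not obtain the last block locations."
          System.out.println("visible length = " + in.getVisibleLength());
        } finally {
          in.close();
        }
      }
    }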
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1331061&r1=1331060&r2=1331061&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Apr 26 20:18:01 2012
@@ -118,6 +118,39 @@ public class DFSInputStream extends FSIn
    * Grab the open-file info from namenode
    */
   synchronized void openInfo() throws IOException, UnresolvedLinkException {
+    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+    int retriesForLastBlockLength = 3;
+    while (retriesForLastBlockLength > 0) {
+      // Getting the last block length as -1 is a special case. When the
+      // cluster restarts, DNs may not report in immediately, so the NN will
+      // not yet have the partial block's locations for computing the length.
+      // Let's retry up to 3 times to get the length.
+      if (lastBlockBeingWrittenLength == -1) {
+        DFSClient.LOG.warn("Last block locations not available. "
+            + "Datanodes might not have reported blocks completely."
+            + " Will retry for " + retriesForLastBlockLength + " times");
+        waitFor(4000);
+        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+      } else {
+        break;
+      }
+      retriesForLastBlockLength--;
+    }
+    if (retriesForLastBlockLength == 0) {
+      throw new IOException("Could not obtain the last block locations.");
+    }
+  }
+
+  private void waitFor(int waitTime) throws IOException {
+    try {
+      Thread.sleep(waitTime);
+    } catch (InterruptedException e) {
+      throw new IOException(
+          "Interrupted while getting the last block length.");
+    }
+  }
+
+  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
     LocatedBlocks newInfo = DFSClient.callGetBlockLocations(dfsClient.namenode, src, 0, prefetchSize);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
@@ -136,10 +169,13 @@ public class DFSInputStream extends FSIn
       }
     }
     locatedBlocks = newInfo;
-    lastBlockBeingWrittenLength = 0;
+    long lastBlockBeingWrittenLength = 0;
     if (!locatedBlocks.isLastBlockComplete()) {
       final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
       if (last != null) {
+        if (last.getLocations().length == 0) {
+          return -1;
+        }
         final long len = readBlockLength(last);
         last.getBlock().setNumBytes(len);
         lastBlockBeingWrittenLength = len;
@@ -147,13 +183,12 @@ public class DFSInputStream extends FSIn
     }
     currentNode = null;
+    return lastBlockBeingWrittenLength;
   }
 
   /** Read the block length from one of the datanodes. */
   private long readBlockLength(LocatedBlock locatedblock) throws IOException {
-    if (locatedblock == null || locatedblock.getLocations().length == 0) {
-      return 0;
-    }
+    assert locatedblock != null : "LocatedBlock cannot be null";
     int replicaNotFoundCount = locatedblock.getLocations().length;
     for(DatanodeInfo datanode : locatedblock.getLocations()) {
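Taken together, the new openInfo() amounts to a bounded retry around the -1 sentinel returned by fetchLocatedBlocksAndGetLastBlockLength(). A standalone sketch of that pattern, with the HDFS-specific calls stubbed behind a hypothetical LengthFetcher interface (not part of this patch):

    import java.io.IOException;

    public class LastBlockLengthRetrySketch {
      /** Stand-in for fetchLocatedBlocksAndGetLastBlockLength(); hypothetical. */
      interface LengthFetcher {
        long fetch() throws IOException; // -1 means locations not yet available
      }

      static long getLength(LengthFetcher fetcher) throws IOException {
        long length = fetcher.fetch();
        for (int retries = 3; retries > 0 && length == -1; retries--) {
          try {
            Thread.sleep(4000); // give DNs time to re-register with the NN
          } catch (InterruptedException e) {
            throw new IOException(
                "Interrupted while getting the last block length.");
          }
          length = fetcher.fetch(); // ask the NN again after the wait
        }
        if (length == -1) {
          throw new IOException("Could not obtain the last block locations.");
        }
        return length;
      }
    }

Note that the sketch tests the sentinel after the loop rather than the remaining retry count, so a length obtained on the final attempt is kept.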
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java?rev=1331061&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileLengthOnClusterRestart.java Thu Apr 26 20:18:01 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test the fileLength on cluster restarts */
+public class TestFileLengthOnClusterRestart {
+  /**
+   * Tests the fileLength when we sync the file, restart the cluster, and
+   * the Datanodes have not yet reported to the Namenode.
+   */
+  @Test(timeout = 60000)
+  public void testFileLengthWithHSyncAndClusterRestartWithOutDNsRegister()
+      throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // create cluster
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    HdfsDataInputStream in = null;
+    try {
+      Path path = new Path(MiniDFSCluster.getBaseDirectory(), "test");
+      DistributedFileSystem dfs = (DistributedFileSystem) cluster
+          .getFileSystem();
+      FSDataOutputStream out = dfs.create(path);
+      int fileLength = 1030;
+      out.write(new byte[fileLength]);
+      out.hsync();
+      cluster.restartNameNode();
+      cluster.waitActive();
+      in = (HdfsDataInputStream) dfs.open(path, 1024);
+      // Verify the length when we just restart NN. DNs will register
+      // immediately.
+      Assert.assertEquals(fileLength, in.getVisibleLength());
+      cluster.shutdownDataNodes();
+      cluster.restartNameNode(false);
+      // This is just to ensure that the NN has started.
+      verifyNNIsInSafeMode(dfs);
+
+      try {
+        in = (HdfsDataInputStream) dfs.open(path);
+        Assert.fail("Expected IOException");
+      } catch (IOException e) {
+        Assert.assertEquals("Could not obtain the last block locations.", e
+            .getLocalizedMessage());
+      }
+    } finally {
+      if (null != in) {
+        in.close();
+      }
+      cluster.shutdown();
+    }
+  }
+
+  private void verifyNNIsInSafeMode(DistributedFileSystem dfs)
+      throws IOException {
+    while (true) {
+      try {
+        if (dfs.isInSafeMode()) {
+          return;
+        } else {
+          throw new IOException("Expected to be in SafeMode");
+        }
+      } catch (IOException e) {
+        // NN might not have started completely; ignore and retry.
+      }
+    }
+  }
+}
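For applications, the visible effect of this change is that open() on such a file can now fail with the message asserted above instead of returning a stream whose last block reads as empty. A hedged sketch of caller-side handling; the helper and its retry policy are illustrative only, not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OpenAfterRestart {
      // Illustrative helper, not part of this patch: keep retrying open()
      // while the DNs are still re-registering after a cluster restart.
      // attempts must be >= 1.
      static FSDataInputStream openWithRetry(FileSystem fs, Path path,
          int attempts, long sleepMs) throws IOException, InterruptedException {
        IOException last = null;
        for (int i = 0; i < attempts; i++) {
          try {
            return fs.open(path);
          } catch (IOException e) {
            last = e; // e.g. "Could not obtain the last block locations."
            Thread.sleep(sleepMs); // wait for DNs to report in, then retry
          }
        }
        throw last;
      }
    }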