You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/09/29 22:10:19 UTC
svn commit: r820078 - in /hadoop/hdfs/branches/HDFS-265: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/
Author: szetszwo
Date: Tue Sep 29 20:10:18 2009
New Revision: 820078
URL: http://svn.apache.org/viewvc?rev=820078&view=rev
Log:
HDFS-659. If the last block is not complete, update its length with one of its replica's length stored in datanode.
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Tue Sep 29 20:10:18 2009
@@ -84,6 +84,9 @@
HDFS-550. DataNode restarts may introduce corrupt/duplicated/lost replicas
when handling detached replicas. (hairong)
+ HDFS-659. If the last block is not complete, update its length with
+ one of its replica's length stored in datanode. (szetszwo)
+
Trunk (unreleased changes)
INCOMPATIBLE CHANGES
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 29 20:10:18 2009
@@ -1661,9 +1661,16 @@
}
}
this.locatedBlocks = newInfo;
- this.lastBlockBeingWrittenLength =
- locatedBlocks.isLastBlockComplete()? 0:
- readBlockLength(locatedBlocks.getLastLocatedBlock());
+ this.lastBlockBeingWrittenLength = 0;
+ if (!locatedBlocks.isLastBlockComplete()) {
+ final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+ if (last != null) {
+ final long len = readBlockLength(last);
+ last.getBlock().setNumBytes(len);
+ this.lastBlockBeingWrittenLength = len;
+ }
+ }
+
this.currentNode = null;
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Tue Sep 29 20:10:18 2009
@@ -156,6 +156,7 @@
/** {@inheritDoc} */
public String toString() {
return getClass().getSimpleName() + "{" + b
+ + "; getBlockSize()=" + getBlockSize()
+ "; corrupt=" + corrupt
+ "; offset=" + offset
+ "; locs=" + java.util.Arrays.asList(locs)
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Sep 29 20:10:18 2009
@@ -329,7 +329,7 @@
return machineSet;
}
- List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
+ List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
long length, int nrBlocksToReturn) throws IOException {
int curBlk = 0;
long curPos = 0, blkSize = 0;
@@ -359,8 +359,14 @@
}
/** @return a LocatedBlock for the given block */
- LocatedBlock getBlockLocation(final Block blk, final long pos
+ LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
) throws IOException {
+ if (!blk.isComplete()) {
+ final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
+ final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+ return namesystem.createLocatedBlock(uc, locations, pos, false);
+ }
+
// get block locations
final int numCorruptNodes = countNodes(blk).corruptReplicas();
final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Sep 29 20:10:18 2009
@@ -734,13 +734,8 @@
}
if (!last.isComplete()) {
- final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)last;
- final DatanodeDescriptor[] locations = uc.getExpectedLocations();
- if (LOG.isDebugEnabled()) {
- LOG.debug("locations = " + java.util.Arrays.asList(locations));
- }
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
- createLocatedBlock(uc, locations, n, false), false);
+ blockManager.getBlockLocation(last, n), false);
}
else {
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java Tue Sep 29 20:10:18 2009
@@ -17,13 +17,19 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
@@ -38,56 +44,106 @@
private static final String DIR = "/"
+ TestReadWhileWriting.class.getSimpleName() + "/";
private static final int BLOCK_SIZE = 8192;
-
+
/** Test reading while writing. */
@Test
public void testReadWhileWriting() throws Exception {
- Configuration conf = new Configuration();
+ final Configuration conf = new Configuration();
+ //enable append
+ conf.setBoolean("dfs.support.append", true);
+
// create cluster
final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
try {
+ //change the lease soft limit to 1 second.
+ final long leaseSoftLimit = 1000;
+ cluster.setLeasePeriod(leaseSoftLimit, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+ //wait for the cluster
cluster.waitActive();
final FileSystem fs = cluster.getFileSystem();
-
- // write to a file but not closing it.
final Path p = new Path(DIR, "file1");
- final FSDataOutputStream out = fs.create(p, true,
- fs.getConf().getInt("io.file.buffer.size", 4096),
- (short)3, BLOCK_SIZE);
- final int size = BLOCK_SIZE/3;
- final byte[] buffer = AppendTestUtil.randomBytes(0, size);
- out.write(buffer, 0, size);
- out.flush();
- out.sync();
+ final int half = BLOCK_SIZE/2;
- // able to read?
- Assert.assertTrue(read(fs, p, size));
+ //a. On Machine M1, Create file. Write half block of data.
+ // Invoke (DFSOutputStream).fsync() on the dfs file handle.
+ // Do not close file yet.
+ {
+ final FSDataOutputStream out = fs.create(p, true,
+ fs.getConf().getInt("io.file.buffer.size", 4096),
+ (short)3, BLOCK_SIZE);
+ write(out, 0, half);
+
+ //hflush
+ ((DFSClient.DFSOutputStream)out.getWrappedStream()).hflush();
+ }
+
+ //b. On another machine M2, open file and verify that the half-block
+ // of data can be read successfully.
+ checkFile(p, half, conf);
+
+ /* TODO: enable the following when append is done.
+ //c. On M1, append another half block of data. Close file on M1.
+ {
+ //sleep to make sure the lease is expired the soft limit.
+ Thread.sleep(2*leaseSoftLimit);
+
+ FSDataOutputStream out = fs.append(p);
+ write(out, 0, half);
+ out.close();
+ }
- out.close();
+ //d. On M2, open file and read 1 block of data from it. Close file.
+ checkFile(p, 2*half, conf);
+ */
} finally {
cluster.shutdown();
}
}
- /** able to read? */
- private static boolean read(FileSystem fs, Path p, int expectedsize
- ) throws Exception {
- //try at most 3 minutes
- for(int i = 0; i < 360; i++) {
- final FSDataInputStream in = fs.open(p);
- try {
- final int available = in.available();
- System.out.println(i + ") in.available()=" + available);
- Assert.assertTrue(available >= 0);
- Assert.assertTrue(available <= expectedsize);
- if (available == expectedsize) {
- return true;
- }
- } finally {
- in.close();
- }
- Thread.sleep(500);
+ static private int userCount = 0;
+ //check the file
+ static void checkFile(Path p, int expectedsize, Configuration conf
+ ) throws IOException {
+ //open the file with another user account
+ final Configuration conf2 = new Configuration(conf);
+ final String username = UserGroupInformation.getCurrentUGI().getUserName()
+ + "_" + ++userCount;
+ UnixUserGroupInformation.saveToConf(conf2,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME,
+ new UnixUserGroupInformation(username, new String[]{"supergroup"}));
+ final FileSystem fs = FileSystem.get(conf2);
+ final InputStream in = fs.open(p);
+
+ //Is the data available?
+ Assert.assertTrue(available(in, expectedsize));
+
+ //Able to read?
+ for(int i = 0; i < expectedsize; i++) {
+ Assert.assertEquals((byte)i, (byte)in.read());
+ }
+
+ in.close();
+ }
+
+ /** Write something to a file */
+ private static void write(OutputStream out, int offset, int length
+ ) throws IOException {
+ final byte[] bytes = new byte[length];
+ for(int i = 0; i < length; i++) {
+ bytes[i] = (byte)(offset + i);
}
- return false;
+ out.write(bytes);
+ }
+
+ /** Is the data available? */
+ private static boolean available(InputStream in, int expectedsize
+ ) throws IOException {
+ final int available = in.available();
+ System.out.println(" in.available()=" + available);
+ Assert.assertTrue(available >= 0);
+ Assert.assertTrue(available <= expectedsize);
+ return available == expectedsize;
}
}
+