Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/09/29 22:10:19 UTC

svn commit: r820078 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/

Author: szetszwo
Date: Tue Sep 29 20:10:18 2009
New Revision: 820078

URL: http://svn.apache.org/viewvc?rev=820078&view=rev
Log:
HDFS-659. If the last block is not complete, update its length with the length of one of its replicas stored on a datanode.
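
The user-visible effect: a client that opens a file while it is still being written now sees the bytes the writer has already synced to the datanodes, since the length of the under-construction last block is filled in from one of its replicas. A minimal sketch against a cluster built from this branch (the path and buffer size are illustrative, and out.sync() stands in for the hflush() call that the updated test below invokes on the wrapped DFSClient.DFSOutputStream):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadWhileWritingSketch {
      public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        conf.setBoolean("dfs.support.append", true);  // as in the test below

        final FileSystem fs = FileSystem.get(conf);
        final Path p = new Path("/readWhileWritingSketch");

        // Writer: create a file, write some data and sync it to the
        // datanodes, but do not close the file, so that its last block
        // stays under construction.
        final FSDataOutputStream out = fs.create(p);
        out.write(new byte[4096]);
        out.sync();

        // Reader (in practice a second client on another machine):
        // with HDFS-659, the synced bytes are visible even though the
        // file is still open for write.
        final FSDataInputStream in = fs.open(p);
        System.out.println("available = " + in.available());
        in.close();
      }
    }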

Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Tue Sep 29 20:10:18 2009
@@ -84,6 +84,9 @@
     HDFS-550. DataNode restarts may introduce corrupt/duplicated/lost replicas
     when handling detached replicas. (hairong)
 
+    HDFS-659. If the last block is not complete, update its length with
+    the length of one of its replicas stored on a datanode.  (szetszwo)
+
 Trunk (unreleased changes)
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Sep 29 20:10:18 2009
@@ -1661,9 +1661,16 @@
         }
       }
       this.locatedBlocks = newInfo;
-      this.lastBlockBeingWrittenLength = 
-          locatedBlocks.isLastBlockComplete()? 0:
-              readBlockLength(locatedBlocks.getLastLocatedBlock()); 
+      this.lastBlockBeingWrittenLength = 0;
+      if (!locatedBlocks.isLastBlockComplete()) {
+        final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+        if (last != null) {
+          final long len = readBlockLength(last);
+          last.getBlock().setNumBytes(len);
+          this.lastBlockBeingWrittenLength = len; 
+        }
+      }
+
       this.currentNode = null;
     }
 

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Tue Sep 29 20:10:18 2009
@@ -156,6 +156,7 @@
   /** {@inheritDoc} */
   public String toString() {
     return getClass().getSimpleName() + "{" + b
+        + "; getBlockSize()=" + getBlockSize()
         + "; corrupt=" + corrupt
         + "; offset=" + offset
         + "; locs=" + java.util.Arrays.asList(locs)

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Sep 29 20:10:18 2009
@@ -329,7 +329,7 @@
     return machineSet;
   }
 
-  List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
+  List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
       long length, int nrBlocksToReturn) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
@@ -359,8 +359,14 @@
   }
 
   /** @return a LocatedBlock for the given block */
-  LocatedBlock getBlockLocation(final Block blk, final long pos
+  LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
       ) throws IOException {
+    if (!blk.isComplete()) {
+      final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
+      final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+      return namesystem.createLocatedBlock(uc, locations, pos, false);
+    }
+
     // get block locations
     final int numCorruptNodes = countNodes(blk).corruptReplicas();
     final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
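
On the namenode side, the under-construction case moves into BlockManager.getBlockLocation(..) itself (FSNamesystem now simply delegates; see the next hunk). The reasoning behind the new branch: an incomplete block has no finalized replicas to look up, so the locations reported to the reader are the datanodes the writer's pipeline is expected to use. A simplified restatement of the added lines:

    // An incomplete block has no finalized replicas to look up in the
    // blocks map; report the expected pipeline datanodes instead.
    if (!blk.isComplete()) {
      final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
      return namesystem.createLocatedBlock(uc, uc.getExpectedLocations(), pos, false);
    }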

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Sep 29 20:10:18 2009
@@ -734,13 +734,8 @@
       }
 
       if (!last.isComplete()) {
-        final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)last;
-        final DatanodeDescriptor[] locations = uc.getExpectedLocations();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("locations = " + java.util.Arrays.asList(locations));
-        }
         return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
-            createLocatedBlock(uc, locations, n, false), false);
+            blockManager.getBlockLocation(last, n), false);
       }
       else {
         return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java?rev=820078&r1=820077&r2=820078&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java Tue Sep 29 20:10:18 2009
@@ -17,13 +17,19 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,56 +44,106 @@
   private static final String DIR = "/"
       + TestReadWhileWriting.class.getSimpleName() + "/";
   private static final int BLOCK_SIZE = 8192;
-
+  
   /** Test reading while writing. */
   @Test
   public void testReadWhileWriting() throws Exception {
-    Configuration conf = new Configuration();
+    final Configuration conf = new Configuration();
+    //enable append
+    conf.setBoolean("dfs.support.append", true);
+
     // create cluster
     final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
     try {
+      //change the lease soft limit to 1 second.
+      final long leaseSoftLimit = 1000;
+      cluster.setLeasePeriod(leaseSoftLimit, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+      //wait for the cluster
       cluster.waitActive();
       final FileSystem fs = cluster.getFileSystem();
-
-      // write to a file but not closing it.
       final Path p = new Path(DIR, "file1");
-      final FSDataOutputStream out = fs.create(p, true,
-          fs.getConf().getInt("io.file.buffer.size", 4096),
-          (short)3, BLOCK_SIZE);
-      final int size = BLOCK_SIZE/3;
-      final byte[] buffer = AppendTestUtil.randomBytes(0, size);
-      out.write(buffer, 0, size);
-      out.flush();
-      out.sync();
+      final int half = BLOCK_SIZE/2;
 
-      // able to read?
-      Assert.assertTrue(read(fs, p, size));
+      //a. On Machine M1, Create file. Write half block of data.
+      //   Invoke (DFSOutputStream).fsync() on the dfs file handle.
+      //   Do not close file yet.
+      {
+        final FSDataOutputStream out = fs.create(p, true,
+            fs.getConf().getInt("io.file.buffer.size", 4096),
+            (short)3, BLOCK_SIZE);
+        write(out, 0, half);
+
+        //hflush
+        ((DFSClient.DFSOutputStream)out.getWrappedStream()).hflush();
+      }
+
+      //b. On another machine M2, open file and verify that the half-block
+      //   of data can be read successfully.
+      checkFile(p, half, conf);
+
+      /* TODO: enable the following when append is done.
+      //c. On M1, append another half block of data.  Close file on M1.
+      {
+        //sleep to make sure the lease soft limit has expired.
+        Thread.sleep(2*leaseSoftLimit);
+
+        FSDataOutputStream out = fs.append(p);
+        write(out, 0, half);
+        out.close();
+      }
 
-      out.close();
+      //d. On M2, open file and read 1 block of data from it. Close file.
+      checkFile(p, 2*half, conf);
+      */
     } finally {
       cluster.shutdown();
     }
   }
 
-  /** able to read? */
-  private static boolean read(FileSystem fs, Path p, int expectedsize
-      ) throws Exception {
-    //try at most 3 minutes
-    for(int i = 0; i < 360; i++) {
-      final FSDataInputStream in = fs.open(p);
-      try {
-        final int available = in.available();
-        System.out.println(i + ") in.available()=" + available);
-        Assert.assertTrue(available >= 0);
-        Assert.assertTrue(available <= expectedsize);
-        if (available == expectedsize) {
-          return true;
-        }
-      } finally {
-        in.close();
-      }
-      Thread.sleep(500);
+  static private int userCount = 0;
+  //check the file
+  static void checkFile(Path p, int expectedsize, Configuration conf
+      ) throws IOException {
+    //open the file with another user account
+    final Configuration conf2 = new Configuration(conf);
+    final String username = UserGroupInformation.getCurrentUGI().getUserName()
+        + "_" + ++userCount;
+    UnixUserGroupInformation.saveToConf(conf2,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        new UnixUserGroupInformation(username, new String[]{"supergroup"}));
+    final FileSystem fs = FileSystem.get(conf2);
+    final InputStream in = fs.open(p);
+
+    //Is the data available?
+    Assert.assertTrue(available(in, expectedsize));
+
+    //Able to read?
+    for(int i = 0; i < expectedsize; i++) {
+      Assert.assertEquals((byte)i, (byte)in.read());  
+    }
+
+    in.close();
+  }
+
+  /** Write something to a file */
+  private static void write(OutputStream out, int offset, int length
+      ) throws IOException {
+    final byte[] bytes = new byte[length];
+    for(int i = 0; i < length; i++) {
+      bytes[i] = (byte)(offset + i);
     }
-    return false;
+    out.write(bytes);
+  }
+
+  /** Is the data available? */
+  private static boolean available(InputStream in, int expectedsize
+      ) throws IOException {
+    final int available = in.available();
+    System.out.println(" in.available()=" + available);
+    Assert.assertTrue(available >= 0);
+    Assert.assertTrue(available <= expectedsize);
+    return available == expectedsize;
   }
 }
+