You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC

svn commit: r885143 [15/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java Sat Nov 28 20:05:56 2009
@@ -22,19 +22,25 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
 import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
+import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -43,11 +49,11 @@
   static final short REPLICATION = 3;
   static final long BLOCKSIZE = 1L * (1L << 20);
 
-  static final Configuration conf = new Configuration();
+  static final Configuration conf = new HdfsConfiguration();
   static {
     conf.setInt("dfs.datanode.handler.count", 1);
     conf.setInt("dfs.replication", REPLICATION);
-    conf.setInt("dfs.socket.timeout", 5000);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
   }
 
   static private FSDataOutputStream createFile(FileSystem fs, Path p
@@ -298,4 +304,184 @@
     final String methodName = FiTestUtil.getMethodName();
     runCallReceivePacketTest(methodName, 2, new DoosAction(methodName, 2));
   }
+
+  private static void runPipelineCloseTest(String methodName,
+      Action<DatanodeID> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiPipelineClose.set(a);
+    write1byte(methodName);
+  }
+
+  private static void run41_43(String name, int i) throws IOException {
+    runPipelineCloseTest(name, new SleepAction(name, i, 3000));
+  }
+
+  private static void runPipelineCloseAck(String name, int i, DataNodeAction a
+      ) throws IOException {
+    FiTestUtil.LOG.info("Running " + name + " ...");
+    final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
+    final MarkerConstraint marker = new MarkerConstraint(name);
+    t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
+    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID>(a, marker));
+    write1byte(name);
+  }
+
+  private static void run39_40(String name, int i) throws IOException {
+    runPipelineCloseAck(name, i, new SleepAction(name, i, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 never responses after received close ack DN2.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_39() throws IOException {
+    run39_40(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 never responses after received close ack DN1.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_40() throws IOException {
+    run39_40(FiTestUtil.getMethodName(), 0);
+  }
+  
+  /**
+   * Pipeline close with DN0 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_41() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 0);
+  }
+
+  /**
+   * Pipeline close with DN1 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_42() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close with DN2 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_43() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 2);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws an OutOfMemoryException
+   * right after it received a close request from client.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_44() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 throws an OutOfMemoryException
+   * right after it received a close request from client.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_45() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 1));
+  }
+
+  /**
+   * Pipeline close:
+   * DN2 throws an OutOfMemoryException
+   * right after it received a close request from client.
+   * Client gets an IOException and determine DN2 bad.
+   */
+  @Test
+  public void pipeline_Fi_46() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 2));
+  }
+
+  private static void run47_48(String name, int i) throws IOException {
+    runPipelineCloseAck(name, i, new OomAction(name, i));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 throws an OutOfMemoryException right after
+   * it received a close ack from DN2.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_47() throws IOException {
+    run47_48(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws an OutOfMemoryException right after
+   * it received a close ack from DN1.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_48() throws IOException {
+    run47_48(FiTestUtil.getMethodName(), 0);
+  }
+
+  private static void runBlockFileCloseTest(String methodName,
+      Action<DatanodeID> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiBlockFileClose.set(a);
+    write1byte(methodName);
+  }
+
+  private static void run49_51(String name, int i) throws IOException {
+    runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR"));
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_49() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 0);
+  }
+
+
+  /**
+   * Pipeline close:
+   * DN1 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_50() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN2 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determine DN2 bad.
+   */
+  @Test
+  public void pipeline_Fi_51() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 2);
+  }
 }
\ No newline at end of file

Modified: hadoop/hdfs/branches/HDFS-326/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/findbugsExcludeFile.xml?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/findbugsExcludeFile.xml Sat Nov 28 20:05:56 2009
@@ -208,17 +208,22 @@
      </Match>
 
      <!--
-       CreateBlockWriteStreams and getTmpInputStreams are pretty much like a stream constructor.
+       getTmpInputStreams is pretty much like a stream constructor.
        The newly created streams are not supposed to be closed in the constructor. So ignore
        the OBL warning.
      -->
      <Match>
        <Class name="org.apache.hadoop.hdfs.server.datanode.FSDataset" />
-       <Or>
-         <Method name="createBlockWriteStreams" />
-         <Method name="getTmpInputStreams" />
-       </Or>
+       <Method name="getTmpInputStreams" />
        <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
      </Match>
 
+     <!--
+      ResponseProccessor is thread that is designed to catch RuntimeException.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer$ResponseProcessor" />
+       <Method name="run" />
+       <Bug pattern="REC_CATCH_EXCEPTION" />
+     </Match>
  </FindBugsFilter>

Propchange: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:05:56 2009
@@ -1,3 +1,5 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
 /hadoop/core/trunk/src/test/hdfs:776175-785643
-/hadoop/hdfs/trunk/src/test/hdfs:804973-807690
+/hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
+/hadoop/hdfs/trunk/src/test/hdfs:804973-884907

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java Sat Nov 28 20:05:56 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.cli.util.CommandExecutor.Result;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -53,7 +54,7 @@
                        "host5", "host6", "host7", "host8" };
     dfsCluster = new MiniDFSCluster(conf, 8, true, racks, hosts);
     
-    namenode = conf.get("fs.default.name", "file:///");
+    namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
     
     username = System.getProperty("user.name");
     dfsAdmCmdExecutor = new DFSAdminCmdExecutor(namenode);

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestGlobPaths.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestGlobPaths.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestGlobPaths.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestGlobPaths.java Sat Nov 28 20:05:56 2009
@@ -21,6 +21,7 @@
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
 import junit.framework.TestCase;
@@ -48,7 +49,7 @@
   
   protected void setUp() throws Exception {
     try {
-      Configuration conf = new Configuration();
+      Configuration conf = new HdfsConfiguration();
       dfsCluster = new MiniDFSCluster(conf, 1, true, null);
       fs = FileSystem.get(conf);
     } catch (IOException e) {

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestUrlStreamHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestUrlStreamHandler.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestUrlStreamHandler.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/TestUrlStreamHandler.java Sat Nov 28 20:05:56 2009
@@ -28,6 +28,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
@@ -48,7 +49,7 @@
    */
   public void testDfsUrls() throws IOException {
 
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fs = cluster.getFileSystem();
 
@@ -106,7 +107,7 @@
    */
   public void testFileUrls() throws IOException, URISyntaxException {
     // URLStreamHandler is already set in JVM by testDfsUrls() 
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
 
     // Locate the test temporary directory.
     File tmpDir = new File(conf.get("hadoop.tmp.dir"));

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java Sat Nov 28 20:05:56 2009
@@ -23,6 +23,8 @@
 import java.io.FileWriter;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
 import junit.framework.TestCase;
@@ -30,7 +32,7 @@
  * This class tests if a balancer schedules tasks correctly.
  */
 public class TestLoadGenerator extends TestCase {
-  private static final Configuration CONF = new Configuration();
+  private static final Configuration CONF = new HdfsConfiguration();
   private static final int DEFAULT_BLOCK_SIZE = 10;
   private static final String OUT_DIR = 
     System.getProperty("test.build.data","build/test/data");
@@ -47,8 +49,8 @@
   
 
   static {
-    CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-    CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
+    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
     CONF.setLong("dfs.heartbeat.interval", 1L);
   }
 

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/permission/TestStickyBit.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/permission/TestStickyBit.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/permission/TestStickyBit.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/fs/permission/TestStickyBit.java Sat Nov 28 20:05:56 2009
@@ -27,7 +27,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -160,8 +162,8 @@
   public void testGeneralSBBehavior() throws IOException {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setBoolean("dfs.permissions", true);
+      Configuration conf = new HdfsConfiguration();
+      conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
       conf.setBoolean("dfs.support.append", true);
       cluster = new MiniDFSCluster(conf, 4, true, null);
 
@@ -200,8 +202,8 @@
 
     try {
       // Set up cluster for testing
-      Configuration conf = new Configuration();
-      conf.setBoolean("dfs.permissions", true);
+      Configuration conf = new HdfsConfiguration();
+      conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
       cluster = new MiniDFSCluster(conf, 4, true, null);
       FileSystem hdfs = cluster.getFileSystem();
 
@@ -246,8 +248,8 @@
   public void testStickyBitPersistence() throws IOException {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setBoolean("dfs.permissions", true);
+      Configuration conf = new HdfsConfiguration();
+      conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
       cluster = new MiniDFSCluster(conf, 4, true, null);
       FileSystem hdfs = cluster.getFileSystem();
 
@@ -293,7 +295,7 @@
    */
   static private FileSystem logonAs(UnixUserGroupInformation user,
       Configuration conf, FileSystem hdfs) throws IOException {
-    Configuration conf2 = new Configuration(conf);
+    Configuration conf2 = new HdfsConfiguration(conf);
     UnixUserGroupInformation.saveToConf(conf2,
         UnixUserGroupInformation.UGI_PROPERTY_NAME, user);
 

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java Sat Nov 28 20:05:56 2009
@@ -31,9 +31,10 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 
 /** Utilities for append-related tests */ 
-class AppendTestUtil {
+public class AppendTestUtil {
   /** For specifying the random number generator seed,
    *  change the following value:
    */
@@ -84,10 +85,17 @@
       LOG.info("ms=" + ms, e);
     }
   }
-
-  static FileSystem createHdfsWithDifferentUsername(Configuration conf
+  
+  /**
+   * Returns the reference to a new instance of FileSystem created 
+   * with different user name
+   * @param conf current Configuration
+   * @return FileSystem instance
+   * @throws IOException
+   */
+  public static FileSystem createHdfsWithDifferentUsername(Configuration conf
       ) throws IOException {
-    Configuration conf2 = new Configuration(conf);
+    Configuration conf2 = new HdfsConfiguration(conf);
     String username = UserGroupInformation.getCurrentUGI().getUserName()+"_XXX";
     UnixUserGroupInformation.saveToConf(conf2,
         UnixUserGroupInformation.UGI_PROPERTY_NAME,
@@ -134,7 +142,7 @@
    *  Make sure to call close() on the returned stream
    *  @throws IOException an exception might be thrown
    */
-  static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+  public static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
       throws IOException {
     return fileSys.create(name, true,
         fileSys.getConf().getInt("io.file.buffer.size", 4096),
@@ -146,7 +154,7 @@
    *  the specified byte[] buffer's content
    *  @throws IOException an exception might be thrown
    */
-  static void checkFullFile(FileSystem fs, Path name, int len,
+  public static void checkFullFile(FileSystem fs, Path name, int len,
                             final byte[] compareContent, String message) throws IOException {
     FSDataInputStream stm = fs.open(name);
     byte[] actual = new byte[len];

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/BenchmarkThroughput.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/BenchmarkThroughput.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/BenchmarkThroughput.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/BenchmarkThroughput.java Sat Nov 28 20:05:56 2009
@@ -226,7 +226,7 @@
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(),
+    int res = ToolRunner.run(new HdfsConfiguration(),
         new BenchmarkThroughput(), args);
     System.exit(res);
   }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Sat Nov 28 20:05:56 2009
@@ -37,8 +37,9 @@
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -110,7 +111,7 @@
   /** create nFiles with random names and directory hierarchies
    *  with random (but reproducible) data in them.
    */
-  void createFiles(FileSystem fs, String topdir,
+  public void createFiles(FileSystem fs, String topdir,
                    short replicationFactor) throws IOException {
     files = new MyFile[nFiles];
     
@@ -155,7 +156,7 @@
   /** check if the files have been copied correctly. */
   public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
     
-    //Configuration conf = new Configuration();
+    //Configuration conf = new HdfsConfiguration();
     Path root = new Path(topdir);
     
     for (int idx = 0; idx < nFiles; idx++) {
@@ -257,7 +258,7 @@
     return ((DFSClient.DFSDataInputStream) in).getAllBlocks();
   }
 
-  public static AccessToken getAccessToken(FSDataOutputStream out) {
+  public static BlockAccessToken getAccessToken(FSDataOutputStream out) {
     return ((DFSClient.DFSOutputStream) out.getWrappedStream()).getAccessToken();
   }
 
@@ -284,7 +285,7 @@
 
   static public Configuration getConfigurationWithDifferentUsername(Configuration conf
       ) throws IOException {
-    final Configuration c = new Configuration(conf);
+    final Configuration c = new HdfsConfiguration(conf);
     final UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
     final String username = ugi.getUserName()+"_XXX";
     final String[] groups = {ugi.getGroupNames()[0] + "_XXX"};

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java Sat Nov 28 20:05:56 2009
@@ -21,6 +21,7 @@
 import java.net.UnknownHostException;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
+import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -97,7 +98,7 @@
     int numBlocksPerDNtoInject = 0;
     int replication = 1;
     
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
 
     for (int i = 0; i < args.length; i++) { // parse command line
       if (args[i].equals("-n")) {
@@ -200,7 +201,7 @@
           }
           for (int i = 1; i <= replication; ++i) { 
             // inject blocks for dn_i into dn_i and replica in dn_i's neighbors 
-            mc.injectBlocks((i_dn + i- 1)% numDataNodes, blocks);
+            mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks));
             System.out.println("Injecting blocks of dn " + i_dn  + " into dn" + 
                 ((i_dn + i- 1)% numDataNodes));
           }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Sat Nov 28 20:05:56 2009
@@ -19,31 +19,38 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.nio.channels.FileChannel;
 import java.util.Random;
-import java.io.RandomAccessFile;
 
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.security.*;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -65,7 +72,7 @@
       this.dnArgs = args;
     }
   }
-
+  private URI myUri = null;
   private Configuration conf;
   private NameNode nameNode;
   private int numDataNodes;
@@ -74,6 +81,8 @@
   private File base_dir;
   private File data_dir;
   
+  public final static String FINALIZED_DIR_NAME = "/current/finalized/";
+  
   
   /**
    * This null constructor is used only when wishing to start a data node cluster
@@ -247,17 +256,17 @@
     
     // Setup the NameNode configuration
     FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
-    conf.set("dfs.http.address", "127.0.0.1:0");  
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");  
     if (manageNameDfsDirs) {
-      conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
+      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(base_dir, "name1").getPath()+","+
                new File(base_dir, "name2").getPath());
-      conf.set("fs.checkpoint.dir", new File(base_dir, "namesecondary1").
+      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(base_dir, "namesecondary1").
                 getPath()+"," + new File(base_dir, "namesecondary2").getPath());
     }
     
     int replication = conf.getInt("dfs.replication", 3);
     conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
-    conf.setInt("dfs.safemode.extension", 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
     conf.setInt("dfs.namenode.decommission.interval", 3); // 3 second
     
     // Format and clean out DataNode directories
@@ -273,7 +282,7 @@
                      operation == StartupOption.FORMAT ||
                      operation == StartupOption.REGULAR) ?
       new String[] {} : new String[] {operation.getName()};
-    conf.setClass("topology.node.switch.mapping.impl", 
+    conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
                    StaticMapping.class, DNSToSwitchMapping.class);
     nameNode = NameNode.createNameNode(args, conf);
     
@@ -281,6 +290,20 @@
     startDataNodes(conf, numDataNodes, manageDataDfsDirs, 
                     operation, racks, hosts, simulatedCapacities);
     waitClusterUp();
+    String myUriStr = "hdfs://localhost:"+ Integer.toString(this.getNameNodePort());
+    try {
+      this.myUri = new URI(myUriStr);
+    } catch (URISyntaxException e) {
+      NameNode.LOG.warn("unexpected URISyntaxException: " + e );
+    }
+  }
+  
+  /**
+   * 
+   * @return URI of this MiniDFSCluster
+   */
+  public URI getURI() {
+    return myUri;
   }
 
   /**
@@ -379,7 +402,7 @@
     
     
     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
-      Configuration dnConf = new Configuration(conf);
+      Configuration dnConf = new HdfsConfiguration(conf);
       if (manageDfsDirs) {
         File dir1 = new File(data_dir, "data"+(2*i+1));
         File dir2 = new File(data_dir, "data"+(2*i+2));
@@ -389,7 +412,7 @@
           throw new IOException("Mkdirs failed to create directory for DataNode "
                                 + i + ": " + dir1 + " or " + dir2);
         }
-        dnConf.set("dfs.data.dir", dir1.getPath() + "," + dir2.getPath()); 
+        dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dir1.getPath() + "," + dir2.getPath()); 
       }
       if (simulatedCapacities != null) {
         dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
@@ -397,11 +420,11 @@
             simulatedCapacities[i-curDatanodesNum]);
       }
       System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
-                         + dnConf.get("dfs.data.dir"));
+                         + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
       if (hosts != null) {
-        dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
+        dnConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, hosts[i - curDatanodesNum]);
         System.out.println("Starting DataNode " + i + " with hostname set to: " 
-                           + dnConf.get("slave.host.name"));
+                           + dnConf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY));
       }
       if (racks != null) {
         String name = hosts[i - curDatanodesNum];
@@ -410,7 +433,7 @@
         StaticMapping.addNodeToRack(name,
                                     racks[i-curDatanodesNum]);
       }
-      Configuration newconf = new Configuration(dnConf); // save config
+      Configuration newconf = new HdfsConfiguration(dnConf); // save config
       if (hosts != null) {
         NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
       }
@@ -515,7 +538,7 @@
    * @return {@link FSNamesystem} object.
    */
   public FSNamesystem getNamesystem() {
-    return nameNode.getNamesystem();
+    return NameNodeAdapter.getNamesystem(nameNode);
   }
 
   /**
@@ -631,7 +654,7 @@
     if (i < 0 || i >= dataNodes.size())
       return false;
     for (int dn = i*2; dn < i*2+2; dn++) {
-      File blockFile = new File(dataDir, "data" + (dn+1) + "/current/" +
+      File blockFile = new File(dataDir, "data" + (dn+1) + FINALIZED_DIR_NAME +
                                 blockName);
       System.out.println("Corrupting for: " + blockFile);
       if (blockFile.exists()) {
@@ -702,7 +725,7 @@
       boolean keepPort) throws IOException {
     Configuration conf = dnprop.conf;
     String[] args = dnprop.dnArgs;
-    Configuration newconf = new Configuration(conf); // save cloned config
+    Configuration newconf = new HdfsConfiguration(conf); // save cloned config
     if (keepPort) {
       InetSocketAddress addr = dnprop.datanode.getSelfAddr();
       conf.set("dfs.datanode.address", addr.getAddress().getHostAddress() + ":"
@@ -849,7 +872,7 @@
    * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
    * @return the block report for the specified data node
    */
-  public Block[] getBlockReport(int dataNodeIndex) {
+  public Iterable<Block> getBlockReport(int dataNodeIndex) {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
@@ -860,11 +883,11 @@
   /**
    * 
    * @return block reports from all data nodes
-   *    Block[] is indexed in the same order as the list of datanodes returned by getDataNodes()
+   *    BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes()
    */
-  public Block[][] getAllBlockReports() {
+  public Iterable<Block>[] getAllBlockReports() {
     int numDataNodes = dataNodes.size();
-    Block[][] result = new Block[numDataNodes][];
+    Iterable<Block>[] result = new BlockListAsLongs[numDataNodes];
     for (int i = 0; i < numDataNodes; ++i) {
      result[i] = getBlockReport(i);
     }
@@ -881,7 +904,7 @@
    *             if any of blocks already exist in the data node
    *   
    */
-  public void injectBlocks(int dataNodeIndex, Block[] blocksToInject) throws IOException {
+  public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) throws IOException {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
@@ -903,7 +926,7 @@
    *             if any of blocks already exist in the data nodes
    *             Note the rest of the blocks are not injected.
    */
-  public void injectBlocks(Block[][] blocksToInject) throws IOException {
+  public void injectBlocks(Iterable<Block>[] blocksToInject) throws IOException {
     if (blocksToInject.length >  dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
@@ -916,7 +939,7 @@
    * Set the softLimit and hardLimit of client lease periods
    */
   void setLeasePeriod(long soft, long hard) {
-    final FSNamesystem namesystem = nameNode.getNamesystem();
+    final FSNamesystem namesystem = getNamesystem();
     namesystem.leaseManager.setLeasePeriod(soft, hard);
     namesystem.lmthread.interrupt();
   }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java Sat Nov 28 20:05:56 2009
@@ -31,7 +31,7 @@
 public class TestAbandonBlock extends junit.framework.TestCase {
   public static final Log LOG = LogFactory.getLog(TestAbandonBlock.class);
   
-  private static final Configuration CONF = new Configuration();
+  private static final Configuration CONF = new HdfsConfiguration();
   static final String FILE_NAME_PREFIX
       = "/" + TestAbandonBlock.class.getSimpleName() + "_"; 
 
@@ -47,7 +47,7 @@
       for(int i = 0; i < 1024; i++) {
         fout.write(123);
       }
-      fout.sync();
+      fout.hflush();
   
       //try reading the block by someone
       final DFSClient dfsclient = new DFSClient(NameNode.getAddress(CONF), CONF);

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java Sat Nov 28 20:05:56 2009
@@ -53,7 +53,7 @@
     LOG.info("Test testBlockMissingException started.");
     long blockSize = 1024L;
     int numBlocks = 4;
-    conf = new Configuration();
+    conf = new HdfsConfiguration();
     try {
       dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
       dfs.waitActive();
@@ -123,14 +123,14 @@
    * The Data directories for a datanode
    */
   private File[] getDataNodeDirs(int i) throws IOException {
-    File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
+    String base_dir = MiniDFSCluster.getBaseDirectory();
     File data_dir = new File(base_dir, "data");
     File dir1 = new File(data_dir, "data"+(2*i+1));
     File dir2 = new File(data_dir, "data"+(2*i+2));
     if (dir1.isDirectory() && dir2.isDirectory()) {
       File[] dir = new File[2];
-      dir[0] = new File(dir1, "current");
-      dir[1] = new File(dir2, "current"); 
+      dir[0] = new File(dir1, MiniDFSCluster.FINALIZED_DIR_NAME);
+      dir[1] = new File(dir2, MiniDFSCluster.FINALIZED_DIR_NAME); 
       return dir;
     }
     return new File[0];

Propchange: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java Sat Nov 28 20:05:56 2009
@@ -38,7 +38,7 @@
 
   public void testBlocksScheduledCounter() throws IOException {
     
-    MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 1, 
+    MiniDFSCluster cluster = new MiniDFSCluster(new HdfsConfiguration(), 1, 
                                                 true, null);
     cluster.waitActive();
     FileSystem fs = cluster.getFileSystem();
@@ -49,7 +49,7 @@
       out.write(i);
     }
     // flush to make sure a block is allocated.
-    ((DFSOutputStream)(out.getWrappedStream())).sync();
+    ((DFSOutputStream)(out.getWrappedStream())).hflush();
     
     ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
     cluster.getNamesystem().DFSNodesStatus(dnList, dnList);

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Sat Nov 28 20:05:56 2009
@@ -43,8 +43,8 @@
  *  5. Swaps two meta files, i.e the format of the meta files 
  *     are valid but their CRCs do not match with their corresponding 
  *     data blocks
- * The above tests are run for varied values of io.bytes.per.checksum 
- * and dfs.block.size. It tests for the case when the meta file is 
+ * The above tests are run for varied values of dfs.bytes-per-checksum 
+ * and dfs.blocksize. It tests for the case when the meta file is 
  * multiple blocks.
  *
  * Another portion of the test is commented out till HADOOP-1557 
@@ -90,7 +90,7 @@
       // However, a client is alowed access to this block.
       //
       File data_dir = new File(System.getProperty("test.build.data"),
-                               "dfs/data/data1/current");
+                               "dfs/data/data1" + MiniDFSCluster.FINALIZED_DIR_NAME);
       assertTrue("data directory does not exist", data_dir.exists());
       File[] blocks = data_dir.listFiles();
       assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
@@ -147,7 +147,7 @@
       // directory of the first datanode
       //
       data_dir = new File(System.getProperty("test.build.data"),
-                               "dfs/data/data2/current");
+                               "dfs/data/data2" + MiniDFSCluster.FINALIZED_DIR_NAME);
       assertTrue("data directory does not exist", data_dir.exists());
       blocks = data_dir.listFiles();
       assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
@@ -207,7 +207,7 @@
     // default parameters
     //
     System.out.println("TestCrcCorruption with default parameters");
-    Configuration conf1 = new Configuration();
+    Configuration conf1 = new HdfsConfiguration();
     conf1.setInt("dfs.blockreport.intervalMsec", 3 * 1000);
     DFSTestUtil util1 = new DFSTestUtil("TestCrcCorruption", 40, 3, 8*1024);
     thistest(conf1, util1);
@@ -216,9 +216,9 @@
     // specific parameters
     //
     System.out.println("TestCrcCorruption with specific parameters");
-    Configuration conf2 = new Configuration();
-    conf2.setInt("io.bytes.per.checksum", 17);
-    conf2.setInt("dfs.block.size", 34);
+    Configuration conf2 = new HdfsConfiguration();
+    conf2.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 17);
+    conf2.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 34);
     DFSTestUtil util2 = new DFSTestUtil("TestCrcCorruption", 40, 3, 400);
     thistest(conf2, util2);
   }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Sat Nov 28 20:05:56 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
@@ -61,14 +62,14 @@
    */
   public void testWriteTimeoutAtDataNode() throws IOException,
                                                   InterruptedException { 
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     
     final int writeTimeout = 100; //milliseconds.
     // set a very short write timeout for datanode, so that tests runs fast.
     conf.setInt("dfs.datanode.socket.write.timeout", writeTimeout); 
     // set a smaller block size
     final int blockSize = 10*1024*1024;
-    conf.setInt("dfs.block.size", blockSize);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt("dfs.client.max.block.acquire.failures", 1);
     // set a small buffer size
     final int bufferSize = 4096;
@@ -136,8 +137,17 @@
       return versionID;
     }
 
-    public LocatedBlock addBlock(String src, String clientName)
-    throws IOException
+    public LocatedBlock addBlock(String src, String clientName,
+                                 Block previous) throws IOException {
+
+      return addBlock(src, clientName, previous, null);
+    }
+
+    public LocatedBlock addBlock(String src,
+                                 String clientName,
+                                 Block previous,
+                                 DatanodeInfo[] excludedNode
+                                 ) throws IOException
     {
       num_calls++;
       if (num_calls > num_calls_allowed) { 
@@ -155,7 +165,9 @@
     
     public LocatedBlocks  getBlockLocations(String src, long offset, long length) throws IOException { return null; }
     
-    public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, short replication, long blockSize) throws IOException {}
+    public FsServerDefaults getServerDefaults() throws IOException { return null; }
+    
+    public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize) throws IOException {}
     
     public LocatedBlock append(String src, String clientName) throws IOException { return null; }
 
@@ -167,17 +179,22 @@
 
     public void abandonBlock(Block b, String src, String holder) throws IOException {}
 
-    public boolean complete(String src, String clientName) throws IOException { return false; }
+    public boolean complete(String src, String clientName, Block last) throws IOException { return false; }
 
     public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
 
+    @Deprecated
     public boolean rename(String src, String dst) throws IOException { return false; }
+    
+    public void concat(String trg, String[] srcs) throws IOException {  }
+
+    public void rename(String src, String dst, Rename... options) throws IOException { }
 
     public boolean delete(String src) throws IOException { return false; }
 
     public boolean delete(String src, boolean recursive) throws IOException { return false; }
 
-    public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }
+    public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { return false; }
 
     public FileStatus[] getListing(String src) throws IOException { return null; }
 
@@ -213,11 +230,17 @@
 
     public void setTimes(String src, long mtime, long atime) throws IOException {}
 
+    @Override public LocatedBlock updateBlockForPipeline(Block block, 
+        String clientName) throws IOException { return null; }
+
+    @Override public void updatePipeline(String clientName, Block oldblock,
+        Block newBlock, DatanodeID[] newNodes)
+        throws IOException {}
   }
   
   public void testNotYetReplicatedErrors() throws IOException
   {   
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     
     // allow 1 retry (2 total calls)
     conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java Sat Nov 28 20:05:56 2009
@@ -88,11 +88,11 @@
        * For now disabling block verification so that the contents are 
        * not changed.
        */
-      conf = new Configuration();
+      conf = new HdfsConfiguration();
       conf.setInt("dfs.datanode.scan.period.hours", -1);
       conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
-      String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
-      String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
+      String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+      String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
       
       log("Finalize with existing previous dir", numDirs);
       UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java Sat Nov 28 20:05:56 2009
@@ -20,8 +20,10 @@
 import junit.framework.TestCase;
 import java.io.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 
 /**
@@ -41,7 +43,7 @@
    * not create a subdirectory off a file.
    */
   public void testDFSMkdirs() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -73,4 +75,46 @@
       cluster.shutdown();
     }
   }
+  
+  /**
+   * Tests mkdir will not create directory when parent is missing.
+   */
+  public void testMkdir() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
+    try {
+      // Create a dir in root dir, should succeed
+      assertTrue(dfs.mkdir(new Path("/mkdir-" + System.currentTimeMillis()),
+          FsPermission.getDefault()));
+      // Create a dir when parent dir exists as a file, should fail
+      IOException expectedException = null;
+      String filePath = "/mkdir-file-" + System.currentTimeMillis();
+      writeFile(dfs, new Path(filePath));
+      try {
+        dfs.mkdir(new Path(filePath + "/mkdir"), FsPermission.getDefault());
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a directory when parent dir exists as file using"
+          + " mkdir() should throw FileAlreadyExistsException ",
+          expectedException != null
+              && expectedException instanceof FileAlreadyExistsException);
+      // Create a dir in a non-exist directory, should fail
+      expectedException = null;
+      try {
+        dfs.mkdir(new Path("/non-exist/mkdir-" + System.currentTimeMillis()),
+            FsPermission.getDefault());
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a directory in a non-exist parent dir using"
+          + " mkdir() should throw FileNotFoundException ",
+          expectedException != null
+              && expectedException instanceof FileNotFoundException);
+    } finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java Sat Nov 28 20:05:56 2009
@@ -38,7 +38,7 @@
 /** Unit tests for permission */
 public class TestDFSPermission extends TestCase {
   public static final Log LOG = LogFactory.getLog(TestDFSPermission.class);
-  final private static Configuration conf = new Configuration();
+  final private static Configuration conf = new HdfsConfiguration();
   
   final private static String GROUP1_NAME = "group1";
   final private static String GROUP2_NAME = "group2";
@@ -79,7 +79,7 @@
       LOG.info("NUM_TEST_PERMISSIONS=" + NUM_TEST_PERMISSIONS);
       
       // explicitly turn on permission checking
-      conf.setBoolean("dfs.permissions", true);
+      conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
       
       // Initiate all four users
       SUPERUSER = UnixUserGroupInformation.login(conf);
@@ -154,8 +154,8 @@
   /* create a file/directory with the given umask and permission */
   private void create(OpType op, Path name, short umask, 
       FsPermission permission) throws IOException {
-    // set umask in configuration
-    conf.setInt(FsPermission.UMASK_LABEL, umask);
+    // set umask in configuration, converting to padded octal
+    conf.set(FsPermission.UMASK_LABEL, String.format("%1$03o", umask));
 
     // create the file/directory
     switch (op) {

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRename.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRename.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRename.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRename.java Sat Nov 28 20:05:56 2009
@@ -38,8 +38,14 @@
     }
   }
 
+  static void createFile(FileSystem fs, Path f) throws IOException {
+    DataOutputStream a_out = fs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
+  
   public void testRename() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     try {
       FileSystem fs = cluster.getFileSystem();
@@ -50,9 +56,7 @@
         Path aa = new Path(dir, "aa");
         Path b = new Path(dir, "b");
   
-        DataOutputStream a_out = fs.create(a);
-        a_out.writeBytes("something");
-        a_out.close();
+        createFile(fs, a);
         
         //should not have any lease
         assertEquals(0, countLease(cluster)); 
@@ -78,18 +82,37 @@
         assertFalse(fs.exists(dstPath));
         assertFalse(fs.rename(dir, dstPath));
       }
-
-      { // test rename /a/b to /a/b/c
+      
+      { // dst cannot be a file or directory under src
+        // test rename /a/b/foo to /a/b/c
         Path src = new Path("/a/b");
         Path dst = new Path("/a/b/c");
 
-        DataOutputStream a_out = fs.create(new Path(src, "foo"));
-        a_out.writeBytes("something");
-        a_out.close();
+        createFile(fs, new Path(src, "foo"));
+        
+        // dst cannot be a file under src
+        assertFalse(fs.rename(src, dst)); 
         
-        assertFalse(fs.rename(src, dst));
+        // dst cannot be a directory under src
+        assertFalse(fs.rename(src.getParent(), dst.getParent())); 
       }
       
+      { // dst can start with src, if it is not a directory or file under src
+        // test rename /test /testfile
+        Path src = new Path("/testPrefix");
+        Path dst = new Path("/testPrefixfile");
+
+        createFile(fs, src);
+        assertTrue(fs.rename(src, dst));
+      }
+      
+      { // dst should not be same as src test rename /a/b/c to /a/b/c
+        Path src = new Path("/a/b/c");
+        createFile(fs, src);
+        assertTrue(fs.rename(src, src));
+        assertFalse(fs.rename(new Path("/a/b"), new Path("/a/b/")));
+        assertTrue(fs.rename(src, new Path("/a/b/c/")));
+      }
       fs.delete(dir, true);
     } finally {
       if (cluster != null) {cluster.shutdown();}

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java Sat Nov 28 20:05:56 2009
@@ -121,11 +121,11 @@
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = new Configuration();
+      conf = new HdfsConfiguration();
       conf.setInt("dfs.datanode.scan.period.hours", -1);      
       conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
-      String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
-      String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
+      String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+      String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
       
       log("Normal NameNode rollback", numDirs);
       UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Sat Nov 28 20:05:56 2009
@@ -93,7 +93,7 @@
   }
 
   public void testZeroSizeFile() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -135,7 +135,7 @@
   }
   
   public void testRecrusiveRm() throws IOException {
-	  Configuration conf = new Configuration();
+	  Configuration conf = new HdfsConfiguration();
 	  MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
 	  FileSystem fs = cluster.getFileSystem();
 	  assertTrue("Not a HDFS: " + fs.getUri(), 
@@ -160,7 +160,7 @@
   }
     
   public void testDu() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -209,7 +209,7 @@
                                   
   }
   public void testPut() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -305,7 +305,7 @@
 
   /** check command error outputs and exit statuses. */
   public void testErrOutPut() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
     PrintStream bak = null;
     try {
@@ -447,8 +447,8 @@
   }
   
   public void testURIPaths() throws Exception {
-    Configuration srcConf = new Configuration();
-    Configuration dstConf = new Configuration();
+    Configuration srcConf = new HdfsConfiguration();
+    Configuration dstConf = new HdfsConfiguration();
     MiniDFSCluster srcCluster =  null;
     MiniDFSCluster dstCluster = null;
     String bak = System.getProperty("test.build.data");
@@ -539,7 +539,7 @@
   }
 
   public void testText() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
     PrintStream bak = null;
     try {
@@ -583,7 +583,7 @@
   }
 
   public void testCopyToLocal() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -680,7 +680,7 @@
   }
 
   public void testCount() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
     FsShell shell = new FsShell();
@@ -836,14 +836,14 @@
   }
   
   public void testFilePermissions() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     
     //test chmod on local fs
     FileSystem fs = FileSystem.getLocal(conf);
     testChmod(conf, fs, 
               (new File(TEST_ROOT_DIR, "chmodTest")).getAbsolutePath());
     
-    conf.set("dfs.permissions", "true");
+    conf.set(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, "true");
     
     //test chmod on DFS
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
@@ -901,7 +901,7 @@
    * Tests various options of DFSShell.
    */
   public void testDFSShell() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     /* This tests some properties of ChecksumFileSystem as well.
      * Make sure that we create ChecksumDFS */
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
@@ -1094,7 +1094,7 @@
   static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
     List<File> files = new ArrayList<File>();
     List<DataNode> datanodes = cluster.getDataNodes();
-    Block[][] blocks = cluster.getAllBlockReports();
+    Iterable<Block>[] blocks = cluster.getAllBlockReports();
     for(int i = 0; i < blocks.length; i++) {
       FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
       for(Block b : blocks[i]) {
@@ -1127,7 +1127,7 @@
     MiniDFSCluster dfs = null;
     PrintStream bak = null;
     try {
-      Configuration conf = new Configuration();
+      Configuration conf = new HdfsConfiguration();
       dfs = new MiniDFSCluster(conf, 2, true, null);
       FileSystem fs = dfs.getFileSystem();
       Path p = new Path("/foo");
@@ -1160,7 +1160,7 @@
   
   public void testGet() throws IOException {
     DFSTestUtil.setLogLevel2All(FSInputChecker.LOG);
-    final Configuration conf = new Configuration();
+    final Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
 
@@ -1218,7 +1218,7 @@
   }
 
   public void testLsr() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
 

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShellGenericOptions.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShellGenericOptions.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShellGenericOptions.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShellGenericOptions.java Sat Nov 28 20:05:56 2009
@@ -37,7 +37,7 @@
     String namenode = null;
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
+      Configuration conf = new HdfsConfiguration();
       cluster = new MiniDFSCluster(conf, 1, true, null);
       namenode = FileSystem.getDefaultUri(conf).toString();
       String [] args = new String[4];
@@ -70,7 +70,7 @@
                "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"+
                "<configuration>\n"+
                " <property>\n"+
-               "   <name>fs.default.name</name>\n"+
+               "   <name>fs.defaultFS</name>\n"+
                "   <value>"+namenode+"</value>\n"+
                " </property>\n"+
                "</configuration>\n");
@@ -91,7 +91,7 @@
   private void testPropertyOption(String[] args, String namenode) {
     // prepare arguments to create a directory /data
     args[0] = "-D";
-    args[1] = "fs.default.name="+namenode;
+    args[1] = "fs.defaultFS="+namenode;
     execute(args, namenode);        
   }
     

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStartupVersions.java Sat Nov 28 20:05:56 2009
@@ -169,10 +169,10 @@
   public void testVersions() throws Exception {
     UpgradeUtilities.initialize();
     Configuration conf = UpgradeUtilities.initializeStorageStateConf(1, 
-                                                      new Configuration());
+                                                      new HdfsConfiguration());
     StorageInfo[] versions = initializeVersions();
     UpgradeUtilities.createStorageDirs(
-                                       NAME_NODE, conf.getStrings("dfs.name.dir"), "current");
+                                       NAME_NODE, conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY), "current");
     cluster = new MiniDFSCluster(conf, 0, StartupOption.REGULAR);
     StorageInfo nameNodeVersion = new StorageInfo(
                                                   UpgradeUtilities.getCurrentLayoutVersion(),
@@ -181,7 +181,7 @@
     log("NameNode version info", NAME_NODE, null, nameNodeVersion);
     for (int i = 0; i < versions.length; i++) {
       File[] storage = UpgradeUtilities.createStorageDirs(
-                                                          DATA_NODE, conf.getStrings("dfs.data.dir"), "current");
+                                                          DATA_NODE, conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
       log("DataNode version info", DATA_NODE, i, versions[i]);
       UpgradeUtilities.createVersionFile(DATA_NODE, storage, versions[i]);
       try {

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java Sat Nov 28 20:05:56 2009
@@ -111,8 +111,8 @@
    */
   String[] createStorageState(NodeType nodeType, boolean[] state) throws Exception {
     String[] baseDirs = (nodeType == NAME_NODE ?
-                         conf.getStrings("dfs.name.dir") :
-                         conf.getStrings("dfs.data.dir"));
+                         conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY) :
+                         conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
     UpgradeUtilities.createEmptyDirs(baseDirs);
     if (state[0])  // current
       UpgradeUtilities.createStorageDirs(nodeType, baseDirs, "current");
@@ -179,7 +179,7 @@
     UpgradeUtilities.initialize();
 
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = new Configuration();
+      conf = new HdfsConfiguration();
       conf.setInt("dfs.datanode.scan.period.hours", -1);      
       conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       for (int i = 0; i < testCases.length; i++) {

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java Sat Nov 28 20:05:56 2009
@@ -128,11 +128,11 @@
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
-      conf = new Configuration();
+      conf = new HdfsConfiguration();
       conf.setInt("dfs.datanode.scan.period.hours", -1);      
       conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
-      String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
-      String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
+      String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+      String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
       
       log("Normal NameNode upgrade", numDirs);
       UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java Sat Nov 28 20:05:56 2009
@@ -177,7 +177,7 @@
   public void testUpgradeFromImage() throws IOException {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
+      Configuration conf = new HdfsConfiguration();
       if (System.getProperty("test.build.data") == null) { // to allow test to be run outside of Ant
         System.setProperty("test.build.data", "build/test/data");
       }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Sat Nov 28 20:05:56 2009
@@ -41,16 +41,21 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.util.DataChecksum;
+import org.junit.Test;
 
 /**
  * This tests data transfer protocol handling in the Datanode. It sends
@@ -94,6 +99,7 @@
       
       DataInputStream in = new DataInputStream(sock.getInputStream());
       out.write(sendBuf.toByteArray());
+      out.flush();
       try {
         in.readFully(retBuf);
       } catch (EOFException eof) {
@@ -137,15 +143,184 @@
     in.readFully(arr);
   }
   
-  public void testDataTransferProtocol() throws IOException {
+  private void writeZeroLengthPacket(Block block, String description)
+  throws IOException {
+    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
+    sendOut.writeInt(512);         // checksum size
+    sendOut.writeInt(8);           // size of packet
+    sendOut.writeLong(block.getNumBytes());          // OffsetInBlock
+    sendOut.writeLong(100);        // sequencenumber
+    sendOut.writeBoolean(true);    // lastPacketInBlock
+
+    sendOut.writeInt(0);           // chunk length
+    sendOut.writeInt(0);           // zero checksum
+        
+    //ok finally write a block with 0 len
+    SUCCESS.write(recvOut);
+    Text.writeString(recvOut, ""); // first bad node
+    recvOut.writeLong(100);        // sequencenumber
+    SUCCESS.write(recvOut);
+    sendRecvData(description, false);
+  }
+  
+  private void testWrite(Block block, BlockConstructionStage stage, long newGS,
+      String description, Boolean eofExcepted) throws IOException {
+    sendBuf.reset();
+    recvBuf.reset();
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
+        block.getBlockId(), block.getGenerationStamp(), 0,
+        stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
+        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
+    if (eofExcepted) {
+      ERROR.write(recvOut);
+      sendRecvData(description, true);
+    } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+      //ok finally write a block with 0 len
+      SUCCESS.write(recvOut);
+      Text.writeString(recvOut, ""); // first bad node
+      sendRecvData(description, false);
+    } else {
+      writeZeroLengthPacket(block, description);
+    }
+  }
+  
+  @Test public void testOpWrite() throws IOException {
+    int numDataNodes = 1;
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean("dfs.support.append", true);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+    try {
+      cluster.waitActive();
+      datanode = cluster.getDataNodes().get(0).dnRegistration;
+      dnAddr = NetUtils.createSocketAddr(datanode.getName());
+      FileSystem fileSys = cluster.getFileSystem();
+
+      /* Test writing to finalized replicas */
+      Path file = new Path("dataprotocol.dat");    
+      DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      // get the first blockid for the file
+      Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+      // test PIPELINE_SETUP_CREATE on a finalized block
+      testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
+          "Cannot create an existing block", true);
+      // test PIPELINE_DATA_STREAMING on a finalized block
+      testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING, 0L,
+          "Unexpected stage", true);
+      // test PIPELINE_SETUP_STREAMING_RECOVERY on an existing block
+      long newGS = firstBlock.getGenerationStamp() + 1;
+      testWrite(firstBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, 
+          newGS, "Cannot recover data streaming to a finalized replica", true);
+      // test PIPELINE_SETUP_APPEND on an existing block
+      newGS = firstBlock.getGenerationStamp() + 1;
+      testWrite(firstBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_APPEND,
+          newGS, "Append to a finalized replica", false);
+      firstBlock.setGenerationStamp(newGS);
+      // test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
+      file = new Path("dataprotocol1.dat");    
+      DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+      newGS = firstBlock.getGenerationStamp() + 1;
+      testWrite(firstBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
+          "Recover appending to a finalized replica", false);
+      // test PIPELINE_CLOSE_RECOVERY on an existing block
+      file = new Path("dataprotocol2.dat");    
+      DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+      newGS = firstBlock.getGenerationStamp() + 1;
+      testWrite(firstBlock, 
+          BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, newGS,
+          "Recover failed close to a finalized replica", false);
+      firstBlock.setGenerationStamp(newGS);
+
+      /* Test writing to a new block */
+      long newBlockId = firstBlock.getBlockId() + 1;
+      Block newBlock = new Block(newBlockId, 0, 
+          firstBlock.getGenerationStamp());
+
+      // test PIPELINE_SETUP_CREATE on a new block
+      testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
+          "Create a new block", false);
+      // test PIPELINE_SETUP_STREAMING_RECOVERY on a new block
+      newGS = newBlock.getGenerationStamp() + 1;
+      newBlock.setBlockId(newBlock.getBlockId()+1);
+      testWrite(newBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newGS,
+          "Recover a new block", true);
+      
+      // test PIPELINE_SETUP_APPEND on a new block
+      newGS = newBlock.getGenerationStamp() + 1;
+      testWrite(newBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_APPEND, newGS,
+          "Cannot append to a new block", true);
+
+      // test PIPELINE_SETUP_APPEND_RECOVERY on a new block
+      newBlock.setBlockId(newBlock.getBlockId()+1);
+      newGS = newBlock.getGenerationStamp() + 1;
+      testWrite(newBlock, 
+          BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
+          "Cannot append to a new block", true);
+
+      /* Test writing to RBW replicas */
+      Path file1 = new Path("dataprotocol1.dat");    
+      DFSTestUtil.createFile(fileSys, file1, 1L, (short)numDataNodes, 0L);
+      DFSOutputStream out = (DFSOutputStream)(fileSys.append(file1).
+          getWrappedStream()); 
+      out.write(1);
+      out.hflush();
+      FSDataInputStream in = fileSys.open(file1);
+      firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+      firstBlock.setNumBytes(2L);
+      
+      try {
+        // test PIPELINE_SETUP_CREATE on a RBW block
+        testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
+            "Cannot create a RBW block", true);
+        // test PIPELINE_SETUP_APPEND on an existing block
+        newGS = newBlock.getGenerationStamp() + 1;
+        testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND,
+            newGS, "Cannot append to a RBW replica", true);
+        // test PIPELINE_SETUP_APPEND on an existing block
+        testWrite(firstBlock, 
+            BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+            newGS, "Recover append to a RBW replica", false);
+        firstBlock.setGenerationStamp(newGS);
+        // test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
+        file = new Path("dataprotocol2.dat");    
+        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+        out = (DFSOutputStream)(fileSys.append(file).
+            getWrappedStream()); 
+        out.write(1);
+        out.hflush();
+        in = fileSys.open(file);
+        firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+        firstBlock.setNumBytes(2L);
+        newGS = firstBlock.getGenerationStamp() + 1;
+        testWrite(firstBlock, 
+            BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
+            newGS, "Recover a RBW replica", false);
+      } finally {
+        IOUtils.closeStream(in);
+        IOUtils.closeStream(out);
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+@Test  public void testDataTransferProtocol() throws IOException {
     Random random = new Random();
     int oneMil = 1024*1024;
     Path file = new Path("dataprotocol.dat");
     int numDataNodes = 1;
     
-    Configuration conf = new Configuration();
+    Configuration conf = new HdfsConfiguration();
     conf.setInt("dfs.replication", numDataNodes); 
     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+    try {
     cluster.waitActive();
     DFSClient dfsClient = new DFSClient(
                  new InetSocketAddress("localhost", cluster.getNameNodePort()),
@@ -154,7 +329,7 @@
     dnAddr = NetUtils.createSocketAddr(datanode.getName());
     FileSystem fileSys = cluster.getFileSystem();
     
-    int fileLen = Math.min(conf.getInt("dfs.block.size", 4096), 4096);
+    int fileLen = Math.min(conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
     
     createFile(fileSys, file, fileLen);
 
@@ -178,16 +353,10 @@
     
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
-    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    WRITE_BLOCK.write(sendOut);
-    sendOut.writeLong(newBlockId); // block id
-    sendOut.writeLong(0);          // generation stamp
-    sendOut.writeInt(0);           // targets in pipeline 
-    sendOut.writeBoolean(false);   // recoveryFlag
-    Text.writeString(sendOut, "cl");// clientID
-    sendOut.writeBoolean(false); // no src node info
-    sendOut.writeInt(0);           // number of downstream targets
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
+        newBlockId, 0L, 0,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
     // bad bytes per checksum
@@ -198,32 +367,10 @@
 
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    WRITE_BLOCK.write(sendOut);
-    sendOut.writeLong(newBlockId);
-    sendOut.writeLong(0);          // generation stamp
-    sendOut.writeInt(0);           // targets in pipeline 
-    sendOut.writeBoolean(false);   // recoveryFlag
-    Text.writeString(sendOut, "cl");// clientID
-    sendOut.writeBoolean(false); // no src node info
-
-    // bad number of targets
-    sendOut.writeInt(-1-random.nextInt(oneMil));
-    ERROR.write(recvOut);
-    sendRecvData("bad targets len while writing block " + newBlockId, true);
-
-    sendBuf.reset();
-    recvBuf.reset();
-    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    WRITE_BLOCK.write(sendOut);
-    sendOut.writeLong(++newBlockId);
-    sendOut.writeLong(0);          // generation stamp
-    sendOut.writeInt(0);           // targets in pipeline 
-    sendOut.writeBoolean(false);   // recoveryFlag
-    Text.writeString(sendOut, "cl");// clientID
-    sendOut.writeBoolean(false); // no src node info
-    sendOut.writeInt(0);
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    DataTransferProtocol.Sender.opWriteBlock(sendOut,
+        ++newBlockId, 0L, 0,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);
     sendOut.writeInt(4);           // size of packet
@@ -243,16 +390,10 @@
     // test for writing a valid zero size block
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    WRITE_BLOCK.write(sendOut);
-    sendOut.writeLong(++newBlockId);
-    sendOut.writeLong(0);          // generation stamp
-    sendOut.writeInt(0);           // targets in pipeline 
-    sendOut.writeBoolean(false);   // recoveryFlag
-    Text.writeString(sendOut, "cl");// clientID
-    sendOut.writeBoolean(false); // no src node info
-    sendOut.writeInt(0);
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
+        ++newBlockId, 0L, 0,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+        new DatanodeInfo[1], BlockAccessToken.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // checksum size
     sendOut.writeInt(8);           // size of packet
@@ -262,6 +403,7 @@
 
     sendOut.writeInt(0);           // chunk length
     sendOut.writeInt(0);           // zero checksum
+    sendOut.flush();
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
     Text.writeString(recvOut, ""); // first bad node
@@ -283,7 +425,7 @@
     sendOut.writeLong(fileLen);
     ERROR.write(recvOut);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset
@@ -295,7 +437,7 @@
     sendOut.writeLong(-1L);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
@@ -308,7 +450,7 @@
     sendOut.writeLong(fileLen);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -323,7 +465,7 @@
     sendOut.writeLong(0);
     sendOut.writeLong(-1-random.nextInt(oneMil));
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -338,7 +480,7 @@
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen + 1);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -351,7 +493,10 @@
     sendOut.writeLong(0);
     sendOut.writeLong(fileLen);
     Text.writeString(sendOut, "cl");
-    AccessToken.DUMMY_TOKEN.write(sendOut);
+    BlockAccessToken.DUMMY_TOKEN.write(sendOut);
     readFile(fileSys, file, fileLen);
+    } finally {
+      cluster.shutdown();
+    }
   }
 }