Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/08/04 01:34:05 UTC

svn commit: r800624 [1/2] - in /hadoop/hdfs/branches/HDFS-265: ./ ivy/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hado...

Author: hairong
Date: Mon Aug  3 23:34:04 2009
New Revision: 800624

URL: http://svn.apache.org/viewvc?rev=800624&view=rev
Log:
Merge -r 796828:800617 from trunk to branch HDFS-265

Added:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
      - copied unchanged from r800617, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
      - copied unchanged from r800617, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
Removed:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
Modified:
    hadoop/hdfs/branches/HDFS-265/   (props changed)
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/build.xml   (contents, props changed)
    hadoop/hdfs/branches/HDFS-265/ivy.xml
    hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties
    hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/java/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs-with-mr/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestGetBlocks.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestSetTimes.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/security/TestPermission.java
    hadoop/hdfs/branches/HDFS-265/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/HDFS-265/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug  3 23:34:04 2009
@@ -1 +1,2 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
+/hadoop/hdfs/trunk:796829-800617

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Mon Aug  3 23:34:04 2009
@@ -45,6 +45,31 @@
     HDFS-458. Create a new ant target, run-commit-test.  (Jakob Homan
     via szetszwo)
 
+    HDFS-493. Change build.xml so that the fault-injected tests are executed
+    only by the run-test-*-fault-inject targets.  (Konstantin Boudnik via
+    szetszwo)
+
+    HADOOP-6160. Fix releaseaudit target to run on specific directories.
+    (gkesavan)
+
+    HDFS-508. Factor out BlockInfo from BlocksMap. (shv)
+
+    HDFS-510. Rename DatanodeBlockInfo to be ReplicaInfo.
+    (Jakob Homan & Hairong Kuang via shv)
+
+    HDFS-500. Deprecate NameNode methods deprecated in NameNodeProtocol.
+    (Jakob Homan via shv)
+
+    HDFS-514. Change DFSClient.namenode from public to private.  (Bill Zeller
+    via szetszwo)
+
+    HDFS-496. Use PureJavaCrc32 in HDFS.  (Todd Lipcon via szetszwo)
+
+    HDFS-511. Remove redundant block searches in BlockManager. (shv)
+
+    HDFS-504. Update the modification time of a file when the file 
+    is closed. (Chun Zhang via dhruba)
+
   BUG FIXES
     HDFS-76. Better error message to users when commands fail because of 
     lack of quota. Allow quota to be set even if the limit is lower than
@@ -89,6 +114,21 @@
     HDFS-463. CreateEditLog utility broken after HDFS-396 (URI for
     FSImage). (Suresh Srinivas via rangadi)
 
+    HDFS-484. Fix bin-package and package target to package jar files.
+    (gkesavan)
+
+    HDFS-501. Use enum to define the constants in DataTransferProtocol.
+    (szetszwo)
+
+    HDFS-490. Eliminate the deprecated warnings introduced by H-5438.
+    (He Yongqiang via szetszwo)
+
+    HDFS-119. Fix a bug in logSync(), which causes NameNode to block forever.
+    (Suresh Srinivas via shv)
+
+    HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+    (Bill Zeller via szetszwo)
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/branches/HDFS-265/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/build.xml?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-265/build.xml Mon Aug  3 23:34:04 2009
@@ -362,6 +362,8 @@
           description="Run Fault Injection related hdfs tests">
     <subant buildpath="build.xml" target="run-test-hdfs">
       <property name="build.dir" value="${build-fi.dir}"/>
+      <property name="test.fault.inject" value="yes"/>
+      <property name="test.include" value="TestFi*"/>
     </subant>
   </target>
 
@@ -369,6 +371,8 @@
           description="Run hdfs Fault Injection related unit tests that require mapred">
     <subant buildpath="build.xml" target="run-test-hdfs-with-mr">
       <property name="build.dir" value="${build-fi.dir}"/>
+      <property name="test.fault.inject" value="yes"/>
+      <property name="test.include" value="TestFi*"/>
     </subant>
   </target>
 
@@ -551,6 +555,35 @@
     </jar>
   </target>
 
+  <condition property="tests.notestcase">
+    <and>
+      <isfalse value="${test.fault.inject}"/>
+      <not>
+        <isset property="testcase"/>
+      </not>
+    </and>
+  </condition>
+  <condition property="tests.notestcase.fi">
+    <and>
+      <not>
+        <isset property="testcase" />
+      </not>
+      <istrue value="${test.fault.inject}" />
+    </and>
+  </condition>
+  <condition property="tests.testcase">
+    <and>
+      <isfalse value="${test.fault.inject}" />
+      <isset property="testcase" />
+    </and>
+  </condition>
+  <condition property="tests.testcase.fi">
+    <and>
+      <istrue value="${test.fault.inject}" />
+      <isset property="testcase" />
+    </and>
+  </condition>
+
   <!-- ================================================================== -->
   <!-- Run unit tests                                                     --> 
   <!-- ================================================================== -->
@@ -586,15 +619,22 @@
           <propertyref regex="fi.*"/>
         </syspropertyset>
         <formatter type="${test.junit.output.format}" />
-        <batchtest todir="${test.build.dir}" unless="testcase">
+        <batchtest todir="${test.build.dir}" if="tests.notestcase">
           <fileset dir="${test.src.dir}/hdfs" excludes="**/${test.exclude}.java">
              <patternset>
                <includesfile name="@{test.file}"/>
              </patternset>
          </fileset>
         </batchtest>
-        <batchtest todir="${test.build.dir}" if="testcase">
+        <batchtest todir="${test.build.dir}" if="tests.notestcase.fi">
+          <fileset dir="${test.src.dir}/aop"
+            includes="**/${test.include}.java"
+            excludes="**/${test.exclude}.java" />
+        </batchtest>
+        <batchtest todir="${test.build.dir}" if="tests.testcase">
           <fileset dir="${test.src.dir}/hdfs" includes="**/${testcase}.java"/>
+        </batchtest>
+        <batchtest todir="${test.build.dir}" if="tests.testcase.fi">
           <fileset dir="${test.src.dir}/aop" includes="**/${testcase}.java"/>
         </batchtest>
       </junit>
@@ -640,14 +680,22 @@
         <propertyref regex="fi.*"/>
       </syspropertyset>
       <formatter type="${test.junit.output.format}" />
-      <batchtest todir="${test.build.dir}" unless="testcase">
+      <batchtest todir="${test.build.dir}" if="tests.notestcase">
         <fileset dir="${test.src.dir}/hdfs-with-mr"
            includes="**/${test.include}.java"
-     excludes="**/${test.exclude}.java" />
+           excludes="**/${test.exclude}.java" />
+      </batchtest>
+      <batchtest todir="${test.build.dir}" if="tests.notestcase.fi">
+        <fileset dir="${test.src.dir}/aop"
+          includes="**/${test.include}.java"
+          excludes="**/${test.exclude}.java" />
       </batchtest>
-      <batchtest todir="${test.build.dir}" if="testcase">
+      <batchtest todir="${test.build.dir}" if="tests.testcase">
         <fileset dir="${test.src.dir}/hdfs-with-mr" includes="**/${testcase}.java"/>
       </batchtest>
+      <batchtest todir="${test.build.dir}" if="tests.testcase.fi">
+        <fileset dir="${test.src.dir}/aop" includes="**/${testcase}.java"/>
+      </batchtest>
     </junit>
     <antcall target="checkfailure"/>
   </target>  
@@ -956,7 +1004,7 @@
     </copy>
 
     <copy todir="${dist.dir}"> 
-      <fileset file="${build.dir}/${final.name}-*.jar"/>
+      <fileset file="${build.dir}/${name}-*.jar"/>
     </copy>
 
     <copy todir="${dist.dir}/conf">
@@ -1039,7 +1087,7 @@
     </copy>
 
     <copy todir="${dist.dir}"> 
-      <fileset file="${build.dir}/${final.name}-*.jar"/>
+      <fileset file="${build.dir}/${name}-*.jar"/>
     </copy>
     
     <copy todir="${dist.dir}/conf">
@@ -1081,12 +1129,19 @@
   <!-- ================================================================== -->
   <!-- Perform audit activities for the release                           -->
   <!-- ================================================================== -->
-  <target name="releaseaudit" depends="package,ivy-retrieve-releaseaudit" description="Release Audit activities">
-    <fail unless="rat.present" message="Failed to load class [${rat.reporting.classname}]."/>
-    <java classname="${rat.reporting.classname}" fork="true">
-      <classpath refid="releaseaudit-classpath"/>
-      <arg value="${build.dir}/${final.name}"/>
-    </java>
+  <target name="rats-taskdef" depends="ivy-retrieve-releaseaudit">
+     <typedef format="xml" resource="org/apache/rat/anttasks/antlib.xml" uri="antlib:org.apache.rat.anttasks"
+      classpathref="releaseaudit-classpath"/>
+  </target>
+
+  <target name="releaseaudit" depends="package, rats-taskdef" description="Release Audit activities">
+   <rat:report xmlns:rat="antlib:org.apache.rat.anttasks">
+      <fileset dir="${dist.dir}">
+        <exclude name="CHANGES.txt"/>
+        <exclude name="docs/"/>
+        <exclude name="lib/jdiff/"/>
+      </fileset>
+    </rat:report>
   </target>
 
   <!-- ================================================================== -->
@@ -1360,8 +1415,6 @@
     <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings"
       pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
     <ivy:cachepath pathid="releaseaudit-classpath" conf="releaseaudit"/>
-    <available classname="${rat.reporting.classname}" 
-      classpathref="releaseaudit-classpath" property="rat.present" value="true"/>
   </target>
 
   <target name="ivy-report" depends="ivy-resolve-releaseaudit"

Propchange: hadoop/hdfs/branches/HDFS-265/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug  3 23:34:04 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
 /hadoop/core/trunk/build.xml:779102
+/hadoop/hdfs/trunk/build.xml:796829-800617

Modified: hadoop/hdfs/branches/HDFS-265/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/ivy.xml?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/ivy.xml (original)
+++ hadoop/hdfs/branches/HDFS-265/ivy.xml Mon Aug  3 23:34:04 2009
@@ -224,8 +224,8 @@
       name="junit"
       rev="${junit.version}"
       conf="common->default"/>
-    <dependency org="com.google.code.p.arat"
-      name="rat-lib"
+    <dependency org="org.apache.rat"
+      name="apache-rat-tasks"
       rev="${rats-lib.version}"
       conf="releaseaudit->default"/>
     <dependency org="commons-lang"

Modified: hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties (original)
+++ hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties Mon Aug  3 23:34:04 2009
@@ -60,7 +60,7 @@
 
 oro.version=2.0.8
 
-rats-lib.version=0.5.1
+rats-lib.version=0.6
 
 servlet.version=4.0.6
 servlet-api-2.5.version=6.1.14

Propchange: hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug  3 23:34:04 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:796829-800617

Propchange: hadoop/hdfs/branches/HDFS-265/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug  3 23:34:04 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
+/hadoop/hdfs/trunk/src/java:796829-800617

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Mon Aug  3 23:34:04 2009
@@ -17,42 +17,100 @@
  */
 package org.apache.hadoop.hdfs;
 
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.fs.*;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
-import org.apache.hadoop.security.InvalidAccessTokenException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.InvalidAccessTokenException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.util.*;
-
-import org.apache.commons.logging.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.zip.CRC32;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentHashMap;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-
-import javax.net.SocketFactory;
-import javax.security.auth.login.LoginException;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.StringUtils;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -69,8 +127,8 @@
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  public final ClientProtocol namenode;
-  final private ClientProtocol rpcNamenode;
+  private ClientProtocol namenode;
+  private ClientProtocol rpcNamenode;
   final UnixUserGroupInformation ugi;
   volatile boolean clientRunning = true;
   Random r = new Random();
@@ -161,6 +219,29 @@
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
                    FileSystem.Statistics stats)
     throws IOException {
+    this(conf, stats);
+    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+    this.namenode = createNamenode(this.rpcNamenode);
+  }
+
+  /** 
+   * Create a new DFSClient connected to the given namenode
+   * and rpcNamenode objects.
+   * 
+   * This constructor was written to allow easy testing of the DFSClient class.
+   * End users will most likely want to use one of the other constructors.
+   */
+  public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
+                   Configuration conf, FileSystem.Statistics stats)
+    throws IOException {
+      this(conf, stats);
+      this.namenode = namenode;
+      this.rpcNamenode = rpcNamenode;
+  }
+
+  
+  private DFSClient(Configuration conf, FileSystem.Statistics stats)
+    throws IOException {
     this.conf = conf;
     this.stats = stats;
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
@@ -182,9 +263,6 @@
       throw (IOException)(new IOException().initCause(e));
     }
 
-    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
-    this.namenode = createNamenode(rpcNamenode);
-
     String taskId = conf.get("mapred.task.id");
     if (taskId != null) {
       this.clientName = "DFSClient_" + taskId; 
@@ -372,6 +450,14 @@
     return create(src, overwrite, replication, blockSize, null);
   }
 
+  /**
+   * Get the namenode associated with this DFSClient object
+   * @return the namenode associated with this DFSClient object
+   */
+  public ClientProtocol getNamenode() {
+    return namenode;
+  }
+  
   
   /**
    * Create a new dfs file with the specified block replication 
@@ -619,15 +705,14 @@
         try {
           if (LOG.isDebugEnabled()) {
             LOG.debug("write to " + datanodes[j].getName() + ": "
-                + DataTransferProtocol.OP_BLOCK_CHECKSUM +
-                ", block=" + block);
+                + BLOCK_CHECKSUM + ", block=" + block);
           }
           DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
               block.getGenerationStamp(), lb.getAccessToken());
 
-          final short reply = in.readShort();
-          if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
-            if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
+          final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
+          if (reply != SUCCESS) {
+            if (reply == ERROR_ACCESS_TOKEN
                 && i > lastRetriedIndex) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
@@ -1353,9 +1438,9 @@
           new BufferedInputStream(NetUtils.getInputStream(sock), 
                                   bufferSize));
       
-      short status = in.readShort();
-      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
-        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+      DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+      if (status != SUCCESS) {
+        if (status == ERROR_ACCESS_TOKEN) {
           throw new InvalidAccessTokenException(
               "Got access token error in response to OP_READ_BLOCK "
                   + "for file " + file + " for block " + blockId);
@@ -1402,9 +1487,7 @@
     private void checksumOk(Socket sock) {
       try {
         OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-        byte buf[] = { (DataTransferProtocol.OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
-                       (DataTransferProtocol.OP_STATUS_CHECKSUM_OK) & 0xff };
-        out.write(buf);
+        CHECKSUM_OK.writeOutputStream(out);
         out.flush();
       } catch (IOException e) {
         // its ok not to be able to send this.
@@ -2476,8 +2559,9 @@
 
               // processes response status from all datanodes.
               for (int i = 0; i < targets.length && clientRunning; i++) {
-                short reply = blockReplyStream.readShort();
-                if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                final DataTransferProtocol.Status reply
+                    = DataTransferProtocol.Status.read(blockReplyStream);
+                if (reply != SUCCESS) {
                   errorIndex = i; // first bad datanode
                   throw new IOException("Bad response " + reply +
                       " for block " + block +
@@ -2716,7 +2800,7 @@
       //
       private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
           boolean recoveryFlag) {
-        short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
+        DataTransferProtocol.Status pipelineStatus = SUCCESS;
         String firstBadLink = "";
         if (LOG.isDebugEnabled()) {
           for (int i = 0; i < nodes.length; i++) {
@@ -2755,10 +2839,10 @@
           out.flush();
 
           // receive ack for connect
-          pipelineStatus = blockReplyStream.readShort();
+          pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
           firstBadLink = Text.readString(blockReplyStream);
-          if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
-            if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+          if (pipelineStatus != SUCCESS) {
+            if (pipelineStatus == ERROR_ACCESS_TOKEN) {
               throw new InvalidAccessTokenException(
                   "Got access token error for connect ack with firstBadLink as "
                       + firstBadLink);
@@ -2792,7 +2876,7 @@
       }
 
       private LocatedBlock locateFollowingBlock(long start) throws IOException {
-        int retries = 5;
+        int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
         long sleeptime = 400;
         while (true) {
           long localstart = System.currentTimeMillis();
@@ -2808,26 +2892,32 @@
               if (ue != e) { 
                 throw ue; // no need to retry these exceptions
               }
-
-              if (--retries == 0 && 
-                  !NotReplicatedYetException.class.getName().
+              
+              
+              if (NotReplicatedYetException.class.getName().
                   equals(e.getClassName())) {
-                throw e;
-              } else {
-                LOG.info(StringUtils.stringifyException(e));
-                if (System.currentTimeMillis() - localstart > 5000) {
-                  LOG.info("Waiting for replication for "
-                      + (System.currentTimeMillis() - localstart) / 1000
-                      + " seconds");
-                }
-                try {
-                  LOG.warn("NotReplicatedYetException sleeping " + src
-                      + " retries left " + retries);
-                  Thread.sleep(sleeptime);
-                  sleeptime *= 2;
-                } catch (InterruptedException ie) {
+                if (retries == 0) { 
+                  throw e;
+                } else {
+                  --retries;
+                  LOG.info(StringUtils.stringifyException(e));
+                  if (System.currentTimeMillis() - localstart > 5000) {
+                    LOG.info("Waiting for replication for "
+                        + (System.currentTimeMillis() - localstart) / 1000
+                        + " seconds");
+                  }
+                  try {
+                    LOG.warn("NotReplicatedYetException sleeping " + src
+                        + " retries left " + retries);
+                    Thread.sleep(sleeptime);
+                    sleeptime *= 2;
+                  } catch (InterruptedException ie) {
+                  }
                 }
-              }                
+              } else {
+                throw e;
+              }
+
             }
           }
         } 
@@ -2919,7 +3009,7 @@
 
     private DFSOutputStream(String src, long blockSize, Progressable progress,
         int bytesPerChecksum) throws IOException {
-      super(new CRC32(), bytesPerChecksum, 4);
+      super(new PureJavaCrc32(), bytesPerChecksum, 4);
       this.src = src;
       this.blockSize = blockSize;
       this.progress = progress;
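
Note on the DFSClient changes above: splitting the constructor lets a test inject ClientProtocol objects directly instead of opening an RPC connection to a NameNode, and the now-private namenode field is reached through the new getNamenode() accessor. A minimal sketch, assuming a Mockito-style mock() (Mockito is not part of this patch; any ClientProtocol test double would do):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import static org.mockito.Mockito.mock;

    public class DFSClientInjectionSketch {
      public static void main(String[] args) throws Exception {
        // Both proxies are test doubles; no NameNode RPC connection is opened.
        ClientProtocol fakeNamenode = mock(ClientProtocol.class);
        DFSClient client = new DFSClient(fakeNamenode, fakeNamenode,
            new Configuration(), new FileSystem.Statistics("hdfs"));
        // The field is private now; callers use the accessor added by HDFS-514.
        System.out.println(client.getNamenode() == fakeNamenode);  // true
      }
    }

The same set of hunks also makes the locateFollowingBlock retry count configurable through dfs.client.block.write.locateFollowingBlock.retries (default 5) instead of hard-coding it.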

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Mon Aug  3 23:34:04 2009
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessToken;
@@ -42,39 +45,136 @@
    */
   public static final int DATA_TRANSFER_VERSION = 16;
 
-  // Processed at datanode stream-handler
-  public static final byte OP_WRITE_BLOCK = (byte) 80;
-  public static final byte OP_READ_BLOCK = (byte) 81;
-  /**
-   * @deprecated As of version 15, OP_READ_METADATA is no longer supported
-   */
-  @Deprecated public static final byte OP_READ_METADATA = (byte) 82;
-  public static final byte OP_REPLACE_BLOCK = (byte) 83;
-  public static final byte OP_COPY_BLOCK = (byte) 84;
-  public static final byte OP_BLOCK_CHECKSUM = (byte) 85;
+  /** Operation */
+  public enum Op {
+    WRITE_BLOCK((byte)80),
+    READ_BLOCK((byte)81),
+    READ_METADATA((byte)82),
+    REPLACE_BLOCK((byte)83),
+    COPY_BLOCK((byte)84),
+    BLOCK_CHECKSUM((byte)85);
+
+    /** The code for this operation. */
+    public final byte code;
+    
+    private Op(byte code) {
+      this.code = code;
+    }
+    
+    private static final int FIRST_CODE = values()[0].code;
+    /** Return the object represented by the code. */
+    private static Op valueOf(byte code) {
+      final int i = (code & 0xff) - FIRST_CODE;
+      return i < 0 || i >= values().length? null: values()[i];
+    }
+
+    /** Read from in */
+    public static Op read(DataInput in) throws IOException {
+      return valueOf(in.readByte());
+    }
+
+    /** Write to out */
+    public void write(DataOutput out) throws IOException {
+      out.write(code);
+    }
+  };
+
+  /** Status */
+  public enum Status {
+    SUCCESS(0),
+    ERROR(1),
+    ERROR_CHECKSUM(2),
+    ERROR_INVALID(3),
+    ERROR_EXISTS(4),
+    ERROR_ACCESS_TOKEN(5),
+    CHECKSUM_OK(6);
+
+    /** The code for this operation. */
+    private final int code;
+    
+    private Status(int code) {
+      this.code = code;
+    }
+
+    private static final int FIRST_CODE = values()[0].code;
+    /** Return the object represented by the code. */
+    private static Status valueOf(int code) {
+      final int i = code - FIRST_CODE;
+      return i < 0 || i >= values().length? null: values()[i];
+    }
+
+    /** Read from in */
+    public static Status read(DataInput in) throws IOException {
+      return valueOf(in.readShort());
+    }
+
+    /** Write to out */
+    public void write(DataOutput out) throws IOException {
+      out.writeShort(code);
+    }
+
+    /** Write to out */
+    public void writeOutputStream(OutputStream out) throws IOException {
+      out.write(new byte[] {(byte)(code >>> 8), (byte)code});
+    }
+  };
   
-  public static final int OP_STATUS_SUCCESS = 0;  
-  public static final int OP_STATUS_ERROR = 1;  
-  public static final int OP_STATUS_ERROR_CHECKSUM = 2;  
-  public static final int OP_STATUS_ERROR_INVALID = 3;  
-  public static final int OP_STATUS_ERROR_EXISTS = 4;  
-  public static final int OP_STATUS_ERROR_ACCESS_TOKEN = 5;
-  public static final int OP_STATUS_CHECKSUM_OK = 6;
+  /** @deprecated Deprecated at 0.21.  Use Op.WRITE_BLOCK instead. */
+  @Deprecated
+  public static final byte OP_WRITE_BLOCK = Op.WRITE_BLOCK.code;
+  /** @deprecated Deprecated at 0.21.  Use Op.READ_BLOCK instead. */
+  @Deprecated
+  public static final byte OP_READ_BLOCK = Op.READ_BLOCK.code;
+  /** @deprecated As of version 15, OP_READ_METADATA is no longer supported. */
+  @Deprecated
+  public static final byte OP_READ_METADATA = Op.READ_METADATA.code;
+  /** @deprecated Deprecated at 0.21.  Use Op.REPLACE_BLOCK instead. */
+  @Deprecated
+  public static final byte OP_REPLACE_BLOCK = Op.REPLACE_BLOCK.code;
+  /** @deprecated Deprecated at 0.21.  Use Op.COPY_BLOCK instead. */
+  @Deprecated
+  public static final byte OP_COPY_BLOCK = Op.COPY_BLOCK.code;
+  /** @deprecated Deprecated at 0.21.  Use Op.BLOCK_CHECKSUM instead. */
+  @Deprecated
+  public static final byte OP_BLOCK_CHECKSUM = Op.BLOCK_CHECKSUM.code;
+
+
+  /** @deprecated Deprecated at 0.21.  Use Status.SUCCESS instead. */
+  @Deprecated
+  public static final int OP_STATUS_SUCCESS = Status.SUCCESS.code;  
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR instead. */
+  @Deprecated
+  public static final int OP_STATUS_ERROR = Status.ERROR.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR_CHECKSUM instead. */
+  @Deprecated
+  public static final int OP_STATUS_ERROR_CHECKSUM = Status.ERROR_CHECKSUM.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR_INVALID instead. */
+  @Deprecated
+  public static final int OP_STATUS_ERROR_INVALID = Status.ERROR_INVALID.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR_EXISTS instead. */
+  @Deprecated
+  public static final int OP_STATUS_ERROR_EXISTS = Status.ERROR_EXISTS.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.ERROR_ACCESS_TOKEN instead.*/
+  @Deprecated
+  public static final int OP_STATUS_ERROR_ACCESS_TOKEN = Status.ERROR_ACCESS_TOKEN.code;
+  /** @deprecated Deprecated at 0.21.  Use Status.CHECKSUM_OK instead. */
+  @Deprecated
+  public static final int OP_STATUS_CHECKSUM_OK = Status.CHECKSUM_OK.code;
 
 
   /** Sender */
   public static class Sender {
     /** Initialize a operation. */
-    public static void op(DataOutputStream out, int op) throws IOException {
+    public static void op(DataOutputStream out, Op op) throws IOException {
       out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-      out.write(op);
+      op.write(out);
     }
 
     /** Send OP_READ_BLOCK */
     public static void opReadBlock(DataOutputStream out,
         long blockId, long blockGs, long blockOffset, long blockLen,
         String clientName, AccessToken accessToken) throws IOException {
-      op(out, OP_READ_BLOCK);
+      op(out, Op.READ_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -90,7 +190,7 @@
         long blockId, long blockGs, int pipelineSize, boolean isRecovery,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
         AccessToken accesstoken) throws IOException {
-      op(out, OP_WRITE_BLOCK);
+      op(out, Op.WRITE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -114,7 +214,7 @@
     public static void opReplaceBlock(DataOutputStream out,
         long blockId, long blockGs, String storageId, DatanodeInfo src,
         AccessToken accesstoken) throws IOException {
-      op(out, OP_REPLACE_BLOCK);
+      op(out, Op.REPLACE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -127,7 +227,7 @@
     /** Send OP_COPY_BLOCK */
     public static void opCopyBlock(DataOutputStream out,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException {
-      op(out, OP_COPY_BLOCK);
+      op(out, Op.COPY_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -138,7 +238,7 @@
     /** Send OP_BLOCK_CHECKSUM */
     public static void opBlockChecksum(DataOutputStream out,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException {
-      op(out, OP_BLOCK_CHECKSUM);
+      op(out, Op.BLOCK_CHECKSUM);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
@@ -150,12 +250,12 @@
   /** Receiver */
   public static abstract class Receiver {
     /** Initialize a operation. */
-    public final byte op(DataInputStream in) throws IOException {
+    public final Op op(DataInputStream in) throws IOException {
       final short version = in.readShort();
       if (version != DATA_TRANSFER_VERSION) {
         throw new IOException( "Version Mismatch" );
       }
-      return in.readByte();
+      return Op.read(in);
     }
 
     /** Receive OP_READ_BLOCK */
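
To illustrate the enum change above: the wire encoding is unchanged (an Op still travels as one byte, a Status as a two-byte short); only the Java-level API moves from raw int/byte constants to enums with read/write helpers. A small self-contained sketch, using in-memory streams in place of a socket:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;

    public class WireFormatSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        Op.READ_BLOCK.write(out);     // one byte: 81
        Status.SUCCESS.write(out);    // two bytes: 0
        out.flush();

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(Op.read(in));      // READ_BLOCK
        System.out.println(Status.read(in));  // SUCCESS
      }
    }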

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Aug  3 23:34:04 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -376,9 +378,9 @@
     
     /* Receive a block copy response from the input stream */ 
     private void receiveResponse(DataInputStream in) throws IOException {
-      short status = in.readShort();
-      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
-        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN)
+      DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+      if (status != DataTransferProtocol.Status.SUCCESS) {
+        if (status == ERROR_ACCESS_TOKEN)
           throw new IOException("block move failed due to access token error");
         throw new IOException("block move is failed");
       }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Aug  3 23:34:04 2009
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
+
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -25,7 +29,6 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
-import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
@@ -36,11 +39,12 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 /** A class that receives a block and writes to its own disk, meanwhile
  * may copies it to another site. If a throttler is provided,
@@ -672,7 +676,7 @@
     }
 
     // compute crc of partial chunk from data read in the block file.
-    partialCrc = new CRC32();
+    partialCrc = new PureJavaCrc32();
     partialCrc.update(buf, 0, sizePartialChunk);
     LOG.info("Read in partial CRC chunk from disk for block " + block);
 
@@ -823,7 +827,7 @@
             }
 
             replyOut.writeLong(expected);
-            replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+            SUCCESS.write(replyOut);
             replyOut.flush();
         } catch (Exception e) {
           if (running) {
@@ -854,7 +858,7 @@
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         try {
-            short op = DataTransferProtocol.OP_STATUS_SUCCESS;
+            DataTransferProtocol.Status op = SUCCESS;
             boolean didRead = false;
             long expected = -2;
             try { 
@@ -919,7 +923,7 @@
             }
             
             if (!didRead) {
-              op = DataTransferProtocol.OP_STATUS_ERROR;
+              op = ERROR;
             }
             
             // If this is the last packet in block, then close block
@@ -948,7 +952,7 @@
 
             // send my status back to upstream datanode
             replyOut.writeLong(expected); // send seqno upstream
-            replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+            SUCCESS.write(replyOut);
 
             LOG.debug("PacketResponder " + numTargets + 
                       " for block " + block +
@@ -958,18 +962,18 @@
             // forward responses from downstream datanodes.
             for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
               try {
-                if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
-                  op = mirrorIn.readShort();
-                  if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
+                if (op == SUCCESS) {
+                  op = Status.read(mirrorIn);
+                  if (op != SUCCESS) {
                     LOG.debug("PacketResponder for block " + block +
                               ": error code received from downstream " +
                               " datanode[" + i + "] " + op);
                   }
                 }
               } catch (Throwable e) {
-                op = DataTransferProtocol.OP_STATUS_ERROR;
+                op = ERROR;
               }
-              replyOut.writeShort(op);
+              op.write(replyOut);
             }
             replyOut.flush();
             LOG.debug("PacketResponder " + block + " " + numTargets + 
@@ -982,7 +986,7 @@
             // If we forwarded an error response from a downstream datanode
             // and we are acting on behalf of a client, then we quit. The 
             // client will drive the recovery mechanism.
-            if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
+            if (op == ERROR && receiver.clientName.length() > 0) {
               running = false;
             }
         } catch (IOException e) {
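
The CRC32-to-PureJavaCrc32 switch here and in DFSClient (HDFS-496) is a drop-in replacement: both classes implement java.util.zip.Checksum and compute the same CRC-32 value, so only the constructor call changes. A minimal sketch (the sample bytes are arbitrary):

    import java.util.zip.CRC32;
    import java.util.zip.Checksum;
    import org.apache.hadoop.util.PureJavaCrc32;

    public class CrcSketch {
      public static void main(String[] args) {
        byte[] data = "hello, hdfs".getBytes();
        Checksum jdk = new CRC32();
        Checksum pure = new PureJavaCrc32();
        jdk.update(data, 0, data.length);
        pure.update(data, 0, data.length);
        // Same polynomial, same result; PureJavaCrc32 is a pure-Java implementation.
        System.out.println(jdk.getValue() == pure.getValue());  // true
      }
    }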

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Aug  3 23:34:04 2009
@@ -17,6 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -42,7 +48,6 @@
 import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 /**
  * Thread for processing incoming/outgoing data stream.
@@ -79,7 +84,7 @@
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
                                   SMALL_BUFFER_SIZE));
-      final byte op = op(in);
+      final DataTransferProtocol.Op op = op(in);
       boolean local = s.getInetAddress().equals(s.getLocalAddress());
       // Make sure the xciver count is not exceeded
       int curXceiverCount = datanode.getXceiverCount();
@@ -90,7 +95,7 @@
       }
       long startTime = DataNode.now();
       switch ( op ) {
-      case DataTransferProtocol.OP_READ_BLOCK:
+      case READ_BLOCK:
         opReadBlock(in);
         datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
         if (local)
@@ -98,7 +103,7 @@
         else
           datanode.myMetrics.readsFromRemoteClient.inc();
         break;
-      case DataTransferProtocol.OP_WRITE_BLOCK:
+      case WRITE_BLOCK:
         opWriteBlock(in);
         datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
         if (local)
@@ -106,16 +111,16 @@
         else
           datanode.myMetrics.writesFromRemoteClient.inc();
         break;
-      case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
+      case REPLACE_BLOCK: // for balancing purpose; send to a destination
         opReplaceBlock(in);
         datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
         break;
-      case DataTransferProtocol.OP_COPY_BLOCK:
+      case COPY_BLOCK:
             // for balancing purpose; send to a proxy source
         opCopyBlock(in);
         datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
         break;
-      case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
+      case BLOCK_CHECKSUM: //get the checksum of a block
         opBlockChecksum(in);
         datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
         break;
@@ -150,7 +155,7 @@
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
             AccessTokenHandler.AccessMode.READ)) {
       try {
-        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+        ERROR_ACCESS_TOKEN.write(out);
         out.flush();
         throw new IOException("Access token verification failed, on client "
             + "request for reading block " + block);
@@ -172,19 +177,19 @@
         blockSender = new BlockSender(block, startOffset, length,
             true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
-        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
+        ERROR.write(out);
         throw e;
       }
 
-      out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
+      SUCCESS.write(out); // send op status
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
       if (blockSender.isBlockReadFully()) {
         // See if client verification succeeded. 
         // This is an optional response from client.
         try {
-          if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK  && 
-              datanode.blockScanner != null) {
+          if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
+              && datanode.blockScanner != null) {
             datanode.blockScanner.verifiedByClient(block);
           }
         } catch (IOException ignored) {}
@@ -238,7 +243,7 @@
             .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
       try {
         if (client.length() != 0) {
-          replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+          ERROR_ACCESS_TOKEN.write(replyOut);
           Text.writeString(replyOut, datanode.dnRegistration.getName());
           replyOut.flush();
         }
@@ -255,7 +260,7 @@
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
-    short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
+    DataTransferProtocol.Status mirrorInStatus = SUCCESS;
     try {
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(block, in, 
@@ -296,9 +301,9 @@
 
           // read connect ack (only for clients, not for replication req)
           if (client.length() != 0) {
-            mirrorInStatus = mirrorIn.readShort();
+            mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
             firstBadLink = Text.readString(mirrorIn);
-            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+            if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
               LOG.info("Datanode " + targets.length +
                        " got response for connect ack " +
                        " from downstream datanode with firstbadlink as " +
@@ -308,7 +313,7 @@
 
         } catch (IOException e) {
           if (client.length() != 0) {
-            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
+            ERROR.write(replyOut);
             Text.writeString(replyOut, mirrorNode);
             replyOut.flush();
           }
@@ -331,12 +336,12 @@
 
       // send connect ack back to source (only for clients)
       if (client.length() != 0) {
-        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
           LOG.info("Datanode " + targets.length +
                    " forwarding connect ack to upstream firstbadlink is " +
                    firstBadLink);
         }
-        replyOut.writeShort(mirrorInStatus);
+        mirrorInStatus.write(replyOut);
         Text.writeString(replyOut, firstBadLink);
         replyOut.flush();
       }
@@ -387,7 +392,7 @@
         && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
             .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
       try {
-        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+        ERROR_ACCESS_TOKEN.write(out);
         out.flush();
         throw new IOException(
             "Access token verification failed, on getBlockChecksum() "
@@ -418,7 +423,7 @@
       }
 
       //write reply
-      out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+      SUCCESS.write(out);
       out.writeInt(bytesPerCRC);
       out.writeLong(crcPerBlock);
       md5.write(out);
@@ -443,17 +448,14 @@
             AccessTokenHandler.AccessMode.COPY)) {
       LOG.warn("Invalid access token in request from "
           + s.getRemoteSocketAddress() + " for copying block " + block);
-      sendResponse(s,
-          (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
-          datanode.socketWriteTimeout);
+      sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
       return;
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.info("Not able to copy block " + blockId + " to " 
           + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
-          datanode.socketWriteTimeout);
+      sendResponse(s, ERROR, datanode.socketWriteTimeout);
       return;
     }
 
@@ -473,7 +475,7 @@
           baseStream, SMALL_BUFFER_SIZE));
 
       // send status first
-      reply.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
+      SUCCESS.write(reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
@@ -515,22 +517,20 @@
             AccessTokenHandler.AccessMode.REPLACE)) {
       LOG.warn("Invalid access token in request from "
           + s.getRemoteSocketAddress() + " for replacing block " + block);
-      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
-          datanode.socketWriteTimeout);
+      sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
       return;
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.warn("Not able to receive block " + blockId + " from " 
           + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
-          datanode.socketWriteTimeout);
+      sendResponse(s, ERROR, datanode.socketWriteTimeout);
       return;
     }
 
     Socket proxySock = null;
     DataOutputStream proxyOut = null;
-    short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
+    DataTransferProtocol.Status opStatus = SUCCESS;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     
@@ -554,9 +554,10 @@
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
           NetUtils.getInputStream(proxySock), BUFFER_SIZE));
-      short status = proxyReply.readShort();
-      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
-        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+      final DataTransferProtocol.Status status
+          = DataTransferProtocol.Status.read(proxyReply);
+      if (status != SUCCESS) {
+        if (status == ERROR_ACCESS_TOKEN) {
           throw new IOException("Copy block " + block + " from "
               + proxySock.getRemoteSocketAddress()
               + " failed due to access token error");
@@ -581,11 +582,11 @@
           " from " + s.getRemoteSocketAddress());
       
     } catch (IOException ioe) {
-      opStatus = DataTransferProtocol.OP_STATUS_ERROR;
+      opStatus = ERROR;
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
-      if (opStatus == DataTransferProtocol.OP_STATUS_SUCCESS) {
+      if (opStatus == SUCCESS) {
         try {
           proxyReply.readChar();
         } catch (IOException ignored) {
@@ -613,12 +614,12 @@
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, short opStatus, long timeout) 
-                                                       throws IOException {
+  private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
+      long timeout) throws IOException {
     DataOutputStream reply = 
       new DataOutputStream(NetUtils.getOutputStream(s, timeout));
     try {
-      reply.writeShort(opStatus);
+      opStatus.write(reply);
       reply.flush();
     } finally {
       IOUtils.closeStream(reply);
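
The DataXceiver hunks above replace the raw writeShort(DataTransferProtocol.OP_STATUS_*) calls with a DataTransferProtocol.Status value that serializes itself via write(DataOutput) and Status.read(DataInput). Below is a minimal, self-contained sketch of that enum-over-the-wire pattern; the class name StatusEnumSketch and the numeric codes 0, 1 and 5 are illustrative assumptions, not the constants actually defined in DataTransferProtocol.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class StatusEnumSketch {
  /** Hypothetical stand-in for DataTransferProtocol.Status. */
  public enum Status {
    SUCCESS(0), ERROR(1), ERROR_ACCESS_TOKEN(5);

    private final short code;
    Status(int code) { this.code = (short) code; }

    /** Serialize the status as a short, as the old writeShort() calls did. */
    public void write(DataOutput out) throws IOException {
      out.writeShort(code);
    }

    /** Deserialize a status, replacing the old readShort() plus constant compare. */
    public static Status read(DataInput in) throws IOException {
      short code = in.readShort();
      for (Status s : values()) {
        if (s.code == code) {
          return s;
        }
      }
      throw new IOException("Unknown status code: " + code);
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    Status.SUCCESS.write(new DataOutputStream(buf));
    Status echoed = Status.read(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    System.out.println("round-tripped status: " + echoed); // SUCCESS
  }
}

Keeping the status a typed value rather than a bare short also lets call sites compare against SUCCESS or ERROR_ACCESS_TOKEN directly, as the replaceBlock hunk above does.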

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Mon Aug  3 23:34:04 2009
@@ -65,7 +65,7 @@
       JspHelper.printGotoForm(out, namenodeInfoPort, target);
     } else {
       if (!targetStatus.isDir()) { // a file
-        List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(dir, 0, 1)
+        List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(dir, 0, 1)
             .getLocatedBlocks();
 
         LocatedBlock firstBlock = null;
@@ -205,7 +205,7 @@
 
     final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(),
         JspHelper.conf);
-    List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(filename, 0,
+    List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
         Long.MAX_VALUE).getLocatedBlocks();
     // Add the various links for looking at the file contents
     // URL for downloading the full file
@@ -320,7 +320,7 @@
     AccessToken accessToken = AccessToken.DUMMY_TOKEN;
     if (JspHelper.conf.getBoolean(
         AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false)) {
-      List<LocatedBlock> blks = dfs.namenode.getBlockLocations(filename, 0,
+      List<LocatedBlock> blks = dfs.getNamenode().getBlockLocations(filename, 0,
           Long.MAX_VALUE).getLocatedBlocks();
       if (blks == null || blks.size() == 0) {
         out.print("Can't locate file blocks");
@@ -390,7 +390,7 @@
     // determine data for the next link
     if (startOffset + chunkSizeToView >= blockSize) {
       // we have to go to the next block from this point onwards
-      List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(filename, 0,
+      List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
           Long.MAX_VALUE).getLocatedBlocks();
       for (int i = 0; i < blocks.size(); i++) {
         if (blocks.get(i).getBlock().getBlockId() == blockId) {
@@ -440,7 +440,7 @@
     int prevPort = req.getServerPort();
     int prevDatanodePort = datanodePort;
     if (startOffset == 0) {
-      List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(filename, 0,
+      List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
           Long.MAX_VALUE).getLocatedBlocks();
       for (int i = 0; i < blocks.size(); i++) {
         if (blocks.get(i).getBlock().getBlockId() == blockId) {
@@ -546,7 +546,7 @@
     // fetch the block from the datanode that has the last block for this file
     final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(),
         JspHelper.conf);
-    List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(filename, 0,
+    List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
         Long.MAX_VALUE).getLocatedBlocks();
     if (blocks == null || blocks.size() == 0) {
       out.print("No datanodes contain blocks of file " + filename);

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Aug  3 23:34:04 2009
@@ -171,7 +171,7 @@
       return Block.GRANDFATHER_GENERATION_STAMP;
     }
 
-    void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
+    void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
           children[i].getVolumeMap(volumeMap, volume);
@@ -183,7 +183,7 @@
         if (Block.isBlockFilename(blockFiles[i])) {
           long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
           volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp), 
-                        new DatanodeBlockInfo(volume, blockFiles[i]));
+                        new ReplicaInfo(volume, blockFiles[i]));
         }
       }
     }
@@ -403,7 +403,7 @@
       DiskChecker.checkDir(tmpDir);
     }
       
-    void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
+    void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
       dataDir.getVolumeMap(volumeMap, this);
     }
       
@@ -496,7 +496,7 @@
       return remaining;
     }
       
-    synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
+    synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
       for (int idx = 0; idx < volumes.length; idx++) {
         volumes[idx].getVolumeMap(volumeMap);
       }
@@ -653,7 +653,7 @@
   FSVolumeSet volumes;
   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
-  HashMap<Block,DatanodeBlockInfo> volumeMap = null;
+  HashMap<Block,ReplicaInfo> volumeMap = null;
   static  Random random = new Random();
 
   // Used for synchronizing access to usage stats
@@ -669,7 +669,7 @@
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
     }
     volumes = new FSVolumeSet(volArray);
-    volumeMap = new HashMap<Block, DatanodeBlockInfo>();
+    volumeMap = new HashMap<Block, ReplicaInfo>();
     volumes.getVolumeMap(volumeMap);
     registerMBean(storage.getStorageID());
   }
@@ -742,7 +742,7 @@
   public synchronized BlockInputStreams getTmpInputStreams(Block b, 
                           long blkOffset, long ckoff) throws IOException {
 
-    DatanodeBlockInfo info = volumeMap.get(b);
+    ReplicaInfo info = volumeMap.get(b);
     if (info == null) {
       throw new IOException("Block " + b + " does not exist in volumeMap.");
     }
@@ -777,7 +777,7 @@
    * @return - true if the specified block was detached
    */
   public boolean detachBlock(Block block, int numLinks) throws IOException {
-    DatanodeBlockInfo info = null;
+    ReplicaInfo info = null;
 
     synchronized (this) {
       info = volumeMap.get(block);
@@ -1006,12 +1006,12 @@
         v = volumes.getNextVolume(blockSize);
         // create temporary file to hold block in the designated volume
         f = createTmpFile(v, b);
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new ReplicaInfo(v));
       } else if (f != null) {
         DataNode.LOG.info("Reopen already-open Block for append " + b);
         // create or reuse temporary file to hold block in the designated volume
         v = volumeMap.get(b).getVolume();
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new ReplicaInfo(v));
       } else {
         // reopening block for appending to it.
         DataNode.LOG.info("Reopen Block for append " + b);
@@ -1042,7 +1042,7 @@
                                   " to tmp dir " + f);
           }
         }
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new ReplicaInfo(v));
       }
       if (f == null) {
         DataNode.LOG.warn("Block " + b + " reopen failed " +
@@ -1147,7 +1147,7 @@
         
     File dest = null;
     dest = v.addBlock(b, f);
-    volumeMap.put(b, new DatanodeBlockInfo(v, dest));
+    volumeMap.put(b, new ReplicaInfo(v, dest));
     ongoingCreates.remove(b);
   }
 
@@ -1248,7 +1248,7 @@
 
   /** {@inheritDoc} */
   public void validateBlockMetadata(Block b) throws IOException {
-    DatanodeBlockInfo info = volumeMap.get(b);
+    ReplicaInfo info = volumeMap.get(b);
     if (info == null) {
       throw new IOException("Block " + b + " does not exist in volumeMap.");
     }
@@ -1306,7 +1306,7 @@
       FSVolume v;
       synchronized (this) {
         f = getFile(invalidBlks[i]);
-        DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);
+        ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
         if (dinfo == null) {
           DataNode.LOG.warn("Unexpected error trying to delete block "
                            + invalidBlks[i] + 
@@ -1369,7 +1369,7 @@
    * Turn the block identifier into a filename.
    */
   public synchronized File getFile(Block b) {
-    DatanodeBlockInfo info = volumeMap.get(b);
+    ReplicaInfo info = volumeMap.get(b);
     if (info != null) {
       return info.getFile();
     }
@@ -1448,8 +1448,8 @@
    * generation stamp</li>
    * <li>If the block length in memory does not match the actual block file length
    * then mark the block as corrupt and update the block length in memory</li>
-   * <li>If the file in {@link DatanodeBlockInfo} does not match the file on
-   * the disk, update {@link DatanodeBlockInfo} with the correct file</li>
+   * <li>If the file in {@link ReplicaInfo} does not match the file on
+   * the disk, update {@link ReplicaInfo} with the correct file</li>
    * </ul>
    *
    * @param blockId Block that differs
@@ -1472,7 +1472,7 @@
           Block.getGenerationStamp(diskMetaFile.getName()) :
             Block.GRANDFATHER_GENERATION_STAMP;
 
-      DatanodeBlockInfo memBlockInfo = volumeMap.get(block);
+      ReplicaInfo memBlockInfo = volumeMap.get(block);
       if (diskFile == null || !diskFile.exists()) {
         if (memBlockInfo == null) {
           // Block file does not exist and block does not exist in memory
@@ -1507,7 +1507,7 @@
        */
       if (memBlockInfo == null) {
         // Block is missing in memory - add the block to volumeMap
-        DatanodeBlockInfo diskBlockInfo = new DatanodeBlockInfo(vol, diskFile);
+        ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
         Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
         volumeMap.put(diskBlock, diskBlockInfo);
         if (datanode.blockScanner != null) {
@@ -1540,7 +1540,7 @@
             + memFile.getAbsolutePath()
             + " does not exist. Updating it to the file found during scan "
             + diskFile.getAbsolutePath());
-        DatanodeBlockInfo info = volumeMap.remove(memBlock);
+        ReplicaInfo info = volumeMap.remove(memBlock);
         info.setFile(diskFile);
         memFile = diskFile;
 
@@ -1571,7 +1571,7 @@
           DataNode.LOG.warn("Updating generation stamp for block " + blockId
               + " from " + memBlock.getGenerationStamp() + " to " + gs);
 
-          DatanodeBlockInfo info = volumeMap.remove(memBlock);
+          ReplicaInfo info = volumeMap.remove(memBlock);
           memBlock.setGenerationStamp(gs);
           volumeMap.put(memBlock, info);
         }
@@ -1583,7 +1583,7 @@
         corruptBlock = new Block(memBlock);
         DataNode.LOG.warn("Updating size of block " + blockId + " from "
             + memBlock.getNumBytes() + " to " + memFile.length());
-        DatanodeBlockInfo info = volumeMap.remove(memBlock);
+        ReplicaInfo info = volumeMap.remove(memBlock);
         memBlock.setNumBytes(memFile.length());
         volumeMap.put(memBlock, info);
       }
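
The FSDataset hunks rename the volumeMap value type from DatanodeBlockInfo to ReplicaInfo but keep the same lookup discipline: every read path fetches the ReplicaInfo for a Block and fails if the block is absent. Here is a minimal sketch of that map-backed bookkeeping, with hypothetical BlockKey and ReplicaInfoSketch classes standing in for the real Block and ReplicaInfo.

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;

// Hypothetical, simplified stand-ins for Block and ReplicaInfo, only to show the
// volumeMap lookup pattern that the hunks above rename from DatanodeBlockInfo.
class BlockKey {
  final long blockId;
  BlockKey(long blockId) { this.blockId = blockId; }
  @Override public boolean equals(Object o) {
    return o instanceof BlockKey && ((BlockKey) o).blockId == blockId;
  }
  @Override public int hashCode() { return Objects.hash(blockId); }
  @Override public String toString() { return "blk_" + blockId; }
}

class ReplicaInfoSketch {
  private final String volume; // stands in for FSVolume
  private File file;
  ReplicaInfoSketch(String volume, File file) { this.volume = volume; this.file = file; }
  String getVolume() { return volume; }
  File getFile() { return file; }
  void setFile(File file) { this.file = file; }
}

public class VolumeMapSketch {
  // Mirrors FSDataset.volumeMap: block -> per-replica bookkeeping on this datanode.
  private final HashMap<BlockKey, ReplicaInfoSketch> volumeMap = new HashMap<>();

  File getFile(BlockKey b) throws IOException {
    ReplicaInfoSketch info = volumeMap.get(b);
    if (info == null) {
      throw new IOException("Block " + b + " does not exist in volumeMap.");
    }
    return info.getFile();
  }

  public static void main(String[] args) throws IOException {
    VolumeMapSketch dataset = new VolumeMapSketch();
    BlockKey b = new BlockKey(42L);
    dataset.volumeMap.put(b, new ReplicaInfoSketch("vol0", new File("/tmp/blk_42")));
    System.out.println(dataset.getFile(b)); // /tmp/blk_42
  }
}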

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Mon Aug  3 23:34:04 2009
@@ -37,7 +37,6 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.security.AccessTokenHandler;
@@ -234,7 +233,7 @@
   /**
    * Get all valid locations of the block
    */
-  ArrayList<String> addBlock(Block block) {
+  ArrayList<String> getValidLocations(Block block) {
     ArrayList<String> machineSet =
       new ArrayList<String>(blocksMap.numNodes(block));
     for(Iterator<DatanodeDescriptor> it =
@@ -249,7 +248,6 @@
     return machineSet;
   }
 
-
   List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
       long length, int nrBlocksToReturn) throws IOException {
     int curBlk = 0;
@@ -397,43 +395,50 @@
     }
   }
 
-  void markBlockAsCorrupt(Block blk, DatanodeInfo dn) throws IOException {
+  void findAndMarkBlockAsCorrupt(Block blk,
+                                 DatanodeInfo dn) throws IOException {
+    BlockInfo storedBlock = getStoredBlock(blk);
+    if (storedBlock == null) {
+      // Check if the replica is in the blockMap, if not
+      // ignore the request for now. This could happen when BlockScanner
+      // thread of Datanode reports bad block before Block reports are sent
+      // by the Datanode on startup
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.markBlockAsCorrupt: " +
+                                   "block " + blk + " could not be marked as " +
+                                   "corrupt as it does not exist in blocksMap");
+      return;
+    }
+    markBlockAsCorrupt(storedBlock, dn);
+  }
+
+  private void markBlockAsCorrupt(BlockInfo storedBlock,
+                                  DatanodeInfo dn) throws IOException {
+    assert storedBlock != null : "storedBlock should not be null";
     DatanodeDescriptor node = namesystem.getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot mark block" + blk.getBlockName() +
+      throw new IOException("Cannot mark block " + 
+                            storedBlock.getBlockName() +
                             " as corrupt because datanode " + dn.getName() +
                             " does not exist. ");
     }
 
-    final BlockInfo storedBlockInfo = blocksMap.getStoredBlock(blk);
-    if (storedBlockInfo == null) {
-      // Check if the replica is in the blockMap, if not
-      // ignore the request for now. This could happen when BlockScanner
-      // thread of Datanode reports bad block before Block reports are sent
-      // by the Datanode on startup
+    INodeFile inode = storedBlock.getINode();
+    if (inode == null) {
       NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
-                                   "block " + blk + " could not be marked " +
-                                   "as corrupt as it does not exists in " +
-                                   "blocksMap");
+                                   "block " + storedBlock +
+                                   " could not be marked as corrupt as it" +
+                                   " does not belong to any file");
+      addToInvalidates(storedBlock, node);
+      return;
+    } 
+    // Add this replica to corruptReplicas Map
+    corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
+    if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
+      // the block is over-replicated so invalidate the replicas immediately
+      invalidateBlock(storedBlock, node);
     } else {
-      INodeFile inode = storedBlockInfo.getINode();
-      if (inode == null) {
-        NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
-                                     "block " + blk + " could not be marked " +
-                                     "as corrupt as it does not belong to " +
-                                     "any file");
-        addToInvalidates(storedBlockInfo, node);
-        return;
-      } 
-      // Add this replica to corruptReplicas Map
-      corruptReplicas.addToCorruptReplicasMap(storedBlockInfo, node);
-      if (countNodes(storedBlockInfo).liveReplicas() > inode.getReplication()) {
-        // the block is over-replicated so invalidate the replicas immediately
-        invalidateBlock(storedBlockInfo, node);
-      } else {
-        // add the block to neededReplication
-        updateNeededReplications(storedBlockInfo, -1, 0);
-      }
+      // add the block to neededReplication
+      updateNeededReplications(storedBlock, -1, 0);
     }
   }
 
@@ -844,8 +849,9 @@
    * needed replications if this takes care of the problem.
    * @return the block that is stored in blockMap.
    */
-  private Block addStoredBlock(Block block, DatanodeDescriptor node,
-      DatanodeDescriptor delNodeHint) {
+  private Block addStoredBlock(final Block block,
+                               DatanodeDescriptor node,
+                               DatanodeDescriptor delNodeHint) {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -858,30 +864,32 @@
       // it will happen in next block report otherwise.
       return block;
     }
+    assert storedBlock != null : "Block must be stored by now";
+    INodeFile fileINode = storedBlock.getINode();
+    assert fileINode != null : "Block must belong to a file";
 
     // add block to the data-node
     boolean added = node.addBlock(storedBlock);
 
-    assert storedBlock != null : "Block must be stored by now";
-
     if (block != storedBlock) {
-      if (block.getNumBytes() >= 0) {
-        long cursize = storedBlock.getNumBytes();
+      long cursize = storedBlock.getNumBytes();
+      long newsize = block.getNumBytes();
+      if (newsize >= 0) {
         if (cursize == 0) {
-          storedBlock.setNumBytes(block.getNumBytes());
-        } else if (cursize != block.getNumBytes()) {
+          storedBlock.setNumBytes(newsize);
+        } else if (cursize != newsize) {
           FSNamesystem.LOG.warn("Inconsistent size for block " + block +
                    " reported from " + node.getName() +
                    " current size is " + cursize +
-                   " reported size is " + block.getNumBytes());
+                   " reported size is " + newsize);
           try {
-            if (cursize > block.getNumBytes()) {
+            if (cursize > newsize) {
               // new replica is smaller in size than existing block.
               // Mark the new replica as corrupt.
               FSNamesystem.LOG.warn("Mark new replica "
                   + block + " from " + node.getName() + " as corrupt "
                   + "because length is shorter than existing ones");
-              markBlockAsCorrupt(block, node);
+              markBlockAsCorrupt(storedBlock, node);
             } else {
               // new replica is larger in size than existing block.
               // Mark pre-existing replicas as corrupt.
@@ -899,19 +907,12 @@
                 FSNamesystem.LOG.warn("Mark existing replica "
                         + block + " from " + node.getName() + " as corrupt "
                         + "because its length is shorter than the new one");
-                markBlockAsCorrupt(block, nodes[j]);
+                markBlockAsCorrupt(storedBlock, nodes[j]);
               }
               //
               // change the size of block in blocksMap
               //
-              storedBlock = blocksMap.getStoredBlock(block); // extra look up!
-              if (storedBlock == null) {
-                FSNamesystem.LOG.warn("Block " + block + " reported from "
-                    + node.getName()
-                    + " does not exist in blockMap. Surprise! Surprise!");
-              } else {
-                storedBlock.setNumBytes(block.getNumBytes());
-              }
+              storedBlock.setNumBytes(newsize);
             }
           } catch (IOException e) {
             FSNamesystem.LOG.warn("Error in deleting bad block " + block + e);
@@ -919,17 +920,15 @@
         }
 
         // Updated space consumed if required.
-        INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
-        long diff = (file == null) ? 0 :
-                    (file.getPreferredBlockSize() - storedBlock.getNumBytes());
+        long diff = fileINode.getPreferredBlockSize() - storedBlock.getNumBytes();
         
-        if (diff > 0 && file.isUnderConstruction() &&
+        if (diff > 0 && fileINode.isUnderConstruction() &&
             cursize < storedBlock.getNumBytes()) {
           try {
             String path = /* For finding parents */
-            namesystem.leaseManager.findPath((INodeFileUnderConstruction) file);
+            namesystem.leaseManager.findPath((INodeFileUnderConstruction)fileINode);
             namesystem.dir.updateSpaceConsumed(path, 0, -diff
-                * file.getReplication());
+                * fileINode.getReplication());
           } catch (IOException e) {
             FSNamesystem.LOG
                 .warn("Unexpected exception while updating disk space : "
@@ -937,12 +936,9 @@
           }
         }
       }
-      block = storedBlock;
     }
-    assert storedBlock == block : "Block must be stored by now";
 
     int curReplicaDelta = 0;
-
     if (added) {
       curReplicaDelta = 1;
       //
@@ -952,20 +948,20 @@
       //
       if (!namesystem.isInSafeMode()) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-            + "blockMap updated: " + node.getName() + " is added to " + block
-            + " size " + block.getNumBytes());
+            + "blockMap updated: " + node.getName() + " is added to " + 
+            storedBlock + " size " + storedBlock.getNumBytes());
       }
     } else {
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
-          + "Redundant addStoredBlock request received for " + block + " on "
-          + node.getName() + " size " + block.getNumBytes());
+          + "Redundant addStoredBlock request received for " + storedBlock
+          + " on " + node.getName() + " size " + storedBlock.getNumBytes());
     }
 
     // filter out containingNodes that are marked for decommission.
     NumberReplicas num = countNodes(storedBlock);
     int numLiveReplicas = num.liveReplicas();
     int numCurrentReplica = numLiveReplicas
-      + pendingReplications.getNumReplicas(block);
+      + pendingReplications.getNumReplicas(storedBlock);
 
     // check whether safe replication is reached for the block
     namesystem.incrementSafeBlockCount(numCurrentReplica);
@@ -974,39 +970,37 @@
     // if file is being actively written to, then do not check
     // replication-factor here. It will be checked when the file is closed.
     //
-    INodeFile fileINode = null;
-    fileINode = storedBlock.getINode();
     if (fileINode.isUnderConstruction()) {
-      return block;
+      return storedBlock;
     }
 
     // do not handle mis-replicated blocks during startup
     if (namesystem.isInSafeMode())
-      return block;
+      return storedBlock;
 
     // handle underReplication/overReplication
     short fileReplication = fileINode.getReplication();
     if (numCurrentReplica >= fileReplication) {
-      neededReplications.remove(block, numCurrentReplica,
+      neededReplications.remove(storedBlock, numCurrentReplica,
           num.decommissionedReplicas, fileReplication);
     } else {
-      updateNeededReplications(block, curReplicaDelta, 0);
+      updateNeededReplications(storedBlock, curReplicaDelta, 0);
     }
     if (numCurrentReplica > fileReplication) {
-      processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
+      processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
     }
     // If the file replication has reached desired value
     // we can remove any corrupt replicas the block may have
-    int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block);
+    int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
     int numCorruptNodes = num.corruptReplicas();
     if (numCorruptNodes != corruptReplicasCount) {
       FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for " +
-          block + "blockMap has " + numCorruptNodes + 
+          storedBlock + "blockMap has " + numCorruptNodes + 
           " but corrupt replicas map has " + corruptReplicasCount);
     }
     if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
-      invalidateCorruptReplicas(block);
-    return block;
+      invalidateCorruptReplicas(storedBlock);
+    return storedBlock;
   }
 
   /**
@@ -1051,7 +1045,7 @@
     long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
     synchronized (namesystem) {
       neededReplications.clear();
-      for (BlocksMap.BlockInfo block : blocksMap.getBlocks()) {
+      for (BlockInfo block : blocksMap.getBlocks()) {
         INodeFile fileINode = block.getINode();
         if (fileINode == null) {
           // block does not belong to any file
@@ -1415,13 +1409,13 @@
     blocksMap.removeBlock(block);
   }
   
-  public int getCapacity() {
+  int getCapacity() {
     synchronized(namesystem) {
       return blocksMap.getCapacity();
     }
   }
   
-  public float getLoadFactor() {
+  float getLoadFactor() {
     return blocksMap.getLoadFactor();
   }
 }
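
The BlockManager hunks above split the old markBlockAsCorrupt into a lookup step (findAndMarkBlockAsCorrupt, which ignores reports for blocks not yet in the blocks map) and a decision step that works on the stored BlockInfo: invalidate if the block belongs to no file or is over-replicated, otherwise queue it for re-replication. The control-flow sketch below mirrors that split; all types, fields and print statements are hypothetical stand-ins rather than BlockManager's real API.

import java.io.IOException;

// A control-flow sketch of the corrupt-replica handling refactored above; every
// type and helper here is a hypothetical stand-in, not BlockManager's real code.
public class CorruptReplicaFlowSketch {

  static class StoredBlock {
    final String name;
    final Object inode;       // null => block no longer belongs to a file
    final int liveReplicas;
    final int expectedReplication;
    StoredBlock(String name, Object inode, int liveReplicas, int expectedReplication) {
      this.name = name; this.inode = inode;
      this.liveReplicas = liveReplicas; this.expectedReplication = expectedReplication;
    }
  }

  /** Lookup step: ignore reports for blocks that are not in the blocks map yet. */
  void findAndMarkBlockAsCorrupt(StoredBlock stored, String datanode) throws IOException {
    if (stored == null) {
      System.out.println("block could not be marked as corrupt: not in blocksMap");
      return;
    }
    markBlockAsCorrupt(stored, datanode);
  }

  /** Decision step, operating only on the block already found in the map. */
  private void markBlockAsCorrupt(StoredBlock stored, String datanode) throws IOException {
    if (datanode == null) {
      throw new IOException("Cannot mark block " + stored.name
          + " as corrupt because the datanode does not exist.");
    }
    if (stored.inode == null) {
      System.out.println("invalidate " + stored.name + ": it belongs to no file");
      return;
    }
    if (stored.liveReplicas > stored.expectedReplication) {
      // Over-replicated: the bad copy can be dropped immediately.
      System.out.println("invalidate corrupt replica of " + stored.name + " on " + datanode);
    } else {
      // Otherwise schedule re-replication before removing the bad copy.
      System.out.println("add " + stored.name + " to neededReplications");
    }
  }

  public static void main(String[] args) throws IOException {
    CorruptReplicaFlowSketch sketch = new CorruptReplicaFlowSketch();
    sketch.findAndMarkBlockAsCorrupt(
        new StoredBlock("blk_7", new Object(), 3, 2), "dn1");
    sketch.findAndMarkBlockAsCorrupt(null, "dn1");
  }
}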