You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/08/04 01:34:05 UTC
svn commit: r800624 [1/2] - in /hadoop/hdfs/branches/HDFS-265: ./ ivy/
src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hado...
Author: hairong
Date: Mon Aug 3 23:34:04 2009
New Revision: 800624
URL: http://svn.apache.org/viewvc?rev=800624&view=rev
Log:
Merge -r 796828:800617 from trunk to branch HDFS-265
Added:
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
- copied unchanged from r800617, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
- copied unchanged from r800617, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
Removed:
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
Modified:
hadoop/hdfs/branches/HDFS-265/ (props changed)
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/build.xml (contents, props changed)
hadoop/hdfs/branches/HDFS-265/ivy.xml
hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties
hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/ (props changed)
hadoop/hdfs/branches/HDFS-265/src/java/ (props changed)
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-265/src/test/hdfs-with-mr/ (props changed)
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestAbandonBlock.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestGetBlocks.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestSetTimes.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/security/TestPermission.java
hadoop/hdfs/branches/HDFS-265/src/webapps/datanode/ (props changed)
hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-265/src/webapps/secondary/ (props changed)
Propchange: hadoop/hdfs/branches/HDFS-265/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 3 23:34:04 2009
@@ -1 +1,2 @@
/hadoop/core/branches/branch-0.19/hdfs:713112
+/hadoop/hdfs/trunk:796829-800617
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Mon Aug 3 23:34:04 2009
@@ -45,6 +45,31 @@
HDFS-458. Create a new ant target, run-commit-test. (Jakob Homan
via szetszwo)
+ HDFS-493. Change build.xml so that the fault-injected tests are executed
+ only by the run-test-*-fault-inject targets. (Konstantin Boudnik via
+ szetszwo)
+
+ HADOOP-6160. Fix releaseaudit target to run on specific directories.
+ (gkesavan)
+
+ HDFS-508. Factor out BlockInfo from BlocksMap. (shv)
+
+ HDFS-510. Rename DatanodeBlockInfo to be ReplicaInfo.
+ (Jakob Homan & Hairong Kuang via shv)
+
+ HDFS-500. Deprecate NameNode methods deprecated in NameNodeProtocol.
+ (Jakob Homan via shv)
+
+ HDFS-514. Change DFSClient.namenode from public to private. (Bill Zeller
+ via szetszwo)
+
+ HDFS-496. Use PureJavaCrc32 in HDFS. (Todd Lipcon via szetszwo)
+
+ HDFS-511. Remove redundant block searches in BlockManager. (shv)
+
+ HDFS-504. Update the modification time of a file when the file
+ is closed. (Chun Zhang via dhruba)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
lack of quota. Allow quota to be set even if the limit is lower than
@@ -89,6 +114,21 @@
HDFS-463. CreateEditLog utility broken after HDFS-396 (URI for
FSImage). (Suresh Srinivas via rangadi)
+ HDFS-484. Fix bin-package and package target to package jar files.
+ (gkesavan)
+
+ HDFS-501. Use enum to define the constants in DataTransferProtocol.
+ (szetszwo)
+
+ HDFS-490. Eliminate the deprecated warnings introduced by H-5438.
+ (He Yongqiang via szetszwo)
+
+ HDFS-119. Fix a bug in logSync(), which causes NameNode to block forever.
+ (Suresh Srinivas via shv)
+
+ HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+ (Bill Zeller via szetszwo)
+
Release 0.20.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/branches/HDFS-265/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/build.xml?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-265/build.xml Mon Aug 3 23:34:04 2009
@@ -362,6 +362,8 @@
description="Run Fault Injection related hdfs tests">
<subant buildpath="build.xml" target="run-test-hdfs">
<property name="build.dir" value="${build-fi.dir}"/>
+ <property name="test.fault.inject" value="yes"/>
+ <property name="test.include" value="TestFi*"/>
</subant>
</target>
@@ -369,6 +371,8 @@
description="Run hdfs Fault Injection related unit tests that require mapred">
<subant buildpath="build.xml" target="run-test-hdfs-with-mr">
<property name="build.dir" value="${build-fi.dir}"/>
+ <property name="test.fault.inject" value="yes"/>
+ <property name="test.include" value="TestFi*"/>
</subant>
</target>
@@ -551,6 +555,35 @@
</jar>
</target>
+ <condition property="tests.notestcase">
+ <and>
+ <isfalse value="${test.fault.inject}"/>
+ <not>
+ <isset property="testcase"/>
+ </not>
+ </and>
+ </condition>
+ <condition property="tests.notestcase.fi">
+ <and>
+ <not>
+ <isset property="testcase" />
+ </not>
+ <istrue value="${test.fault.inject}" />
+ </and>
+ </condition>
+ <condition property="tests.testcase">
+ <and>
+ <isfalse value="${test.fault.inject}" />
+ <isset property="testcase" />
+ </and>
+ </condition>
+ <condition property="tests.testcase.fi">
+ <and>
+ <istrue value="${test.fault.inject}" />
+ <isset property="testcase" />
+ </and>
+ </condition>
+
<!-- ================================================================== -->
<!-- Run unit tests -->
<!-- ================================================================== -->
@@ -586,15 +619,22 @@
<propertyref regex="fi.*"/>
</syspropertyset>
<formatter type="${test.junit.output.format}" />
- <batchtest todir="${test.build.dir}" unless="testcase">
+ <batchtest todir="${test.build.dir}" if="tests.notestcase">
<fileset dir="${test.src.dir}/hdfs" excludes="**/${test.exclude}.java">
<patternset>
<includesfile name="@{test.file}"/>
</patternset>
</fileset>
</batchtest>
- <batchtest todir="${test.build.dir}" if="testcase">
+ <batchtest todir="${test.build.dir}" if="tests.notestcase.fi">
+ <fileset dir="${test.src.dir}/aop"
+ includes="**/${test.include}.java"
+ excludes="**/${test.exclude}.java" />
+ </batchtest>
+ <batchtest todir="${test.build.dir}" if="tests.testcase">
<fileset dir="${test.src.dir}/hdfs" includes="**/${testcase}.java"/>
+ </batchtest>
+ <batchtest todir="${test.build.dir}" if="tests.testcase.fi">
<fileset dir="${test.src.dir}/aop" includes="**/${testcase}.java"/>
</batchtest>
</junit>
@@ -640,14 +680,22 @@
<propertyref regex="fi.*"/>
</syspropertyset>
<formatter type="${test.junit.output.format}" />
- <batchtest todir="${test.build.dir}" unless="testcase">
+ <batchtest todir="${test.build.dir}" if="tests.notestcase">
<fileset dir="${test.src.dir}/hdfs-with-mr"
includes="**/${test.include}.java"
- excludes="**/${test.exclude}.java" />
+ excludes="**/${test.exclude}.java" />
+ </batchtest>
+ <batchtest todir="${test.build.dir}" if="tests.notestcase.fi">
+ <fileset dir="${test.src.dir}/aop"
+ includes="**/${test.include}.java"
+ excludes="**/${test.exclude}.java" />
</batchtest>
- <batchtest todir="${test.build.dir}" if="testcase">
+ <batchtest todir="${test.build.dir}" if="tests.testcase">
<fileset dir="${test.src.dir}/hdfs-with-mr" includes="**/${testcase}.java"/>
</batchtest>
+ <batchtest todir="${test.build.dir}" if="tests.testcase.fi">
+ <fileset dir="${test.src.dir}/aop" includes="**/${testcase}.java"/>
+ </batchtest>
</junit>
<antcall target="checkfailure"/>
</target>
@@ -956,7 +1004,7 @@
</copy>
<copy todir="${dist.dir}">
- <fileset file="${build.dir}/${final.name}-*.jar"/>
+ <fileset file="${build.dir}/${name}-*.jar"/>
</copy>
<copy todir="${dist.dir}/conf">
@@ -1039,7 +1087,7 @@
</copy>
<copy todir="${dist.dir}">
- <fileset file="${build.dir}/${final.name}-*.jar"/>
+ <fileset file="${build.dir}/${name}-*.jar"/>
</copy>
<copy todir="${dist.dir}/conf">
@@ -1081,12 +1129,19 @@
<!-- ================================================================== -->
<!-- Perform audit activities for the release -->
<!-- ================================================================== -->
- <target name="releaseaudit" depends="package,ivy-retrieve-releaseaudit" description="Release Audit activities">
- <fail unless="rat.present" message="Failed to load class [${rat.reporting.classname}]."/>
- <java classname="${rat.reporting.classname}" fork="true">
- <classpath refid="releaseaudit-classpath"/>
- <arg value="${build.dir}/${final.name}"/>
- </java>
+ <target name="rats-taskdef" depends="ivy-retrieve-releaseaudit">
+ <typedef format="xml" resource="org/apache/rat/anttasks/antlib.xml" uri="antlib:org.apache.rat.anttasks"
+ classpathref="releaseaudit-classpath"/>
+ </target>
+
+ <target name="releaseaudit" depends="package, rats-taskdef" description="Release Audit activities">
+ <rat:report xmlns:rat="antlib:org.apache.rat.anttasks">
+ <fileset dir="${dist.dir}">
+ <exclude name="CHANGES.txt"/>
+ <exclude name="docs/"/>
+ <exclude name="lib/jdiff/"/>
+ </fileset>
+ </rat:report>
</target>
<!-- ================================================================== -->
@@ -1360,8 +1415,6 @@
<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings"
pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
<ivy:cachepath pathid="releaseaudit-classpath" conf="releaseaudit"/>
- <available classname="${rat.reporting.classname}"
- classpathref="releaseaudit-classpath" property="rat.present" value="true"/>
</target>
<target name="ivy-report" depends="ivy-resolve-releaseaudit"
Propchange: hadoop/hdfs/branches/HDFS-265/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 3 23:34:04 2009
@@ -1,2 +1,3 @@
/hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
/hadoop/core/trunk/build.xml:779102
+/hadoop/hdfs/trunk/build.xml:796829-800617
Modified: hadoop/hdfs/branches/HDFS-265/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/ivy.xml?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/ivy.xml (original)
+++ hadoop/hdfs/branches/HDFS-265/ivy.xml Mon Aug 3 23:34:04 2009
@@ -224,8 +224,8 @@
name="junit"
rev="${junit.version}"
conf="common->default"/>
- <dependency org="com.google.code.p.arat"
- name="rat-lib"
+ <dependency org="org.apache.rat"
+ name="apache-rat-tasks"
rev="${rats-lib.version}"
conf="releaseaudit->default"/>
<dependency org="commons-lang"
Modified: hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties (original)
+++ hadoop/hdfs/branches/HDFS-265/ivy/libraries.properties Mon Aug 3 23:34:04 2009
@@ -60,7 +60,7 @@
oro.version=2.0.8
-rats-lib.version=0.5.1
+rats-lib.version=0.6
servlet.version=4.0.6
servlet-api-2.5.version=6.1.14
Propchange: hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 3 23:34:04 2009
@@ -1,2 +1,3 @@
/hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
/hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:796829-800617
Propchange: hadoop/hdfs/branches/HDFS-265/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 3 23:34:04 2009
@@ -1,2 +1,3 @@
/hadoop/core/branches/branch-0.19/hdfs/src/java:713112
/hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
+/hadoop/hdfs/trunk/src/java:796829-800617
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Mon Aug 3 23:34:04 2009
@@ -17,42 +17,100 @@
*/
package org.apache.hadoop.hdfs;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.fs.*;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputChecker;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
-import org.apache.hadoop.security.InvalidAccessTokenException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.InvalidAccessTokenException;
import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.util.*;
-
-import org.apache.commons.logging.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.zip.CRC32;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentHashMap;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-
-import javax.net.SocketFactory;
-import javax.security.auth.login.LoginException;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.StringUtils;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
@@ -69,8 +127,8 @@
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
- public final ClientProtocol namenode;
- final private ClientProtocol rpcNamenode;
+ private ClientProtocol namenode;
+ private ClientProtocol rpcNamenode;
final UnixUserGroupInformation ugi;
volatile boolean clientRunning = true;
Random r = new Random();
@@ -161,6 +219,29 @@
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
+ this(conf, stats);
+ this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+ this.namenode = createNamenode(this.rpcNamenode);
+ }
+
+ /**
+ * Create a new DFSClient connected to the given namenode
+ * and rpcNamenode objects.
+ *
+ * This constructor was written to allow easy testing of the DFSClient class.
+ * End users will most likely want to use one of the other constructors.
+ */
+ public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
+ Configuration conf, FileSystem.Statistics stats)
+ throws IOException {
+ this(conf, stats);
+ this.namenode = namenode;
+ this.rpcNamenode = rpcNamenode;
+ }
+
+
+ private DFSClient(Configuration conf, FileSystem.Statistics stats)
+ throws IOException {
this.conf = conf;
this.stats = stats;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
@@ -182,9 +263,6 @@
throw (IOException)(new IOException().initCause(e));
}
- this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
- this.namenode = createNamenode(rpcNamenode);
-
String taskId = conf.get("mapred.task.id");
if (taskId != null) {
this.clientName = "DFSClient_" + taskId;
@@ -372,6 +450,14 @@
return create(src, overwrite, replication, blockSize, null);
}
+ /**
+ * Get the namenode associated with this DFSClient object
+ * @return the namenode associated with this DFSClient object
+ */
+ public ClientProtocol getNamenode() {
+ return namenode;
+ }
+
/**
* Create a new dfs file with the specified block replication
@@ -619,15 +705,14 @@
try {
if (LOG.isDebugEnabled()) {
LOG.debug("write to " + datanodes[j].getName() + ": "
- + DataTransferProtocol.OP_BLOCK_CHECKSUM +
- ", block=" + block);
+ + BLOCK_CHECKSUM + ", block=" + block);
}
DataTransferProtocol.Sender.opBlockChecksum(out, block.getBlockId(),
block.getGenerationStamp(), lb.getAccessToken());
- final short reply = in.readShort();
- if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
- if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
+ final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
+ if (reply != SUCCESS) {
+ if (reply == ERROR_ACCESS_TOKEN
&& i > lastRetriedIndex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
@@ -1353,9 +1438,9 @@
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
- short status = in.readShort();
- if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
- if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+ DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+ if (status != SUCCESS) {
+ if (status == ERROR_ACCESS_TOKEN) {
throw new InvalidAccessTokenException(
"Got access token error in response to OP_READ_BLOCK "
+ "for file " + file + " for block " + blockId);
@@ -1402,9 +1487,7 @@
private void checksumOk(Socket sock) {
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
- byte buf[] = { (DataTransferProtocol.OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
- (DataTransferProtocol.OP_STATUS_CHECKSUM_OK) & 0xff };
- out.write(buf);
+ CHECKSUM_OK.writeOutputStream(out);
out.flush();
} catch (IOException e) {
// it's ok not to be able to send this.
@@ -2476,8 +2559,9 @@
// processes response status from all datanodes.
for (int i = 0; i < targets.length && clientRunning; i++) {
- short reply = blockReplyStream.readShort();
- if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ final DataTransferProtocol.Status reply
+ = DataTransferProtocol.Status.read(blockReplyStream);
+ if (reply != SUCCESS) {
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
@@ -2716,7 +2800,7 @@
//
private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
boolean recoveryFlag) {
- short pipelineStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
+ DataTransferProtocol.Status pipelineStatus = SUCCESS;
String firstBadLink = "";
if (LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
@@ -2755,10 +2839,10 @@
out.flush();
// receive ack for connect
- pipelineStatus = blockReplyStream.readShort();
+ pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
firstBadLink = Text.readString(blockReplyStream);
- if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
- if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+ if (pipelineStatus != SUCCESS) {
+ if (pipelineStatus == ERROR_ACCESS_TOKEN) {
throw new InvalidAccessTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
@@ -2792,7 +2876,7 @@
}
private LocatedBlock locateFollowingBlock(long start) throws IOException {
- int retries = 5;
+ int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
@@ -2808,26 +2892,32 @@
if (ue != e) {
throw ue; // no need to retry these exceptions
}
-
- if (--retries == 0 &&
- !NotReplicatedYetException.class.getName().
+
+
+ if (NotReplicatedYetException.class.getName().
equals(e.getClassName())) {
- throw e;
- } else {
- LOG.info(StringUtils.stringifyException(e));
- if (System.currentTimeMillis() - localstart > 5000) {
- LOG.info("Waiting for replication for "
- + (System.currentTimeMillis() - localstart) / 1000
- + " seconds");
- }
- try {
- LOG.warn("NotReplicatedYetException sleeping " + src
- + " retries left " + retries);
- Thread.sleep(sleeptime);
- sleeptime *= 2;
- } catch (InterruptedException ie) {
+ if (retries == 0) {
+ throw e;
+ } else {
+ --retries;
+ LOG.info(StringUtils.stringifyException(e));
+ if (System.currentTimeMillis() - localstart > 5000) {
+ LOG.info("Waiting for replication for "
+ + (System.currentTimeMillis() - localstart) / 1000
+ + " seconds");
+ }
+ try {
+ LOG.warn("NotReplicatedYetException sleeping " + src
+ + " retries left " + retries);
+ Thread.sleep(sleeptime);
+ sleeptime *= 2;
+ } catch (InterruptedException ie) {
+ }
}
- }
+ } else {
+ throw e;
+ }
+
}
}
}
@@ -2919,7 +3009,7 @@
private DFSOutputStream(String src, long blockSize, Progressable progress,
int bytesPerChecksum) throws IOException {
- super(new CRC32(), bytesPerChecksum, 4);
+ super(new PureJavaCrc32(), bytesPerChecksum, 4);
this.src = src;
this.blockSize = blockSize;
this.progress = progress;
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Mon Aug 3 23:34:04 2009
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessToken;
@@ -42,39 +45,136 @@
*/
public static final int DATA_TRANSFER_VERSION = 16;
- // Processed at datanode stream-handler
- public static final byte OP_WRITE_BLOCK = (byte) 80;
- public static final byte OP_READ_BLOCK = (byte) 81;
- /**
- * @deprecated As of version 15, OP_READ_METADATA is no longer supported
- */
- @Deprecated public static final byte OP_READ_METADATA = (byte) 82;
- public static final byte OP_REPLACE_BLOCK = (byte) 83;
- public static final byte OP_COPY_BLOCK = (byte) 84;
- public static final byte OP_BLOCK_CHECKSUM = (byte) 85;
+ /** Operation */
+ public enum Op {
+ WRITE_BLOCK((byte)80),
+ READ_BLOCK((byte)81),
+ READ_METADATA((byte)82),
+ REPLACE_BLOCK((byte)83),
+ COPY_BLOCK((byte)84),
+ BLOCK_CHECKSUM((byte)85);
+
+ /** The code for this operation. */
+ public final byte code;
+
+ private Op(byte code) {
+ this.code = code;
+ }
+
+ private static final int FIRST_CODE = values()[0].code;
+ /** Return the object represented by the code. */
+ private static Op valueOf(byte code) {
+ final int i = (code & 0xff) - FIRST_CODE;
+ return i < 0 || i >= values().length? null: values()[i];
+ }
+
+ /** Read from in */
+ public static Op read(DataInput in) throws IOException {
+ return valueOf(in.readByte());
+ }
+
+ /** Write to out */
+ public void write(DataOutput out) throws IOException {
+ out.write(code);
+ }
+ };
+
+ /** Status */
+ public enum Status {
+ SUCCESS(0),
+ ERROR(1),
+ ERROR_CHECKSUM(2),
+ ERROR_INVALID(3),
+ ERROR_EXISTS(4),
+ ERROR_ACCESS_TOKEN(5),
+ CHECKSUM_OK(6);
+
+ /** The code for this status. */
+ private final int code;
+
+ private Status(int code) {
+ this.code = code;
+ }
+
+ private static final int FIRST_CODE = values()[0].code;
+ /** Return the object represented by the code. */
+ private static Status valueOf(int code) {
+ final int i = code - FIRST_CODE;
+ return i < 0 || i >= values().length? null: values()[i];
+ }
+
+ /** Read from in */
+ public static Status read(DataInput in) throws IOException {
+ return valueOf(in.readShort());
+ }
+
+ /** Write to out */
+ public void write(DataOutput out) throws IOException {
+ out.writeShort(code);
+ }
+
+ /** Write to out */
+ public void writeOutputStream(OutputStream out) throws IOException {
+ out.write(new byte[] {(byte)(code >>> 8), (byte)code});
+ }
+ };
- public static final int OP_STATUS_SUCCESS = 0;
- public static final int OP_STATUS_ERROR = 1;
- public static final int OP_STATUS_ERROR_CHECKSUM = 2;
- public static final int OP_STATUS_ERROR_INVALID = 3;
- public static final int OP_STATUS_ERROR_EXISTS = 4;
- public static final int OP_STATUS_ERROR_ACCESS_TOKEN = 5;
- public static final int OP_STATUS_CHECKSUM_OK = 6;
+ /** @deprecated Deprecated at 0.21. Use Op.WRITE_BLOCK instead. */
+ @Deprecated
+ public static final byte OP_WRITE_BLOCK = Op.WRITE_BLOCK.code;
+ /** @deprecated Deprecated at 0.21. Use Op.READ_BLOCK instead. */
+ @Deprecated
+ public static final byte OP_READ_BLOCK = Op.READ_BLOCK.code;
+ /** @deprecated As of version 15, OP_READ_METADATA is no longer supported. */
+ @Deprecated
+ public static final byte OP_READ_METADATA = Op.READ_METADATA.code;
+ /** @deprecated Deprecated at 0.21. Use Op.REPLACE_BLOCK instead. */
+ @Deprecated
+ public static final byte OP_REPLACE_BLOCK = Op.REPLACE_BLOCK.code;
+ /** @deprecated Deprecated at 0.21. Use Op.COPY_BLOCK instead. */
+ @Deprecated
+ public static final byte OP_COPY_BLOCK = Op.COPY_BLOCK.code;
+ /** @deprecated Deprecated at 0.21. Use Op.BLOCK_CHECKSUM instead. */
+ @Deprecated
+ public static final byte OP_BLOCK_CHECKSUM = Op.BLOCK_CHECKSUM.code;
+
+
+ /** @deprecated Deprecated at 0.21. Use Status.SUCCESS instead. */
+ @Deprecated
+ public static final int OP_STATUS_SUCCESS = Status.SUCCESS.code;
+ /** @deprecated Deprecated at 0.21. Use Status.ERROR instead. */
+ @Deprecated
+ public static final int OP_STATUS_ERROR = Status.ERROR.code;
+ /** @deprecated Deprecated at 0.21. Use Status.ERROR_CHECKSUM instead. */
+ @Deprecated
+ public static final int OP_STATUS_ERROR_CHECKSUM = Status.ERROR_CHECKSUM.code;
+ /** @deprecated Deprecated at 0.21. Use Status.ERROR_INVALID instead. */
+ @Deprecated
+ public static final int OP_STATUS_ERROR_INVALID = Status.ERROR_INVALID.code;
+ /** @deprecated Deprecated at 0.21. Use Status.ERROR_EXISTS instead. */
+ @Deprecated
+ public static final int OP_STATUS_ERROR_EXISTS = Status.ERROR_EXISTS.code;
+ /** @deprecated Deprecated at 0.21. Use Status.ERROR_ACCESS_TOKEN instead.*/
+ @Deprecated
+ public static final int OP_STATUS_ERROR_ACCESS_TOKEN = Status.ERROR_ACCESS_TOKEN.code;
+ /** @deprecated Deprecated at 0.21. Use Status.CHECKSUM_OK instead. */
+ @Deprecated
+ public static final int OP_STATUS_CHECKSUM_OK = Status.CHECKSUM_OK.code;
/** Sender */
public static class Sender {
/** Initialize a operation. */
- public static void op(DataOutputStream out, int op) throws IOException {
+ public static void op(DataOutputStream out, Op op) throws IOException {
out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- out.write(op);
+ op.write(out);
}
/** Send OP_READ_BLOCK */
public static void opReadBlock(DataOutputStream out,
long blockId, long blockGs, long blockOffset, long blockLen,
String clientName, AccessToken accessToken) throws IOException {
- op(out, OP_READ_BLOCK);
+ op(out, Op.READ_BLOCK);
out.writeLong(blockId);
out.writeLong(blockGs);
@@ -90,7 +190,7 @@
long blockId, long blockGs, int pipelineSize, boolean isRecovery,
String client, DatanodeInfo src, DatanodeInfo[] targets,
AccessToken accesstoken) throws IOException {
- op(out, OP_WRITE_BLOCK);
+ op(out, Op.WRITE_BLOCK);
out.writeLong(blockId);
out.writeLong(blockGs);
@@ -114,7 +214,7 @@
public static void opReplaceBlock(DataOutputStream out,
long blockId, long blockGs, String storageId, DatanodeInfo src,
AccessToken accesstoken) throws IOException {
- op(out, OP_REPLACE_BLOCK);
+ op(out, Op.REPLACE_BLOCK);
out.writeLong(blockId);
out.writeLong(blockGs);
@@ -127,7 +227,7 @@
/** Send OP_COPY_BLOCK */
public static void opCopyBlock(DataOutputStream out,
long blockId, long blockGs, AccessToken accesstoken) throws IOException {
- op(out, OP_COPY_BLOCK);
+ op(out, Op.COPY_BLOCK);
out.writeLong(blockId);
out.writeLong(blockGs);
@@ -138,7 +238,7 @@
/** Send OP_BLOCK_CHECKSUM */
public static void opBlockChecksum(DataOutputStream out,
long blockId, long blockGs, AccessToken accesstoken) throws IOException {
- op(out, OP_BLOCK_CHECKSUM);
+ op(out, Op.BLOCK_CHECKSUM);
out.writeLong(blockId);
out.writeLong(blockGs);
@@ -150,12 +250,12 @@
/** Receiver */
public static abstract class Receiver {
/** Initialize a operation. */
- public final byte op(DataInputStream in) throws IOException {
+ public final Op op(DataInputStream in) throws IOException {
final short version = in.readShort();
if (version != DATA_TRANSFER_VERSION) {
throw new IOException( "Version Mismatch" );
}
- return in.readByte();
+ return Op.read(in);
}
/** Receive OP_READ_BLOCK */
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Aug 3 23:34:04 2009
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -376,9 +378,9 @@
/* Receive a block copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
- short status = in.readShort();
- if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
- if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN)
+ DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
+ if (status != DataTransferProtocol.Status.SUCCESS) {
+ if (status == ERROR_ACCESS_TOKEN)
throw new IOException("block move failed due to access token error");
throw new IOException("block move is failed");
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Aug 3 23:34:04 2009
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
+
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -25,7 +29,6 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
-import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
@@ -36,11 +39,12 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
/** A class that receives a block and writes to its own disk, meanwhile
* may copies it to another site. If a throttler is provided,
@@ -672,7 +676,7 @@
}
// compute crc of partial chunk from data read in the block file.
- partialCrc = new CRC32();
+ partialCrc = new PureJavaCrc32();
partialCrc.update(buf, 0, sizePartialChunk);
LOG.info("Read in partial CRC chunk from disk for block " + block);
@@ -823,7 +827,7 @@
}
replyOut.writeLong(expected);
- replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+ SUCCESS.write(replyOut);
replyOut.flush();
} catch (Exception e) {
if (running) {
@@ -854,7 +858,7 @@
while (running && datanode.shouldRun && !lastPacketInBlock) {
try {
- short op = DataTransferProtocol.OP_STATUS_SUCCESS;
+ DataTransferProtocol.Status op = SUCCESS;
boolean didRead = false;
long expected = -2;
try {
@@ -919,7 +923,7 @@
}
if (!didRead) {
- op = DataTransferProtocol.OP_STATUS_ERROR;
+ op = ERROR;
}
// If this is the last packet in block, then close block
@@ -948,7 +952,7 @@
// send my status back to upstream datanode
replyOut.writeLong(expected); // send seqno upstream
- replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+ SUCCESS.write(replyOut);
LOG.debug("PacketResponder " + numTargets +
" for block " + block +
@@ -958,18 +962,18 @@
// forward responses from downstream datanodes.
for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
try {
- if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
- op = mirrorIn.readShort();
- if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ if (op == SUCCESS) {
+ op = Status.read(mirrorIn);
+ if (op != SUCCESS) {
LOG.debug("PacketResponder for block " + block +
": error code received from downstream " +
" datanode[" + i + "] " + op);
}
}
} catch (Throwable e) {
- op = DataTransferProtocol.OP_STATUS_ERROR;
+ op = ERROR;
}
- replyOut.writeShort(op);
+ op.write(replyOut);
}
replyOut.flush();
LOG.debug("PacketResponder " + block + " " + numTargets +
@@ -982,7 +986,7 @@
// If we forwarded an error response from a downstream datanode
// and we are acting on behalf of a client, then we quit. The
// client will drive the recovery mechanism.
- if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
+ if (op == ERROR && receiver.clientName.length() > 0) {
running = false;
}
} catch (IOException e) {
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Aug 3 23:34:04 2009
@@ -17,6 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -42,7 +48,6 @@
import org.apache.hadoop.security.AccessTokenHandler;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
/**
* Thread for processing incoming/outgoing data stream.
@@ -79,7 +84,7 @@
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
- final byte op = op(in);
+ final DataTransferProtocol.Op op = op(in);
boolean local = s.getInetAddress().equals(s.getLocalAddress());
// Make sure the xciver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
@@ -90,7 +95,7 @@
}
long startTime = DataNode.now();
switch ( op ) {
- case DataTransferProtocol.OP_READ_BLOCK:
+ case READ_BLOCK:
opReadBlock(in);
datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
if (local)
@@ -98,7 +103,7 @@
else
datanode.myMetrics.readsFromRemoteClient.inc();
break;
- case DataTransferProtocol.OP_WRITE_BLOCK:
+ case WRITE_BLOCK:
opWriteBlock(in);
datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
if (local)
@@ -106,16 +111,16 @@
else
datanode.myMetrics.writesFromRemoteClient.inc();
break;
- case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
+ case REPLACE_BLOCK: // for balancing purpose; send to a destination
opReplaceBlock(in);
datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
break;
- case DataTransferProtocol.OP_COPY_BLOCK:
+ case COPY_BLOCK:
// for balancing purpose; send to a proxy source
opCopyBlock(in);
datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
break;
- case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
+ case BLOCK_CHECKSUM: //get the checksum of a block
opBlockChecksum(in);
datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
break;
@@ -150,7 +155,7 @@
&& !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
AccessTokenHandler.AccessMode.READ)) {
try {
- out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+ ERROR_ACCESS_TOKEN.write(out);
out.flush();
throw new IOException("Access token verification failed, on client "
+ "request for reading block " + block);
@@ -172,19 +177,19 @@
blockSender = new BlockSender(block, startOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
- out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
+ ERROR.write(out);
throw e;
}
- out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
+ SUCCESS.write(out); // send op status
long read = blockSender.sendBlock(out, baseStream, null); // send data
if (blockSender.isBlockReadFully()) {
// See if client verification succeeded.
// This is an optional response from client.
try {
- if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK &&
- datanode.blockScanner != null) {
+ if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
+ && datanode.blockScanner != null) {
datanode.blockScanner.verifiedByClient(block);
}
} catch (IOException ignored) {}
@@ -238,7 +243,7 @@
.getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
try {
if (client.length() != 0) {
- replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+ ERROR_ACCESS_TOKEN.write(replyOut);
Text.writeString(replyOut, datanode.dnRegistration.getName());
replyOut.flush();
}
@@ -255,7 +260,7 @@
BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
- short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
+ DataTransferProtocol.Status mirrorInStatus = SUCCESS;
try {
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(block, in,
@@ -296,9 +301,9 @@
// read connect ack (only for clients, not for replication req)
if (client.length() != 0) {
- mirrorInStatus = mirrorIn.readShort();
+ mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
firstBadLink = Text.readString(mirrorIn);
- if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
" from downstream datanode with firstbadlink as " +
@@ -308,7 +313,7 @@
} catch (IOException e) {
if (client.length() != 0) {
- replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
+ ERROR.write(replyOut);
Text.writeString(replyOut, mirrorNode);
replyOut.flush();
}
@@ -331,12 +336,12 @@
// send connect ack back to source (only for clients)
if (client.length() != 0) {
- if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
+ if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
- replyOut.writeShort(mirrorInStatus);
+ mirrorInStatus.write(replyOut);
Text.writeString(replyOut, firstBadLink);
replyOut.flush();
}
@@ -387,7 +392,7 @@
&& !datanode.accessTokenHandler.checkAccess(accessToken, null, block
.getBlockId(), AccessTokenHandler.AccessMode.READ)) {
try {
- out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+ ERROR_ACCESS_TOKEN.write(out);
out.flush();
throw new IOException(
"Access token verification failed, on getBlockChecksum() "
@@ -418,7 +423,7 @@
}
//write reply
- out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
+ SUCCESS.write(out);
out.writeInt(bytesPerCRC);
out.writeLong(crcPerBlock);
md5.write(out);
@@ -443,17 +448,14 @@
AccessTokenHandler.AccessMode.COPY)) {
LOG.warn("Invalid access token in request from "
+ s.getRemoteSocketAddress() + " for copying block " + block);
- sendResponse(s,
- (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
- datanode.socketWriteTimeout);
+ sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
return;
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
LOG.info("Not able to copy block " + blockId + " to "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
- sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR,
- datanode.socketWriteTimeout);
+ sendResponse(s, ERROR, datanode.socketWriteTimeout);
return;
}
@@ -473,7 +475,7 @@
baseStream, SMALL_BUFFER_SIZE));
// send status first
- reply.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
+ SUCCESS.write(reply);
// send block content to the target
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
@@ -515,22 +517,20 @@
AccessTokenHandler.AccessMode.REPLACE)) {
LOG.warn("Invalid access token in request from "
+ s.getRemoteSocketAddress() + " for replacing block " + block);
- sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
- datanode.socketWriteTimeout);
+ sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
return;
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
LOG.warn("Not able to receive block " + blockId + " from "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
- sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR,
- datanode.socketWriteTimeout);
+ sendResponse(s, ERROR, datanode.socketWriteTimeout);
return;
}
Socket proxySock = null;
DataOutputStream proxyOut = null;
- short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
+ DataTransferProtocol.Status opStatus = SUCCESS;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
@@ -554,9 +554,10 @@
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
NetUtils.getInputStream(proxySock), BUFFER_SIZE));
- short status = proxyReply.readShort();
- if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
- if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+ final DataTransferProtocol.Status status
+ = DataTransferProtocol.Status.read(proxyReply);
+ if (status != SUCCESS) {
+ if (status == ERROR_ACCESS_TOKEN) {
throw new IOException("Copy block " + block + " from "
+ proxySock.getRemoteSocketAddress()
+ " failed due to access token error");
@@ -581,11 +582,11 @@
" from " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
- opStatus = DataTransferProtocol.OP_STATUS_ERROR;
+ opStatus = ERROR;
throw ioe;
} finally {
// receive the last byte that indicates the proxy released its thread resource
- if (opStatus == DataTransferProtocol.OP_STATUS_SUCCESS) {
+ if (opStatus == SUCCESS) {
try {
proxyReply.readChar();
} catch (IOException ignored) {
@@ -613,12 +614,12 @@
* @param opStatus status message to write
* @param timeout send timeout
**/
- private void sendResponse(Socket s, short opStatus, long timeout)
- throws IOException {
+ private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
+ long timeout) throws IOException {
DataOutputStream reply =
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
try {
- reply.writeShort(opStatus);
+ opStatus.write(reply);
reply.flush();
} finally {
IOUtils.closeStream(reply);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Mon Aug 3 23:34:04 2009
@@ -65,7 +65,7 @@
JspHelper.printGotoForm(out, namenodeInfoPort, target);
} else {
if (!targetStatus.isDir()) { // a file
- List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(dir, 0, 1)
+ List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(dir, 0, 1)
.getLocatedBlocks();
LocatedBlock firstBlock = null;
@@ -205,7 +205,7 @@
final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(),
JspHelper.conf);
- List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(filename, 0,
+ List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
Long.MAX_VALUE).getLocatedBlocks();
// Add the various links for looking at the file contents
// URL for downloading the full file
@@ -320,7 +320,7 @@
AccessToken accessToken = AccessToken.DUMMY_TOKEN;
if (JspHelper.conf.getBoolean(
AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false)) {
- List<LocatedBlock> blks = dfs.namenode.getBlockLocations(filename, 0,
+ List<LocatedBlock> blks = dfs.getNamenode().getBlockLocations(filename, 0,
Long.MAX_VALUE).getLocatedBlocks();
if (blks == null || blks.size() == 0) {
out.print("Can't locate file blocks");
@@ -390,7 +390,7 @@
// determine data for the next link
if (startOffset + chunkSizeToView >= blockSize) {
// we have to go to the next block from this point onwards
- List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(filename, 0,
+ List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
Long.MAX_VALUE).getLocatedBlocks();
for (int i = 0; i < blocks.size(); i++) {
if (blocks.get(i).getBlock().getBlockId() == blockId) {
@@ -440,7 +440,7 @@
int prevPort = req.getServerPort();
int prevDatanodePort = datanodePort;
if (startOffset == 0) {
- List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(filename, 0,
+ List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
Long.MAX_VALUE).getLocatedBlocks();
for (int i = 0; i < blocks.size(); i++) {
if (blocks.get(i).getBlock().getBlockId() == blockId) {
@@ -546,7 +546,7 @@
// fetch the block from the datanode that has the last block for this file
final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(),
JspHelper.conf);
- List<LocatedBlock> blocks = dfs.namenode.getBlockLocations(filename, 0,
+ List<LocatedBlock> blocks = dfs.getNamenode().getBlockLocations(filename, 0,
Long.MAX_VALUE).getLocatedBlocks();
if (blocks == null || blocks.size() == 0) {
out.print("No datanodes contain blocks of file " + filename);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Aug 3 23:34:04 2009
@@ -171,7 +171,7 @@
return Block.GRANDFATHER_GENERATION_STAMP;
}
- void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
+ void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].getVolumeMap(volumeMap, volume);
@@ -183,7 +183,7 @@
if (Block.isBlockFilename(blockFiles[i])) {
long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp),
- new DatanodeBlockInfo(volume, blockFiles[i]));
+ new ReplicaInfo(volume, blockFiles[i]));
}
}
}
@@ -403,7 +403,7 @@
DiskChecker.checkDir(tmpDir);
}
- void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
+ void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
dataDir.getVolumeMap(volumeMap, this);
}
@@ -496,7 +496,7 @@
return remaining;
}
- synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
+ synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
for (int idx = 0; idx < volumes.length; idx++) {
volumes[idx].getVolumeMap(volumeMap);
}
@@ -653,7 +653,7 @@
FSVolumeSet volumes;
private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
private int maxBlocksPerDir = 0;
- HashMap<Block,DatanodeBlockInfo> volumeMap = null;
+ HashMap<Block,ReplicaInfo> volumeMap = null;
static Random random = new Random();
// Used for synchronizing access to usage stats
@@ -669,7 +669,7 @@
volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
}
volumes = new FSVolumeSet(volArray);
- volumeMap = new HashMap<Block, DatanodeBlockInfo>();
+ volumeMap = new HashMap<Block, ReplicaInfo>();
volumes.getVolumeMap(volumeMap);
registerMBean(storage.getStorageID());
}
@@ -742,7 +742,7 @@
public synchronized BlockInputStreams getTmpInputStreams(Block b,
long blkOffset, long ckoff) throws IOException {
- DatanodeBlockInfo info = volumeMap.get(b);
+ ReplicaInfo info = volumeMap.get(b);
if (info == null) {
throw new IOException("Block " + b + " does not exist in volumeMap.");
}
@@ -777,7 +777,7 @@
* @return - true if the specified block was detached
*/
public boolean detachBlock(Block block, int numLinks) throws IOException {
- DatanodeBlockInfo info = null;
+ ReplicaInfo info = null;
synchronized (this) {
info = volumeMap.get(block);
@@ -1006,12 +1006,12 @@
v = volumes.getNextVolume(blockSize);
// create temporary file to hold block in the designated volume
f = createTmpFile(v, b);
- volumeMap.put(b, new DatanodeBlockInfo(v));
+ volumeMap.put(b, new ReplicaInfo(v));
} else if (f != null) {
DataNode.LOG.info("Reopen already-open Block for append " + b);
// create or reuse temporary file to hold block in the designated volume
v = volumeMap.get(b).getVolume();
- volumeMap.put(b, new DatanodeBlockInfo(v));
+ volumeMap.put(b, new ReplicaInfo(v));
} else {
// reopening block for appending to it.
DataNode.LOG.info("Reopen Block for append " + b);
@@ -1042,7 +1042,7 @@
" to tmp dir " + f);
}
}
- volumeMap.put(b, new DatanodeBlockInfo(v));
+ volumeMap.put(b, new ReplicaInfo(v));
}
if (f == null) {
DataNode.LOG.warn("Block " + b + " reopen failed " +
@@ -1147,7 +1147,7 @@
File dest = null;
dest = v.addBlock(b, f);
- volumeMap.put(b, new DatanodeBlockInfo(v, dest));
+ volumeMap.put(b, new ReplicaInfo(v, dest));
ongoingCreates.remove(b);
}
@@ -1248,7 +1248,7 @@
/** {@inheritDoc} */
public void validateBlockMetadata(Block b) throws IOException {
- DatanodeBlockInfo info = volumeMap.get(b);
+ ReplicaInfo info = volumeMap.get(b);
if (info == null) {
throw new IOException("Block " + b + " does not exist in volumeMap.");
}
@@ -1306,7 +1306,7 @@
FSVolume v;
synchronized (this) {
f = getFile(invalidBlks[i]);
- DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);
+ ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
if (dinfo == null) {
DataNode.LOG.warn("Unexpected error trying to delete block "
+ invalidBlks[i] +
@@ -1369,7 +1369,7 @@
* Turn the block identifier into a filename.
*/
public synchronized File getFile(Block b) {
- DatanodeBlockInfo info = volumeMap.get(b);
+ ReplicaInfo info = volumeMap.get(b);
if (info != null) {
return info.getFile();
}
@@ -1448,8 +1448,8 @@
* generation stamp</li>
* <li>If the block length in memory does not match the actual block file length
* then mark the block as corrupt and update the block length in memory</li>
- * <li>If the file in {@link DatanodeBlockInfo} does not match the file on
- * the disk, update {@link DatanodeBlockInfo} with the correct file</li>
+ * <li>If the file in {@link ReplicaInfo} does not match the file on
+ * the disk, update {@link ReplicaInfo} with the correct file</li>
* </ul>
*
* @param blockId Block that differs
@@ -1472,7 +1472,7 @@
Block.getGenerationStamp(diskMetaFile.getName()) :
Block.GRANDFATHER_GENERATION_STAMP;
- DatanodeBlockInfo memBlockInfo = volumeMap.get(block);
+ ReplicaInfo memBlockInfo = volumeMap.get(block);
if (diskFile == null || !diskFile.exists()) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
@@ -1507,7 +1507,7 @@
*/
if (memBlockInfo == null) {
// Block is missing in memory - add the block to volumeMap
- DatanodeBlockInfo diskBlockInfo = new DatanodeBlockInfo(vol, diskFile);
+ ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
volumeMap.put(diskBlock, diskBlockInfo);
if (datanode.blockScanner != null) {
@@ -1540,7 +1540,7 @@
+ memFile.getAbsolutePath()
+ " does not exist. Updating it to the file found during scan "
+ diskFile.getAbsolutePath());
- DatanodeBlockInfo info = volumeMap.remove(memBlock);
+ ReplicaInfo info = volumeMap.remove(memBlock);
info.setFile(diskFile);
memFile = diskFile;
@@ -1571,7 +1571,7 @@
DataNode.LOG.warn("Updating generation stamp for block " + blockId
+ " from " + memBlock.getGenerationStamp() + " to " + gs);
- DatanodeBlockInfo info = volumeMap.remove(memBlock);
+ ReplicaInfo info = volumeMap.remove(memBlock);
memBlock.setGenerationStamp(gs);
volumeMap.put(memBlock, info);
}
@@ -1583,7 +1583,7 @@
corruptBlock = new Block(memBlock);
DataNode.LOG.warn("Updating size of block " + blockId + " from "
+ memBlock.getNumBytes() + " to " + memFile.length());
- DatanodeBlockInfo info = volumeMap.remove(memBlock);
+ ReplicaInfo info = volumeMap.remove(memBlock);
memBlock.setNumBytes(memFile.length());
volumeMap.put(memBlock, info);
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=800624&r1=800623&r2=800624&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Mon Aug 3 23:34:04 2009
@@ -37,7 +37,6 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.security.AccessTokenHandler;
@@ -234,7 +233,7 @@
/**
* Get all valid locations of the block
*/
- ArrayList<String> addBlock(Block block) {
+ ArrayList<String> getValidLocations(Block block) {
ArrayList<String> machineSet =
new ArrayList<String>(blocksMap.numNodes(block));
for(Iterator<DatanodeDescriptor> it =
@@ -249,7 +248,6 @@
return machineSet;
}
-
List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
long length, int nrBlocksToReturn) throws IOException {
int curBlk = 0;
@@ -397,43 +395,50 @@
}
}
- void markBlockAsCorrupt(Block blk, DatanodeInfo dn) throws IOException {
+ void findAndMarkBlockAsCorrupt(Block blk,
+ DatanodeInfo dn) throws IOException {
+ BlockInfo storedBlock = getStoredBlock(blk);
+ if (storedBlock == null) {
+ // Check if the replica is in the blockMap, if not
+ // ignore the request for now. This could happen when BlockScanner
+ // thread of Datanode reports bad block before Block reports are sent
+ // by the Datanode on startup
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.markBlockAsCorrupt: " +
+ "block " + blk + " could not be marked as " +
+ "corrupt as it does not exist in blocksMap");
+ return;
+ }
+ markBlockAsCorrupt(storedBlock, dn);
+ }
+
+ private void markBlockAsCorrupt(BlockInfo storedBlock,
+ DatanodeInfo dn) throws IOException {
+ assert storedBlock != null : "storedBlock should not be null";
DatanodeDescriptor node = namesystem.getDatanode(dn);
if (node == null) {
- throw new IOException("Cannot mark block" + blk.getBlockName() +
+ throw new IOException("Cannot mark block " +
+ storedBlock.getBlockName() +
" as corrupt because datanode " + dn.getName() +
" does not exist. ");
}
- final BlockInfo storedBlockInfo = blocksMap.getStoredBlock(blk);
- if (storedBlockInfo == null) {
- // Check if the replica is in the blockMap, if not
- // ignore the request for now. This could happen when BlockScanner
- // thread of Datanode reports bad block before Block reports are sent
- // by the Datanode on startup
+ INodeFile inode = storedBlock.getINode();
+ if (inode == null) {
NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
- "block " + blk + " could not be marked " +
- "as corrupt as it does not exists in " +
- "blocksMap");
+ "block " + storedBlock +
+ " could not be marked as corrupt as it" +
+ " does not belong to any file");
+ addToInvalidates(storedBlock, node);
+ return;
+ }
+ // Add this replica to corruptReplicas Map
+ corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
+ if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
+ // the block is over-replicated so invalidate the replicas immediately
+ invalidateBlock(storedBlock, node);
} else {
- INodeFile inode = storedBlockInfo.getINode();
- if (inode == null) {
- NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
- "block " + blk + " could not be marked " +
- "as corrupt as it does not belong to " +
- "any file");
- addToInvalidates(storedBlockInfo, node);
- return;
- }
- // Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(storedBlockInfo, node);
- if (countNodes(storedBlockInfo).liveReplicas() > inode.getReplication()) {
- // the block is over-replicated so invalidate the replicas immediately
- invalidateBlock(storedBlockInfo, node);
- } else {
- // add the block to neededReplication
- updateNeededReplications(storedBlockInfo, -1, 0);
- }
+ // add the block to neededReplication
+ updateNeededReplications(storedBlock, -1, 0);
}
}
@@ -844,8 +849,9 @@
* needed replications if this takes care of the problem.
* @return the block that is stored in blockMap.
*/
- private Block addStoredBlock(Block block, DatanodeDescriptor node,
- DatanodeDescriptor delNodeHint) {
+ private Block addStoredBlock(final Block block,
+ DatanodeDescriptor node,
+ DatanodeDescriptor delNodeHint) {
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
if (storedBlock == null || storedBlock.getINode() == null) {
// If this block does not belong to anyfile, then we are done.
@@ -858,30 +864,32 @@
// it will happen in next block report otherwise.
return block;
}
+ assert storedBlock != null : "Block must be stored by now";
+ INodeFile fileINode = storedBlock.getINode();
+ assert fileINode != null : "Block must belong to a file";
// add block to the data-node
boolean added = node.addBlock(storedBlock);
- assert storedBlock != null : "Block must be stored by now";
-
if (block != storedBlock) {
- if (block.getNumBytes() >= 0) {
- long cursize = storedBlock.getNumBytes();
+ long cursize = storedBlock.getNumBytes();
+ long newsize = block.getNumBytes();
+ if (newsize >= 0) {
if (cursize == 0) {
- storedBlock.setNumBytes(block.getNumBytes());
- } else if (cursize != block.getNumBytes()) {
+ storedBlock.setNumBytes(newsize);
+ } else if (cursize != newsize) {
FSNamesystem.LOG.warn("Inconsistent size for block " + block +
" reported from " + node.getName() +
" current size is " + cursize +
- " reported size is " + block.getNumBytes());
+ " reported size is " + newsize);
try {
- if (cursize > block.getNumBytes()) {
+ if (cursize > newsize) {
// new replica is smaller in size than existing block.
// Mark the new replica as corrupt.
FSNamesystem.LOG.warn("Mark new replica "
+ block + " from " + node.getName() + " as corrupt "
+ "because length is shorter than existing ones");
- markBlockAsCorrupt(block, node);
+ markBlockAsCorrupt(storedBlock, node);
} else {
// new replica is larger in size than existing block.
// Mark pre-existing replicas as corrupt.
@@ -899,19 +907,12 @@
FSNamesystem.LOG.warn("Mark existing replica "
+ block + " from " + node.getName() + " as corrupt "
+ "because its length is shorter than the new one");
- markBlockAsCorrupt(block, nodes[j]);
+ markBlockAsCorrupt(storedBlock, nodes[j]);
}
//
// change the size of block in blocksMap
//
- storedBlock = blocksMap.getStoredBlock(block); // extra look up!
- if (storedBlock == null) {
- FSNamesystem.LOG.warn("Block " + block + " reported from "
- + node.getName()
- + " does not exist in blockMap. Surprise! Surprise!");
- } else {
- storedBlock.setNumBytes(block.getNumBytes());
- }
+ storedBlock.setNumBytes(newsize);
}
} catch (IOException e) {
FSNamesystem.LOG.warn("Error in deleting bad block " + block + e);
@@ -919,17 +920,15 @@
}
// Updated space consumed if required.
- INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
- long diff = (file == null) ? 0 :
- (file.getPreferredBlockSize() - storedBlock.getNumBytes());
+ long diff = fileINode.getPreferredBlockSize() - storedBlock.getNumBytes();
- if (diff > 0 && file.isUnderConstruction() &&
+ if (diff > 0 && fileINode.isUnderConstruction() &&
cursize < storedBlock.getNumBytes()) {
try {
String path = /* For finding parents */
- namesystem.leaseManager.findPath((INodeFileUnderConstruction) file);
+ namesystem.leaseManager.findPath((INodeFileUnderConstruction)fileINode);
namesystem.dir.updateSpaceConsumed(path, 0, -diff
- * file.getReplication());
+ * fileINode.getReplication());
} catch (IOException e) {
FSNamesystem.LOG
.warn("Unexpected exception while updating disk space : "
@@ -937,12 +936,9 @@
}
}
}
- block = storedBlock;
}
- assert storedBlock == block : "Block must be stored by now";
int curReplicaDelta = 0;
-
if (added) {
curReplicaDelta = 1;
//
@@ -952,20 +948,20 @@
//
if (!namesystem.isInSafeMode()) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
- + "blockMap updated: " + node.getName() + " is added to " + block
- + " size " + block.getNumBytes());
+ + "blockMap updated: " + node.getName() + " is added to " +
+ storedBlock + " size " + storedBlock.getNumBytes());
}
} else {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
- + "Redundant addStoredBlock request received for " + block + " on "
- + node.getName() + " size " + block.getNumBytes());
+ + "Redundant addStoredBlock request received for " + storedBlock
+ + " on " + node.getName() + " size " + storedBlock.getNumBytes());
}
// filter out containingNodes that are marked for decommission.
NumberReplicas num = countNodes(storedBlock);
int numLiveReplicas = num.liveReplicas();
int numCurrentReplica = numLiveReplicas
- + pendingReplications.getNumReplicas(block);
+ + pendingReplications.getNumReplicas(storedBlock);
// check whether safe replication is reached for the block
namesystem.incrementSafeBlockCount(numCurrentReplica);
@@ -974,39 +970,37 @@
// if file is being actively written to, then do not check
// replication-factor here. It will be checked when the file is closed.
//
- INodeFile fileINode = null;
- fileINode = storedBlock.getINode();
if (fileINode.isUnderConstruction()) {
- return block;
+ return storedBlock;
}
// do not handle mis-replicated blocks during startup
if (namesystem.isInSafeMode())
- return block;
+ return storedBlock;
// handle underReplication/overReplication
short fileReplication = fileINode.getReplication();
if (numCurrentReplica >= fileReplication) {
- neededReplications.remove(block, numCurrentReplica,
+ neededReplications.remove(storedBlock, numCurrentReplica,
num.decommissionedReplicas, fileReplication);
} else {
- updateNeededReplications(block, curReplicaDelta, 0);
+ updateNeededReplications(storedBlock, curReplicaDelta, 0);
}
if (numCurrentReplica > fileReplication) {
- processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
+ processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
}
// If the file replication has reached desired value
// we can remove any corrupt replicas the block may have
- int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block);
+ int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
int numCorruptNodes = num.corruptReplicas();
if (numCorruptNodes != corruptReplicasCount) {
FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for " +
- block + "blockMap has " + numCorruptNodes +
+ storedBlock + "blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + corruptReplicasCount);
}
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
- invalidateCorruptReplicas(block);
- return block;
+ invalidateCorruptReplicas(storedBlock);
+ return storedBlock;
}
/**
@@ -1051,7 +1045,7 @@
long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
synchronized (namesystem) {
neededReplications.clear();
- for (BlocksMap.BlockInfo block : blocksMap.getBlocks()) {
+ for (BlockInfo block : blocksMap.getBlocks()) {
INodeFile fileINode = block.getINode();
if (fileINode == null) {
// block does not belong to any file
@@ -1415,13 +1409,13 @@
blocksMap.removeBlock(block);
}
- public int getCapacity() {
+ int getCapacity() {
synchronized(namesystem) {
return blocksMap.getCapacity();
}
}
- public float getLoadFactor() {
+ float getLoadFactor() {
return blocksMap.getLoadFactor();
}
}