You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/04/20 04:28:21 UTC
svn commit: r1095253 [2/4] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/
src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/had...
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Apr 20 02:28:19 2011
@@ -18,13 +18,17 @@
package org.apache.hadoop.hdfs.server.datanode;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -49,6 +53,9 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -66,23 +73,24 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -92,6 +100,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -101,7 +110,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -117,20 +125,15 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.mortbay.util.ajax.JSON;
-import java.lang.management.ManagementFactory;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
/**********************************************************
* DataNode is a class (and program) that stores a set of
* blocks for a DFS deployment. A single deployment can
@@ -1218,7 +1221,7 @@ public class DataNode extends Configured
}
new Daemon(new DataTransfer(xferTargets, block,
- BlockConstructionStage.PIPELINE_SETUP_CREATE)).start();
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
}
}
@@ -1347,16 +1350,25 @@ public class DataNode extends Configured
final DatanodeInfo[] targets;
final Block b;
final BlockConstructionStage stage;
+ final String clientname;
/**
* Connect to the first item in the target list. Pass along the
* entire target list, the block, and the data.
*/
- DataTransfer(DatanodeInfo targets[], Block b, BlockConstructionStage stage
- ) throws IOException {
+ DataTransfer(DatanodeInfo targets[], Block b, BlockConstructionStage stage,
+ final String clientname) throws IOException {
+ if (DataTransferProtocol.LOG.isDebugEnabled()) {
+ DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+ + b + " (numBytes=" + b.getNumBytes() + ")"
+ + ", stage=" + stage
+ + ", clientname=" + clientname
+ + ", targests=" + Arrays.asList(targets));
+ }
this.targets = targets;
this.b = b;
this.stage = stage;
+ this.clientname = clientname;
}
/**
@@ -1366,7 +1378,9 @@ public class DataNode extends Configured
xmitsInProgress.getAndIncrement();
Socket sock = null;
DataOutputStream out = null;
+ DataInputStream in = null;
BlockSender blockSender = null;
+ final boolean isClient = clientname.length() > 0;
try {
InetSocketAddress curTarget =
@@ -1380,7 +1394,6 @@ public class DataNode extends Configured
OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
out = new DataOutputStream(new BufferedOutputStream(baseStream,
SMALL_BUFFER_SIZE));
-
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, false, DataNode.this);
DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
@@ -1395,14 +1408,33 @@ public class DataNode extends Configured
}
DataTransferProtocol.Sender.opWriteBlock(out,
- b, 0, stage, 0, 0, 0, "", srcNode, targets, accessToken);
+ b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
// no response necessary
- LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
+ LOG.info(getClass().getSimpleName() + ": Transmitted " + b
+ + " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
+ // read ack
+ if (isClient) {
+ in = new DataInputStream(NetUtils.getInputStream(sock));
+ final DataTransferProtocol.Status s = DataTransferProtocol.Status.read(in);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getClass().getSimpleName() + ": close-ack=" + s);
+ }
+ if (s != SUCCESS) {
+ if (s == ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException(
+ "Got access token error for connect ack, targets="
+ + Arrays.asList(targets));
+ } else {
+ throw new IOException("Bad connect ack, targets="
+ + Arrays.asList(targets));
+ }
+ }
+ }
} catch (IOException ie) {
LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
+ " got " + StringUtils.stringifyException(ie));
@@ -1413,6 +1445,7 @@ public class DataNode extends Configured
xmitsInProgress.getAndDecrement();
IOUtils.closeStream(blockSender);
IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
IOUtils.closeSocket(sock);
}
}
@@ -1977,48 +2010,46 @@ public class DataNode extends Configured
}
/**
- * Transfer a block to the datanode targets.
- * @return rbw's visible length
- */
- long transferBlockForPipelineRecovery(final Block b,
- final DatanodeInfo[] targets) throws IOException {
- checkWriteAccess(b);
- final Block stored;
- final boolean isRbw;
+ * Transfer a replica to the datanode targets.
+ * @param b the block to transfer.
+ * The corresponding replica must be an RBW or a Finalized.
+ * Its GS and numBytes will be set to
+ * the stored GS and the visible length.
+ * @param targets
+ * @param client
+ */
+ void transferReplicaForPipelineRecovery(final Block b,
+ final DatanodeInfo[] targets, final String client) throws IOException {
+ final long storedGS;
final long visible;
+ final BlockConstructionStage stage;
//get replica information
synchronized(data) {
- stored = data.getStoredBlock(b.getBlockId());
- if (stored.getGenerationStamp() < b.getGenerationStamp()) {
+ if (data.isValidRbw(b)) {
+ stage = BlockConstructionStage.TRANSFER_RBW;
+ } else if (data.isValidBlock(b)) {
+ stage = BlockConstructionStage.TRANSFER_FINALIZED;
+ } else {
+ final String r = data.getReplicaString(b.getBlockId());
+ throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
+ }
+
+ storedGS = data.getStoredBlock(b.getBlockId()).getGenerationStamp();
+ if (storedGS < b.getGenerationStamp()) {
throw new IOException(
- "stored.getGenerationStamp() < b.getGenerationStamp(), stored="
- + stored + ", b=" + b);
+ storedGS + " = storedGS < b.getGenerationStamp(), b=" + b);
}
- isRbw = data.isValidRbw(b);
visible = data.getReplicaVisibleLength(b);
}
+ //set storedGS and visible length
+ b.setGenerationStamp(storedGS);
+ b.setNumBytes(visible);
+
if (targets.length > 0) {
- if (isRbw) {
- //transfer rbw
- new DataTransfer(targets, b, BlockConstructionStage.TRANSFER_RBW).run();
- } else {
- //transfer finalized replica
- transferBlock(stored, targets);
- }
+ new DataTransfer(targets, b, stage, client).run();
}
- //TODO: should return: visible + storedGS + isRbw
- return visible;
- }
-
- /**
- * Covert an existing temporary replica to a rbw.
- * @param temporary specifies id, gs and visible bytes.
- * @throws IOException
- */
- void convertTemporaryToRbw(final Block temporary) throws IOException {
- data.convertTemporaryToRbw(temporary);
}
// Determine a Datanode's streaming address
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Apr 20 02:28:19 2011
@@ -33,6 +33,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -153,24 +154,9 @@ class DataXceiver extends DataTransferPr
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-
- if (datanode.isBlockTokenEnabled) {
- try {
- datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
- BlockTokenSecretManager.AccessMode.READ);
- } catch (InvalidToken e) {
- try {
- ERROR_ACCESS_TOKEN.write(out);
- out.flush();
- LOG.warn("Block token verification failed, for client "
- + remoteAddress + " for OP_READ_BLOCK for block " + block + " : "
- + e.getLocalizedMessage());
- throw e;
- } finally {
- IOUtils.closeStream(out);
- }
- }
- }
+ checkAccess(out, true, block, blockToken,
+ DataTransferProtocol.Op.READ_BLOCK,
+ BlockTokenSecretManager.AccessMode.READ);
// send the block
BlockSender blockSender = null;
@@ -245,8 +231,25 @@ class DataXceiver extends DataTransferPr
updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
+ final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
+ || stage == BlockConstructionStage.TRANSFER_FINALIZED;
+
+ // check single target for transfer-RBW/Finalized
+ if (isTransfer && targets.length > 0) {
+ throw new IOException(stage + " does not support multiple targets "
+ + Arrays.asList(targets));
+ }
if (LOG.isDebugEnabled()) {
+ LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
+ + "\n block =" + block + ", newGs=" + newGs
+ + ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+ + "\n targets=" + Arrays.asList(targets)
+ + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
+ );
+ LOG.debug("isDatanode=" + isDatanode
+ + ", isClient=" + isClient
+ + ", isTransfer=" + isTransfer);
LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
" tcp no delay " + s.getTcpNoDelay());
}
@@ -261,30 +264,14 @@ class DataXceiver extends DataTransferPr
" src: " + remoteAddress +
" dest: " + localAddress);
- DataOutputStream replyOut = null; // stream to prev target
- replyOut = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
- SMALL_BUFFER_SIZE));
- if (datanode.isBlockTokenEnabled) {
- try {
- datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
- BlockTokenSecretManager.AccessMode.WRITE);
- } catch (InvalidToken e) {
- try {
- if (isClient) {
- ERROR_ACCESS_TOKEN.write(replyOut);
- Text.writeString(replyOut, datanode.dnRegistration.getName());
- replyOut.flush();
- }
- LOG.warn("Block token verification failed, for client "
- + remoteAddress + " for OP_WRITE_BLOCK for block " + block
- + " : " + e.getLocalizedMessage());
- throw e;
- } finally {
- IOUtils.closeStream(replyOut);
- }
- }
- }
+ // reply to upstream datanode or client
+ final DataOutputStream replyOut = new DataOutputStream(
+ new BufferedOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
+ SMALL_BUFFER_SIZE));
+ checkAccess(replyOut, isClient, block, blockToken,
+ DataTransferProtocol.Op.WRITE_BLOCK,
+ BlockTokenSecretManager.AccessMode.WRITE);
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
@@ -307,8 +294,7 @@ class DataXceiver extends DataTransferPr
}
//
- // Open network conn to backup machine, if
- // appropriate
+ // Connect to downstream machine, if appropriate
//
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
@@ -330,7 +316,6 @@ class DataXceiver extends DataTransferPr
SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
- // Write header: Copied from DFSClient.java!
DataTransferProtocol.Sender.opWriteBlock(mirrorOut, originalBlock,
pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
srcDataNode, targets, blockToken);
@@ -375,8 +360,8 @@ class DataXceiver extends DataTransferPr
}
}
- // send connect ack back to source (only for clients)
- if (isClient) {
+ // send connect-ack to source for clients and not transfer-RBW/Finalized
+ if (isClient && !isTransfer) {
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
@@ -391,7 +376,15 @@ class DataXceiver extends DataTransferPr
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets.length);
+ mirrorAddr, null, targets);
+
+ // send close-ack for transfer-RBW/Finalized
+ if (isTransfer) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("TRANSFER: send close-ack");
+ }
+ SUCCESS.write(replyOut);
+ }
}
// update its generation stamp
@@ -404,8 +397,7 @@ class DataXceiver extends DataTransferPr
// if this write is for a replication request or recovering
// a failed close for client, then confirm block. For other client-writes,
// the block is finalized in the PacketResponder.
- if ((isDatanode && stage != BlockConstructionStage.TRANSFER_RBW)
- ||
+ if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
LOG.info("Received block " + block +
@@ -433,32 +425,38 @@ class DataXceiver extends DataTransferPr
datanode.myMetrics.writesFromRemoteClient);
}
+ @Override
+ protected void opTransferBlock(final DataInputStream in,
+ final Block blk, final String client,
+ final DatanodeInfo[] targets,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ checkAccess(null, true, blk, blockToken,
+ DataTransferProtocol.Op.TRANSFER_BLOCK,
+ BlockTokenSecretManager.AccessMode.COPY);
+
+ updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+
+ final DataOutputStream out = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+ try {
+ datanode.transferReplicaForPipelineRecovery(blk, targets, client);
+ SUCCESS.write(out);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+
/**
* Get block checksum (MD5 of CRC32).
*/
@Override
protected void opBlockChecksum(DataInputStream in, Block block,
Token<BlockTokenIdentifier> blockToken) throws IOException {
- DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
- datanode.socketWriteTimeout));
- if (datanode.isBlockTokenEnabled) {
- try {
- datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
- BlockTokenSecretManager.AccessMode.READ);
- } catch (InvalidToken e) {
- try {
- ERROR_ACCESS_TOKEN.write(out);
- out.flush();
- LOG.warn("Block token verification failed, for client "
- + remoteAddress + " for OP_BLOCK_CHECKSUM for block " + block
- + " : " + e.getLocalizedMessage());
- throw e;
- } finally {
- IOUtils.closeStream(out);
- }
-
- }
- }
+ final DataOutputStream out = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+ checkAccess(out, true, block, blockToken,
+ DataTransferProtocol.Op.BLOCK_CHECKSUM,
+ BlockTokenSecretManager.AccessMode.READ);
updateCurrentThreadName("Reading metadata for block " + block);
final MetaDataInputStream metadataIn =
@@ -649,7 +647,7 @@ class DataXceiver extends DataTransferPr
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
- dataXceiverServer.balanceThrottler, -1);
+ dataXceiverServer.balanceThrottler, null);
// notify name node
datanode.notifyNamenodeReceivedBlock(block, sourceID);
@@ -713,4 +711,36 @@ class DataXceiver extends DataTransferPr
IOUtils.closeStream(reply);
}
}
+
+ private void checkAccess(DataOutputStream out, final boolean reply,
+ final Block blk,
+ final Token<BlockTokenIdentifier> t,
+ final DataTransferProtocol.Op op,
+ final BlockTokenSecretManager.AccessMode mode) throws IOException {
+ if (datanode.isBlockTokenEnabled) {
+ try {
+ datanode.blockTokenSecretManager.checkAccess(t, null, blk, mode);
+ } catch(InvalidToken e) {
+ try {
+ if (reply) {
+ if (out == null) {
+ out = new DataOutputStream(
+ NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+ }
+ ERROR_ACCESS_TOKEN.write(out);
+ if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
+ Text.writeString(out, datanode.dnRegistration.getName());
+ }
+ out.flush();
+ }
+ LOG.warn("Block token verification failed: op=" + op
+ + ", remoteAddress=" + remoteAddress
+ + ", message=" + e.getLocalizedMessage());
+ throw e;
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+ }
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Apr 20 02:28:19 2011
@@ -530,7 +530,6 @@ public class FSDataset implements FSCons
static class FSVolumeSet {
FSVolume[] volumes = null;
- int curVolume = 0;
BlockVolumeChoosingPolicy blockChooser;
FSVolumeSet(FSVolume[] volumes, BlockVolumeChoosingPolicy blockChooser) {
@@ -1351,39 +1350,49 @@ public class FSDataset implements FSCons
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
- DataNode.LOG.info("Covert the temporary replica " + b
- + " to RBW, visible length is " + visible);
+ DataNode.LOG.info("Convert replica " + b
+ + " from Temporary to RBW, visible length=" + visible);
- // get replica
- final ReplicaInfo r = volumeMap.get(blockId);
- if (r == null) {
- throw new ReplicaNotFoundException(
- ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
- }
- // check the replica's state
- if (r.getState() != ReplicaState.TEMPORARY) {
- throw new ReplicaNotFoundException(
- "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+ final ReplicaInPipeline temp;
+ {
+ // get replica
+ final ReplicaInfo r = volumeMap.get(blockId);
+ if (r == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+ // check the replica's state
+ if (r.getState() != ReplicaState.TEMPORARY) {
+ throw new ReplicaAlreadyExistsException(
+ "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+ }
+ temp = (ReplicaInPipeline)r;
}
// check generation stamp
- if (r.getGenerationStamp() != expectedGs) {
- throw new ReplicaNotFoundException(
- "r.getGenerationStamp() != expectedGs = " + expectedGs + ", r=" + r);
+ if (temp.getGenerationStamp() != expectedGs) {
+ throw new ReplicaAlreadyExistsException(
+ "temp.getGenerationStamp() != expectedGs = " + expectedGs
+ + ", temp=" + temp);
}
+
+ // TODO: check writer?
+ // set writer to the current thread
+ // temp.setWriter(Thread.currentThread());
+
// check length
- final long numBytes = r.getNumBytes();
+ final long numBytes = temp.getNumBytes();
if (numBytes < visible) {
- throw new ReplicaNotFoundException(numBytes + " = numBytes < visible = "
- + visible + ", r=" + r);
+ throw new IOException(numBytes + " = numBytes < visible = "
+ + visible + ", temp=" + temp);
}
// check volume
- final FSVolume v = r.getVolume();
+ final FSVolume v = temp.getVolume();
if (v == null) {
- throw new IOException("r.getVolume() = null, temp=" + r);
+ throw new IOException("r.getVolume() = null, temp=" + temp);
}
// move block files to the rbw directory
- final File dest = moveBlockFiles(b, r.getBlockFile(), v.rbwDir);
+ final File dest = moveBlockFiles(b, temp.getBlockFile(), v.rbwDir);
// create RBW
final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
blockId, numBytes, expectedGs,
@@ -2024,6 +2033,12 @@ public class FSDataset implements FSCons
return volumeMap.get(blockId);
}
+ @Override
+ public synchronized String getReplicaString(long blockId) {
+ final Replica r = volumeMap.get(blockId);
+ return r == null? "null": r.toString();
+ }
+
@Override // FSDatasetInterface
public synchronized ReplicaRecoveryInfo initReplicaRecovery(
RecoveringBlock rBlock) throws IOException {
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Apr 20 02:28:19 2011
@@ -105,6 +105,11 @@ public interface FSDatasetInterface exte
public Replica getReplica(long blockId);
/**
+ * @return replica meta information
+ */
+ public String getReplicaString(long blockId);
+
+ /**
* @return the generation stamp stored with the block.
*/
public Block getStoredBlock(long blkid) throws IOException;
Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 02:28:19 2011
@@ -4,3 +4,4 @@
/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1095244
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Wed Apr 20 02:28:19 2011
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
@@ -24,6 +25,8 @@ import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -232,9 +235,16 @@ public class BackupImage extends FSImage
// update NameSpace in memory
backupInputStream.setBytes(data);
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
- logLoader.loadEditRecords(storage.getLayoutVersion(),
- backupInputStream.getDataInputStream(), true,
- lastAppliedTxId + 1);
+ int logVersion = storage.getLayoutVersion();
+ BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+ DataInputStream in = new DataInputStream(bin);
+ Checksum checksum = null;
+ if (logVersion <= -28) { // support fsedits checksum
+ checksum = FSEditLog.getChecksum();
+ in = new DataInputStream(new CheckedInputStream(bin, checksum));
+ }
+ logLoader.loadEditRecords(logVersion, in, checksum, true,
+ lastAppliedTxId + 1);
getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
break;
case INPROGRESS:
@@ -353,17 +363,25 @@ public class BackupImage extends FSImage
if(jSpoolFile.exists()) {
// load edits.new
EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
- DataInputStream in = edits.getDataInputStream();
+ BufferedInputStream bin = new BufferedInputStream(edits);
+ DataInputStream in = new DataInputStream(bin);
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
- int loaded = logLoader.loadFSEdits(in, false, lastAppliedTxId + 1);
+ int logVersion = logLoader.readLogVersion(in);
+ Checksum checksum = null;
+ if (logVersion <= -28) { // support fsedits checksum
+ checksum = FSEditLog.getChecksum();
+ in = new DataInputStream(new CheckedInputStream(bin, checksum));
+ }
+ int loaded = logLoader.loadEditRecords(logVersion, in, checksum, false,
+ lastAppliedTxId + 1);
lastAppliedTxId += loaded;
numEdits += loaded;
// first time reached the end of spool
jsState = JSpoolState.WAIT;
- loaded = logLoader.loadEditRecords(storage.getLayoutVersion(),
- in, true, lastAppliedTxId + 1);
+ loaded = logLoader.loadEditRecords(logVersion, in, checksum,
+ true, lastAppliedTxId + 1);
numEdits += loaded;
lastAppliedTxId += loaded;
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Apr 20 02:28:19 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.zip.Checksum;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -91,11 +92,19 @@ class EditLogFileOutputStream extends Ed
* */
@Override
void write(byte op, long txid, Writable... writables) throws IOException {
+ int start = bufCurrent.getLength();
write(op);
bufCurrent.writeLong(txid);
for (Writable w : writables) {
w.write(bufCurrent);
}
+ // write transaction checksum
+ int end = bufCurrent.getLength();
+ Checksum checksum = FSEditLog.getChecksum();
+ checksum.reset();
+ checksum.update(bufCurrent.getData(), start, end-start);
+ int sum = (int)checksum.getValue();
+ bufCurrent.writeInt(sum);
}
/**
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Apr 20 02:28:19 2011
@@ -1285,9 +1285,10 @@ class FSDirectory implements Closeable {
* Check whether the path specifies a directory
*/
boolean isDir(String src) throws UnresolvedLinkException {
+ src = normalizePath(src);
readLock();
try {
- INode node = rootDir.getNode(normalizePath(src), false);
+ INode node = rootDir.getNode(src, false);
return node != null && node.isDirectory();
} finally {
readUnlock();
@@ -1385,6 +1386,12 @@ class FSDirectory implements Closeable {
/** Return the name of the path represented by inodes at [0, pos] */
private static String getFullPathName(INode[] inodes, int pos) {
StringBuilder fullPathName = new StringBuilder();
+ if (inodes[0].isRoot()) {
+ if (pos == 0) return Path.SEPARATOR;
+ } else {
+ fullPathName.append(inodes[0].getLocalName());
+ }
+
for (int i=1; i<=pos; i++) {
fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
}
@@ -2018,7 +2025,7 @@ class FSDirectory implements Closeable {
return null;
}
}
- final String userName = UserGroupInformation.getCurrentUser().getUserName();
+ final String userName = dirPerms.getUserName();
INodeSymlink newNode = unprotectedSymlink(path, target, modTime, modTime,
new PermissionStatus(userName, null, FsPermission.getDefault()));
if (newNode == null) {
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Apr 20 02:28:19 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,6 +49,7 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.PureJavaCrc32;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
@@ -91,6 +93,18 @@ public class FSEditLog implements NNStor
private NNStorage storage;
+ private static ThreadLocal<Checksum> localChecksum =
+ new ThreadLocal<Checksum>() {
+ protected Checksum initialValue() {
+ return new PureJavaCrc32();
+ }
+ };
+
+ /** Get a thread local checksum */
+ public static Checksum getChecksum() {
+ return localChecksum.get();
+ }
+
private static class TransactionId {
public long txid;
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Apr 20 02:28:19 2011
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -55,45 +59,65 @@ public class FSEditLogLoader {
*/
int loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
throws IOException {
- DataInputStream in = edits.getDataInputStream();
long startTime = now();
- int numEdits = loadFSEdits(in, true, expectedStartingTxId);
+ int numEdits = loadFSEdits(edits, true, expectedStartingTxId);
FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
return numEdits;
}
- int loadFSEdits(DataInputStream in, boolean closeOnExit,
+ /**
+ * Read the header of fsedit log
+ * @param in fsedit stream
+ * @return the edit log version number
+ * @throws IOException if error occurs
+ */
+ int readLogVersion(DataInputStream in) throws IOException {
+ int logVersion = 0;
+ // Read log file version. Could be missing.
+ in.mark(4);
+ // If edits log is greater than 2G, available method will return negative
+ // numbers, so we avoid having to call available
+ boolean available = true;
+ try {
+ logVersion = in.readByte();
+ } catch (EOFException e) {
+ available = false;
+ }
+ if (available) {
+ in.reset();
+ logVersion = in.readInt();
+ if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+ throw new IOException(
+ "Unexpected version of the file system log file: "
+ + logVersion + ". Current version = "
+ + FSConstants.LAYOUT_VERSION + ".");
+ }
+ assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+ "Unsupported version " + logVersion;
+ return logVersion;
+ }
+
+ int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
long expectedStartingTxId)
throws IOException {
+ BufferedInputStream bin = new BufferedInputStream(edits);
+ DataInputStream in = new DataInputStream(bin);
+
int numEdits = 0;
int logVersion = 0;
try {
- // Read log file version. Could be missing.
- in.mark(4);
- // If edits log is greater than 2G, available method will return negative
- // numbers, so we avoid having to call available
- boolean available = true;
- try {
- logVersion = in.readByte();
- } catch (EOFException e) {
- available = false;
- }
- if (available) {
- in.reset();
- logVersion = in.readInt();
- if (logVersion < FSConstants.LAYOUT_VERSION) // future version
- throw new IOException(
- "Unexpected version of the file system log file: "
- + logVersion + ". Current version = "
- + FSConstants.LAYOUT_VERSION + ".");
+ logVersion = readLogVersion(in);
+ Checksum checksum = null;
+ if (logVersion <= -28) { // support fsedits checksum
+ checksum = FSEditLog.getChecksum();
+ in = new DataInputStream(new CheckedInputStream(bin, checksum));
}
- assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
- "Unsupported version " + logVersion;
-
- numEdits = loadEditRecords(logVersion, in, false, expectedStartingTxId);
+
+ numEdits = loadEditRecords(logVersion, in, checksum, false,
+ expectedStartingTxId);
} finally {
if(closeOnExit)
in.close();
@@ -104,7 +128,9 @@ public class FSEditLogLoader {
@SuppressWarnings("deprecation")
int loadEditRecords(int logVersion, DataInputStream in,
- boolean closeOnExit, long expectedStartingTxId) throws IOException {
+ Checksum checksum, boolean closeOnExit,
+ long expectedStartingTxId)
+ throws IOException {
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
String clientName = null;
@@ -128,6 +154,9 @@ public class FSEditLogLoader {
long blockSize = 0;
FSEditLogOpCodes opCode;
try {
+ if (checksum != null) {
+ checksum.reset();
+ }
in.mark(1);
byte opCodeByte = in.readByte();
opCode = FSEditLogOpCodes.fromByte(opCodeByte);
@@ -139,7 +168,7 @@ public class FSEditLogLoader {
break; // no more transactions
}
- if (logVersion <= -28) {
+ if (logVersion <= -31) {
// Read the txid
long thisTxId = in.readLong();
if (thisTxId != txId + 1) {
@@ -496,6 +525,7 @@ public class FSEditLogLoader {
throw new IOException("Never seen opCode " + opCode);
}
}
+ validateChecksum(in, checksum, numEdits);
}
} finally {
if(closeOnExit)
@@ -521,6 +551,22 @@ public class FSEditLogLoader {
return numEdits;
}
+ /**
+ * Validate a transaction's checksum
+ */
+ private static void validateChecksum(
+ DataInputStream in, Checksum checksum, int tid)
+ throws IOException {
+ if (checksum != null) {
+ int calculatedChecksum = (int)checksum.getValue();
+ int readChecksum = in.readInt(); // read in checksum
+ if (readChecksum != calculatedChecksum) {
+ throw new ChecksumException(
+ "Transaction " + tid + " is corrupt. Calculated checksum is " +
+ calculatedChecksum + " but read checksum " + readChecksum, tid);
+ }
+ }
+ }
/**
* A class to read in blocks stored in the old format. The only two
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Apr 20 02:28:19 2011
@@ -71,7 +71,7 @@ public class FSImage implements NNStorag
private static final SimpleDateFormat DATE_FORM =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- private static final int FIRST_TXNID_BASED_LAYOUT_VERSION=-29;
+ private static final int FIRST_TXNID_BASED_LAYOUT_VERSION=-32;
// checkpoint states
enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Apr 20 02:28:19 2011
@@ -30,6 +30,7 @@ import java.security.DigestInputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Arrays;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -158,7 +159,7 @@ class FSImageFormat {
// read the transaction ID of the last edit represented by
// this image
- if (imgVersion <= -28) {
+ if (imgVersion <= -31) {
imgTxId = in.readLong();
} else {
imgTxId = 0;
@@ -178,7 +179,11 @@ class FSImageFormat {
// load all inodes
LOG.info("Number of files = " + numFiles);
- loadFullNameINodes(numFiles, in);
+ if (imgVersion <= -30) {
+ loadLocalNameINodes(numFiles, in);
+ } else {
+ loadFullNameINodes(numFiles, in);
+ }
// load datanode info
this.loadDatanodes(in);
@@ -214,6 +219,63 @@ class FSImageFormat {
fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());
}
+ /**
+ * load fsimage files assuming only local names are stored
+ *
+ * @param numFiles number of files expected to be read
+ * @param in image input stream
+ * @throws IOException
+ */
+ private void loadLocalNameINodes(long numFiles, DataInputStream in)
+ throws IOException {
+ assert numFiles > 0;
+
+ // load root
+ if( in.readShort() != 0) {
+ throw new IOException("First node is not root");
+ }
+ INode root = loadINode(in);
+ // update the root's attributes
+ updateRootAttr(root);
+ numFiles--;
+
+ // load rest of the nodes directory by directory
+ while (numFiles > 0) {
+ numFiles -= loadDirectory(in);
+ }
+ if (numFiles != 0) {
+ throw new IOException("Read unexpect number of files: " + -numFiles);
+ }
+ }
+
+ /**
+ * Load all children of a directory
+ *
+ * @param in
+ * @return number of child inodes read
+ * @throws IOException
+ */
+ private int loadDirectory(DataInputStream in) throws IOException {
+ String parentPath = FSImageSerialization.readString(in);
+ FSDirectory fsDir = namesystem.dir;
+ INode parent = fsDir.rootDir.getNode(parentPath, true);
+ if (parent == null || !parent.isDirectory()) {
+ throw new IOException("Path " + parentPath + "is not a directory.");
+ }
+
+ int numChildren = in.readInt();
+ for(int i=0; i<numChildren; i++) {
+ // load single inode
+ byte[] localName = new byte[in.readShort()];
+ in.readFully(localName); // read local name
+ INode newNode = loadINode(in); // read rest of inode
+
+ // add to parent
+ namesystem.dir.addToParent(localName, (INodeDirectory)parent, newNode, false);
+ }
+ return numChildren;
+ }
+
/**
* load fsimage files assuming full path names are stored
*
@@ -501,9 +563,10 @@ class FSImageFormat {
byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
// save the root
- FSImageSerialization.saveINode2Image(strbuf, fsDir.rootDir, out);
+ FSImageSerialization.saveINode2Image(fsDir.rootDir, out);
// save the rest of the nodes
- saveImage(strbuf, 0, fsDir.rootDir, out);
+ saveImage(strbuf, fsDir.rootDir, out);
+ // save files under construction
sourceNamesystem.saveFilesUnderConstruction(out);
sourceNamesystem.saveSecretManagerState(out);
strbuf = null;
@@ -527,28 +590,33 @@ class FSImageFormat {
* This is a recursive procedure, which first saves all children of
* a current directory and then moves inside the sub-directories.
*/
- private static void saveImage(ByteBuffer parentPrefix,
- int prefixLength,
+ private static void saveImage(ByteBuffer currentDirName,
INodeDirectory current,
DataOutputStream out) throws IOException {
- int newPrefixLength = prefixLength;
- if (current.getChildrenRaw() == null)
+ List<INode> children = current.getChildrenRaw();
+ if (children == null || children.isEmpty())
return;
- for(INode child : current.getChildren()) {
+ // print prefix (parent directory name)
+ int prefixLen = currentDirName.position();
+ if (prefixLen == 0) { // root
+ out.writeShort(PATH_SEPARATOR.length);
+ out.write(PATH_SEPARATOR);
+ } else { // non-root directories
+ out.writeShort(prefixLen);
+ out.write(currentDirName.array(), 0, prefixLen);
+ }
+ out.writeInt(children.size());
+ for(INode child : children) {
// print all children first
- parentPrefix.position(prefixLength);
- parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
- FSImageSerialization.saveINode2Image(parentPrefix, child, out);
+ FSImageSerialization.saveINode2Image(child, out);
}
- for(INode child : current.getChildren()) {
+ for(INode child : children) {
if(!child.isDirectory())
continue;
- parentPrefix.position(prefixLength);
- parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
- newPrefixLength = parentPrefix.position();
- saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
+ currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+ saveImage(currentDirName, (INodeDirectory)child, out);
+ currentDirName.position(prefixLen);
}
- parentPrefix.position(prefixLength);
}
}
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Wed Apr 20 02:28:19 2011
@@ -145,12 +145,11 @@ public class FSImageSerialization {
/*
* Save one inode's attributes to the image.
*/
- static void saveINode2Image(ByteBuffer name,
- INode node,
+ static void saveINode2Image(INode node,
DataOutputStream out) throws IOException {
- int nameLen = name.position();
- out.writeShort(nameLen);
- out.write(name.array(), name.arrayOffset(), nameLen);
+ byte[] name = node.getLocalNameBytes();
+ out.writeShort(name.length);
+ out.write(name);
FsPermission filePerm = TL_DATA.get().FILE_PERM;
if (node.isDirectory()) {
out.writeShort(0); // replication
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Apr 20 02:28:19 2011
@@ -252,6 +252,8 @@ public class FSNamesystem implements FSC
private FsServerDefaults serverDefaults;
// allow appending to hdfs files
private boolean supportAppends = true;
+ private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure =
+ DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
private volatile SafeModeInfo safeMode; // safe mode information
private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -522,6 +524,8 @@ public class FSNamesystem implements FSC
+ " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
+ " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
+
+ this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
}
/**
@@ -1312,22 +1316,16 @@ public class FSNamesystem implements FSC
long blockSize) throws SafeModeException, FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
- writeLock();
- try {
- boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
- boolean append = flag.contains(CreateFlag.APPEND);
- boolean create = flag.contains(CreateFlag.CREATE);
-
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine
+ ", createParent=" + createParent
+ ", replication=" + replication
- + ", overwrite=" + overwrite
- + ", append=" + append);
+ + ", createFlag=" + flag.toString());
}
-
+ writeLock();
+ try {
if (isInSafeMode())
throw new SafeModeException("Cannot create file" + src, safeMode);
if (!DFSUtil.isValidName(src)) {
@@ -1337,14 +1335,16 @@ public class FSNamesystem implements FSC
// Verify that the destination does not exist as a directory already.
boolean pathExists = dir.exists(src);
if (pathExists && dir.isDir(src)) {
- throw new FileAlreadyExistsException("Cannot create file "+ src + "; already exists as a directory.");
+ throw new FileAlreadyExistsException("Cannot create file " + src
+ + "; already exists as a directory.");
}
+ boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+ boolean append = flag.contains(CreateFlag.APPEND);
if (isPermissionEnabled) {
if (append || (overwrite && pathExists)) {
checkPathAccess(src, FsAction.WRITE);
- }
- else {
+ } else {
checkAncestorAccess(src, FsAction.WRITE);
}
}
@@ -1395,7 +1395,8 @@ public class FSNamesystem implements FSC
". Lease recovery is in progress. Try again later.");
} else {
- if(pendingFile.getLastBlock().getBlockUCState() ==
+ BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
+ if(lastBlock != null && lastBlock.getBlockUCState() ==
BlockUCState.UNDER_RECOVERY) {
throw new RecoveryInProgressException(
"Recovery in progress, file [" + src + "], " +
@@ -1416,34 +1417,27 @@ public class FSNamesystem implements FSC
} catch(IOException e) {
throw new IOException("failed to create "+e.getMessage());
}
- if (append) {
- if (myFile == null) {
- if(!create)
- throw new FileNotFoundException("failed to append to non-existent file "
- + src + " on client " + clientMachine);
- else {
- //append & create a nonexist file equals to overwrite
- return startFileInternal(src, permissions, holder, clientMachine,
- EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
- }
- } else if (myFile.isDirectory()) {
- throw new IOException("failed to append to directory " + src
- +" on client " + clientMachine);
+ boolean create = flag.contains(CreateFlag.CREATE);
+ if (myFile == null) {
+ if (!create) {
+ throw new FileNotFoundException("failed to overwrite or append to non-existent file "
+ + src + " on client " + clientMachine);
}
- } else if (!dir.isValidToCreate(src)) {
+ } else {
+ // File exists - must be one of append or overwrite
if (overwrite) {
delete(src, true);
- } else {
- throw new IOException("failed to create file " + src
- +" on client " + clientMachine
- +" either because the filename is invalid or the file exists");
+ } else if (!append) {
+ throw new FileAlreadyExistsException("failed to create file " + src
+ + " on client " + clientMachine
+ + " because the file exists");
}
}
DatanodeDescriptor clientNode =
host2DataNodeMap.getDatanodeByHost(clientMachine);
- if (append) {
+ if (append && myFile != null) {
//
// Replace current node with a INodeUnderConstruction.
// Recreate in-memory lease record.
@@ -1635,6 +1629,53 @@ public class FSNamesystem implements FSC
return b;
}
+ /** @see NameNode#getAdditionalDatanode(String, Block, DatanodeInfo[], DatanodeInfo[], int, String) */
+ LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+ final DatanodeInfo[] existings, final HashMap<Node, Node> excludes,
+ final int numAdditionalNodes, final String clientName
+ ) throws IOException {
+ //check if the feature is enabled
+ dtpReplaceDatanodeOnFailure.checkEnabled();
+
+ final DatanodeDescriptor clientnode;
+ final long preferredblocksize;
+ readLock();
+ try {
+ //check safe mode
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot add datanode; src=" + src
+ + ", blk=" + blk, safeMode);
+ }
+
+ //check lease
+ final INodeFileUnderConstruction file = checkLease(src, clientName);
+ clientnode = file.getClientNode();
+ preferredblocksize = file.getPreferredBlockSize();
+ } finally {
+ readUnlock();
+ }
+
+ //find datanode descriptors
+ final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
+ for(DatanodeInfo d : existings) {
+ final DatanodeDescriptor descriptor = getDatanode(d);
+ if (descriptor != null) {
+ chosen.add(descriptor);
+ }
+ }
+
+ // choose new datanodes.
+ final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
+ src, numAdditionalNodes, clientnode, chosen, true,
+ excludes, preferredblocksize);
+ final LocatedBlock lb = new LocatedBlock(blk, targets);
+ if (isBlockTokenEnabled) {
+ lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(),
+ EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
+ }
+ return lb;
+ }
+
/**
* The client would like to let go of the given block
*/
@@ -2661,7 +2702,6 @@ public class FSNamesystem implements FSC
* Get registrationID for datanodes based on the namespaceID.
*
* @see #registerDatanode(DatanodeRegistration)
- * @see FSImage#newNamespaceID()
* @return registration ID
*/
public String getRegistrationID() {
@@ -3104,6 +3144,14 @@ public class FSNamesystem implements FSC
throw new IOException("ProcessReport from dead or unregisterted node: "
+ nodeID.getName());
}
+ // To minimize startup time, we discard any second (or later) block reports
+ // that we receive while still in startup phase.
+ if (isInStartupSafeMode() && node.numBlocks() > 0) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+ + "discarded non-initial block report from " + nodeID.getName()
+ + " because namenode still in startup phase");
+ return;
+ }
blockManager.processReport(node, newReport);
NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
@@ -4171,6 +4219,15 @@ public class FSNamesystem implements FSC
return false;
return safeMode.isOn();
}
+
+ /**
+ * Check whether the name node is in startup mode.
+ */
+ synchronized boolean isInStartupSafeMode() {
+ if (safeMode == null)
+ return false;
+ return safeMode.isOn() && !safeMode.isManual();
+ }
/**
* Check whether replication queues are populated.
@@ -4542,7 +4599,8 @@ public class FSNamesystem implements FSC
bean = new StandardMBean(this,FSNamesystemMBean.class);
mbeanName = MBeanUtil.registerMBean("NameNode", "FSNamesystemState", bean);
} catch (NotCompliantMBeanException e) {
- e.printStackTrace();
+ LOG.warn("Exception in initializing StandardMBean as FSNamesystemMBean",
+ e);
}
LOG.info("Registered FSNamesystemStatusMBean");
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Apr 20 02:28:19 2011
@@ -233,12 +233,8 @@ abstract class INode implements Comparab
String getLocalParentDir() {
- INode p_node=getParent();
-
- if(p_node == null)
- return "/";
- else
- return p_node.getFullPathName();
+ INode inode = isRoot() ? this : getParent();
+ return (inode != null) ? inode.getFullPathName() : "";
}
/**
@@ -271,12 +267,7 @@ abstract class INode implements Comparab
/** {@inheritDoc} */
public String toString() {
- String i_path=getFullPathName();
-
- if(i_path.length() == 0)
- i_path="/";
-
- return "\"" + i_path + "\":"
+ return "\"" + getFullPathName() + "\":"
+ getUserName() + ":" + getGroupName() + ":"
+ (isDirectory()? "d": "-") + getFsPermission();
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Apr 20 02:28:19 2011
@@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -33,9 +32,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
@@ -80,9 +79,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
-import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
-import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
@@ -842,6 +838,33 @@ public class NameNode implements Namenod
return locatedBlock;
}
+ @Override
+ public LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+ final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+ final int numAdditionalNodes, final String clientName
+ ) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getAdditionalDatanode: src=" + src
+ + ", blk=" + blk
+ + ", existings=" + Arrays.asList(existings)
+ + ", excludes=" + Arrays.asList(excludes)
+ + ", numAdditionalNodes=" + numAdditionalNodes
+ + ", clientName=" + clientName);
+ }
+
+ myMetrics.numGetAdditionalDatanodeOps.inc();
+
+ HashMap<Node, Node> excludeSet = null;
+ if (excludes != null) {
+ excludeSet = new HashMap<Node, Node>(excludes.length);
+ for (Node node : excludes) {
+ excludeSet.put(node, node);
+ }
+ }
+ return namesystem.getAdditionalDatanode(src, blk,
+ existings, excludeSet, numAdditionalNodes, clientName);
+ }
+
/**
* The client needs to give up on the block.
*/
@@ -1204,7 +1227,7 @@ public class NameNode implements Namenod
}
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
namesystem.createSymlink(target, link,
- new PermissionStatus(ugi.getUserName(), null, dirPerms), createParent);
+ new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
}
/** @inheritDoc */
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Wed Apr 20 02:28:19 2011
@@ -72,6 +72,8 @@ public class NameNodeMetrics implements
new MetricsTimeVaryingInt("FileInfoOps", registry);
public MetricsTimeVaryingInt numAddBlockOps =
new MetricsTimeVaryingInt("AddBlockOps", registry);
+ public final MetricsTimeVaryingInt numGetAdditionalDatanodeOps
+ = new MetricsTimeVaryingInt("GetAdditionalDatanodeOps", registry);
public MetricsTimeVaryingInt numcreateSymlinkOps =
new MetricsTimeVaryingInt("CreateSymlinkOps", registry);
public MetricsTimeVaryingInt numgetLinkTargetOps =
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Wed Apr 20 02:28:19 2011
@@ -266,13 +266,21 @@ public class DFSAdmin extends FsShell {
super(conf);
}
+ protected DistributedFileSystem getDFS() throws IOException {
+ FileSystem fs = getFS();
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new IllegalArgumentException("FileSystem " + fs.getUri() +
+ " is not a distributed file system");
+ }
+ return (DistributedFileSystem)fs;
+ }
+
/**
* Gives a report on how the FileSystem is doing.
* @exception IOException if the filesystem does not exist.
*/
public void report() throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
FsStatus ds = dfs.getStatus();
long capacity = ds.getCapacity();
long used = ds.getUsed();
@@ -339,7 +347,6 @@ public class DFSAdmin extends FsShell {
System.out.println();
}
}
- }
}
/**
@@ -350,10 +357,6 @@ public class DFSAdmin extends FsShell {
* @exception IOException if the filesystem does not exist.
*/
public void setSafeMode(String[] argv, int idx) throws IOException {
- if (!(fs instanceof DistributedFileSystem)) {
- System.err.println("FileSystem is " + fs.getUri());
- return;
- }
if (idx != argv.length - 1) {
printUsage("-safemode");
return;
@@ -374,7 +377,7 @@ public class DFSAdmin extends FsShell {
printUsage("-safemode");
return;
}
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
boolean inSafeMode = dfs.setSafeMode(action);
//
@@ -404,12 +407,7 @@ public class DFSAdmin extends FsShell {
public int saveNamespace() throws IOException {
int exitCode = -1;
- if (!(fs instanceof DistributedFileSystem)) {
- System.err.println("FileSystem is " + fs.getUri());
- return exitCode;
- }
-
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
dfs.saveNamespace();
exitCode = 0;
@@ -425,17 +423,12 @@ public class DFSAdmin extends FsShell {
public int restoreFaileStorage(String arg) throws IOException {
int exitCode = -1;
- if (!(fs instanceof DistributedFileSystem)) {
- System.err.println("FileSystem is " + fs.getUri());
- return exitCode;
- }
-
if(!arg.equals("check") && !arg.equals("true") && !arg.equals("false")) {
System.err.println("restoreFailedStorage valid args are true|false|check");
return exitCode;
}
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
Boolean res = dfs.restoreFailedStorage(arg);
System.out.println("restoreFailedStorage is set to " + res);
exitCode = 0;
@@ -452,12 +445,7 @@ public class DFSAdmin extends FsShell {
public int refreshNodes() throws IOException {
int exitCode = -1;
- if (!(fs instanceof DistributedFileSystem)) {
- System.err.println("FileSystem is " + fs.getUri());
- return exitCode;
- }
-
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
dfs.refreshNodes();
exitCode = 0;
@@ -611,18 +599,10 @@ public class DFSAdmin extends FsShell {
* @exception IOException
*/
public int finalizeUpgrade() throws IOException {
- int exitCode = -1;
-
- if (!(fs instanceof DistributedFileSystem)) {
- System.out.println("FileSystem is " + fs.getUri());
- return exitCode;
- }
-
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
dfs.finalizeUpgrade();
- exitCode = 0;
-
- return exitCode;
+
+ return 0;
}
/**
@@ -633,10 +613,7 @@ public class DFSAdmin extends FsShell {
* @exception IOException
*/
public int upgradeProgress(String[] argv, int idx) throws IOException {
- if (!(fs instanceof DistributedFileSystem)) {
- System.out.println("FileSystem is " + fs.getUri());
- return -1;
- }
+
if (idx != argv.length - 1) {
printUsage("-upgradeProgress");
return -1;
@@ -654,7 +631,7 @@ public class DFSAdmin extends FsShell {
return -1;
}
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
UpgradeStatusReport status = dfs.distributedUpgradeProgress(action);
String statusText = (status == null ?
"There are no upgrades in progress." :
@@ -673,7 +650,7 @@ public class DFSAdmin extends FsShell {
*/
public int metaSave(String[] argv, int idx) throws IOException {
String pathname = argv[idx];
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ DistributedFileSystem dfs = getDFS();
dfs.metaSave(pathname);
System.out.println("Created file " + pathname + " on server " +
dfs.getUri());
@@ -688,8 +665,7 @@ public class DFSAdmin extends FsShell {
* @throws IOException If an error while getting datanode report
*/
public int printTopology() throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ DistributedFileSystem dfs = getDFS();
DFSClient client = dfs.getClient();
DatanodeInfo[] report = client.datanodeReport(DatanodeReportType.ALL);
@@ -724,7 +700,6 @@ public class DFSAdmin extends FsShell {
System.out.println();
}
- }
return 0;
}
@@ -1009,13 +984,13 @@ public class DFSAdmin extends FsShell {
} else if ("-metasave".equals(cmd)) {
exitCode = metaSave(argv, i);
} else if (ClearQuotaCommand.matches(cmd)) {
- exitCode = new ClearQuotaCommand(argv, i, fs).runAll();
+ exitCode = new ClearQuotaCommand(argv, i, getDFS()).runAll();
} else if (SetQuotaCommand.matches(cmd)) {
- exitCode = new SetQuotaCommand(argv, i, fs).runAll();
+ exitCode = new SetQuotaCommand(argv, i, getDFS()).runAll();
} else if (ClearSpaceQuotaCommand.matches(cmd)) {
- exitCode = new ClearSpaceQuotaCommand(argv, i, fs).runAll();
+ exitCode = new ClearSpaceQuotaCommand(argv, i, getDFS()).runAll();
} else if (SetSpaceQuotaCommand.matches(cmd)) {
- exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
+ exitCode = new SetSpaceQuotaCommand(argv, i, getDFS()).runAll();
} else if ("-refreshServiceAcl".equals(cmd)) {
exitCode = refreshServiceAcl();
} else if ("-refreshUserToGroupsMappings".equals(cmd)) {
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java Wed Apr 20 02:28:19 2011
@@ -81,5 +81,6 @@ public enum EditsElement {
KEY_ID,
KEY_EXPIRY_DATE,
KEY_LENGTH,
- KEY_BLOB
+ KEY_BLOB,
+ CHECKSUM
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java Wed Apr 20 02:28:19 2011
@@ -17,26 +17,15 @@
*/
package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
-import java.io.DataInputStream;
import java.io.IOException;
-import java.io.EOFException;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ByteToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ShortToken;
import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.IntToken;
import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VIntToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.LongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VLongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringUTF8Token;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringTextToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BlobToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BytesWritableToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.EmptyToken;
/**
* EditsLoaderCurrent processes Hadoop EditLogs files and walks over
@@ -49,7 +38,7 @@ import static org.apache.hadoop.hdfs.too
class EditsLoaderCurrent implements EditsLoader {
private static int [] supportedVersions = {
- -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28 };
+ -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31 };
private EditsVisitor v;
private int editsVersion = 0;
@@ -77,7 +66,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_INVALID
*/
private void visit_OP_INVALID() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
}
@@ -103,7 +92,7 @@ class EditsLoaderCurrent implements Edit
*/
private void visit_OP_ADD_or_OP_CLOSE(FSEditLogOpCodes editsOpCode)
throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -149,7 +138,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_RENAME_OLD
*/
private void visit_OP_RENAME_OLD() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -163,7 +152,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_DELETE
*/
private void visit_OP_DELETE() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -176,7 +165,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_MKDIR
*/
private void visit_OP_MKDIR() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -198,7 +187,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_SET_REPLICATION
*/
private void visit_OP_SET_REPLICATION() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -210,7 +199,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_SET_PERMISSIONS
*/
private void visit_OP_SET_PERMISSIONS() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -222,7 +211,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_SET_OWNER
*/
private void visit_OP_SET_OWNER() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -235,7 +224,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_SET_GENSTAMP
*/
private void visit_OP_SET_GENSTAMP() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -246,7 +235,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_TIMES
*/
private void visit_OP_TIMES() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -260,7 +249,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_SET_QUOTA
*/
private void visit_OP_SET_QUOTA() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -273,7 +262,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_RENAME
*/
private void visit_OP_RENAME() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -293,7 +282,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_CONCAT_DELETE
*/
private void visit_OP_CONCAT_DELETE() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -317,7 +306,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_SYMLINK
*/
private void visit_OP_SYMLINK() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -340,7 +329,7 @@ class EditsLoaderCurrent implements Edit
* Visit OP_GET_DELEGATION_TOKEN
*/
private void visit_OP_GET_DELEGATION_TOKEN() throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -366,7 +355,7 @@ class EditsLoaderCurrent implements Edit
*/
private void visit_OP_RENEW_DELEGATION_TOKEN()
throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -392,7 +381,7 @@ class EditsLoaderCurrent implements Edit
*/
private void visit_OP_CANCEL_DELEGATION_TOKEN()
throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -417,7 +406,7 @@ class EditsLoaderCurrent implements Edit
*/
private void visit_OP_UPDATE_MASTER_KEY()
throws IOException {
- if(editsVersion <= -28) {
+ if(editsVersion <= -31) {
v.visitLong(EditsElement.TRANSACTION_ID);
}
@@ -530,6 +519,10 @@ class EditsLoaderCurrent implements Edit
visitOpCode(editsOpCode);
v.leaveEnclosingElement(); // DATA
+
+ if (editsOpCode != FSEditLogOpCodes.OP_INVALID && editsVersion <= -28) {
+ v.visitInt(EditsElement.CHECKSUM);
+ }
v.leaveEnclosingElement(); // RECORD
} while(editsOpCode != FSEditLogOpCodes.OP_INVALID);