You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/02/27 00:32:14 UTC
svn commit: r1293964 [6/11] - in
/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs:
./ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/...
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java Sun Feb 26 23:32:06 2012
@@ -36,4 +36,8 @@ public class BlockKey extends Delegation
public BlockKey(int keyId, long expiryDate, SecretKey key) {
super(keyId, expiryDate, key);
}
+
+ public BlockKey(int keyId, long expiryDate, byte[] encodedKey) {
+ super(keyId, expiryDate, encodedKey);
+ }
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java Sun Feb 26 23:32:06 2012
@@ -43,7 +43,7 @@ public class ExportedBlockKeys implement
this(false, 0, 0, new BlockKey(), new BlockKey[0]);
}
- ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
+ public ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
long tokenLifetime, BlockKey currentKey, BlockKey[] allKeys) {
this.isBlockTokenEnabled = isBlockTokenEnabled;
this.keyUpdateInterval = keyUpdateInterval;
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Sun Feb 26 23:32:06 2012
@@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -46,6 +48,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
@@ -206,14 +209,15 @@ class NameNodeConnector {
methodNameToPolicyMap.put("getBlocks", methodPolicy);
methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
- return (NamenodeProtocol) RetryProxy.create(NamenodeProtocol.class,
- RPC.getProxy(NamenodeProtocol.class,
- NamenodeProtocol.versionID,
- address,
- UserGroupInformation.getCurrentUser(),
- conf,
- NetUtils.getDefaultSocketFactory(conf)),
- methodNameToPolicyMap);
+ RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class,
+ RPC.getProtocolVersion(NamenodeProtocolPB.class), address,
+ UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getDefaultSocketFactory(conf));
+ NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
+ NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+ return new NamenodeProtocolTranslatorPB(retryProxy);
}
/**
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Sun Feb 26 23:32:06 2012
@@ -112,7 +112,6 @@ public class BlockInfoUnderConstruction
return (this == obj) || super.equals(obj);
}
- /** {@inheritDoc} */
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName());
@@ -268,7 +267,6 @@ public class BlockInfoUnderConstruction
return (this == obj) || super.equals(obj);
}
- /** {@inheritDoc} */
@Override
public String toString() {
final StringBuilder b = new StringBuilder(super.toString());
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Sun Feb 26 23:32:06 2012
@@ -59,10 +59,12 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
@@ -2082,7 +2084,7 @@ public class BlockManager {
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
* removed block is still valid.
*/
- private void removeStoredBlock(Block block, DatanodeDescriptor node) {
+ public void removeStoredBlock(Block block, DatanodeDescriptor node) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
+ block + " from " + node.getName());
@@ -2201,27 +2203,48 @@ public class BlockManager {
}
}
- /** The given node is reporting that it received a certain block. */
- public void blockReceived(final DatanodeID nodeID, final String poolId,
- final Block block, final String delHint) throws IOException {
+ /** The given node is reporting that it received/deleted certain blocks. */
+ public void blockReceivedAndDeleted(final DatanodeID nodeID,
+ final String poolId,
+ final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
+ ) throws IOException {
namesystem.writeLock();
+ int received = 0;
+ int deleted = 0;
try {
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
- final String s = block + " is received from dead or unregistered node "
- + nodeID.getName();
- NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s);
- throw new IOException(s);
- }
-
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
- + " is received from " + nodeID.getName());
+ NameNode.stateChangeLog
+ .warn("BLOCK* blockReceivedDeleted"
+ + " is received from dead or unregistered node "
+ + nodeID.getName());
+ throw new IOException(
+ "Got blockReceivedDeleted message from unregistered or dead node");
}
- addBlock(node, block, delHint);
+ for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
+ if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
+ removeStoredBlock(
+ receivedAndDeletedBlocks[i].getBlock(), node);
+ deleted++;
+ } else {
+ addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
+ receivedAndDeletedBlocks[i].getDelHints());
+ received++;
+ }
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("BLOCK* block"
+ + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
+ : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+ + " is received from " + nodeID.getName());
+ }
+ }
} finally {
namesystem.writeUnlock();
+ NameNode.stateChangeLog
+ .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+ + nodeID.getName() + " received: " + received + ", "
+ + " deleted: " + deleted);
}
}
@@ -2396,6 +2419,7 @@ public class BlockManager {
}
public void removeBlock(Block block) {
+ block.setNumBytes(BlockCommand.NO_ACK);
addToInvalidates(block);
corruptReplicas.removeFromCorruptReplicasMap(block);
blocksMap.removeBlock(block);
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Sun Feb 26 23:32:06 2012
@@ -66,7 +66,7 @@ public class BlockPlacementPolicyDefault
BlockPlacementPolicyDefault() {
}
- /** {@inheritDoc} */
+ @Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
@@ -82,7 +82,7 @@ public class BlockPlacementPolicyDefault
}
};
- /** {@inheritDoc} */
+ @Override
public DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
DatanodeDescriptor writer,
@@ -92,7 +92,6 @@ public class BlockPlacementPolicyDefault
null, blocksize);
}
- /** {@inheritDoc} */
@Override
public DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
@@ -528,7 +527,7 @@ public class BlockPlacementPolicyDefault
return nodes;
}
- /** {@inheritDoc} */
+ @Override
public int verifyBlockPlacement(String srcPath,
LocatedBlock lBlk,
int minRacks) {
@@ -547,7 +546,7 @@ public class BlockPlacementPolicyDefault
return minRacks - racks.size();
}
- /** {@inheritDoc} */
+ @Override
public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
Block block,
short replicationFactor,
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Sun Feb 26 23:32:06 2012
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
@@ -862,7 +863,7 @@ public class DatanodeManager {
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
- return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+ return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
@@ -872,7 +873,7 @@ public class DatanodeManager {
}
if (nodeinfo == null || !nodeinfo.isAlive) {
- return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+ return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Sun Feb 26 23:32:06 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -45,11 +46,17 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
@@ -86,15 +93,17 @@ class BPOfferService implements Runnable
DatanodeRegistration bpRegistration;
long lastBlockReport = 0;
+ long lastDeletedReport = 0;
boolean resetBlockReportTime = true;
Thread bpThread;
- DatanodeProtocol bpNamenode;
+ DatanodeProtocolClientSideTranslatorPB bpNamenode;
private long lastHeartbeat = 0;
private volatile boolean initialized = false;
- private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
- private final LinkedList<String> delHints = new LinkedList<String>();
+ private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
+ = new LinkedList<ReceivedDeletedBlockInfo>();
+ private volatile int pendingReceivedRequests = 0;
private volatile boolean shouldServiceRun = true;
UpgradeManagerDatanode upgradeManager = null;
private final DataNode dn;
@@ -160,7 +169,7 @@ class BPOfferService implements Runnable
* Used to inject a spy NN in the unit tests.
*/
@VisibleForTesting
- void setNameNode(DatanodeProtocol dnProtocol) {
+ void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
bpNamenode = dnProtocol;
}
@@ -220,8 +229,8 @@ class BPOfferService implements Runnable
private void connectToNNAndHandshake() throws IOException {
// get NN proxy
- bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
- DatanodeProtocol.versionID, nnAddr, dn.getConf());
+ bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr,
+ dn.getConf());
// First phase of the handshake with NN - get the namespace
// info.
@@ -270,39 +279,32 @@ class BPOfferService implements Runnable
* Report received blocks and delete hints to the Namenode
* @throws IOException
*/
- private void reportReceivedBlocks() throws IOException {
- //check if there are newly received blocks
- Block [] blockArray=null;
- String [] delHintArray=null;
- synchronized(receivedBlockList) {
- synchronized(delHints){
- int numBlocks = receivedBlockList.size();
- if (numBlocks > 0) {
- if(numBlocks!=delHints.size()) {
- LOG.warn("Panic: receiveBlockList and delHints are not of " +
- "the same length" );
- }
- //
- // Send newly-received blockids to namenode
- //
- blockArray = receivedBlockList.toArray(new Block[numBlocks]);
- delHintArray = delHints.toArray(new String[numBlocks]);
- }
+ private void reportReceivedDeletedBlocks() throws IOException {
+
+ // check if there are newly received blocks
+ ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+ int currentReceivedRequestsCounter;
+ synchronized (receivedAndDeletedBlockList) {
+ currentReceivedRequestsCounter = pendingReceivedRequests;
+ int numBlocks = receivedAndDeletedBlockList.size();
+ if (numBlocks > 0) {
+ //
+ // Send newly-received and deleted blockids to namenode
+ //
+ receivedAndDeletedBlockArray = receivedAndDeletedBlockList
+ .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
}
}
- if (blockArray != null) {
- if(delHintArray == null || delHintArray.length != blockArray.length ) {
- LOG.warn("Panic: block array & delHintArray are not the same" );
- }
- bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
- delHintArray);
- synchronized(receivedBlockList) {
- synchronized(delHints){
- for(int i=0; i<blockArray.length; i++) {
- receivedBlockList.remove(blockArray[i]);
- delHints.remove(delHintArray[i]);
- }
+ if (receivedAndDeletedBlockArray != null) {
+ StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
+ bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
+ bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
+ report);
+ synchronized (receivedAndDeletedBlockList) {
+ for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
+ receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
}
+ pendingReceivedRequests -= currentReceivedRequestsCounter;
}
}
}
@@ -313,26 +315,42 @@ class BPOfferService implements Runnable
* client? For now we don't.
*/
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
- if(block==null || delHint==null) {
- throw new IllegalArgumentException(
- block==null?"Block is null":"delHint is null");
+ if (block == null || delHint == null) {
+ throw new IllegalArgumentException(block == null ? "Block is null"
+ : "delHint is null");
}
-
+
if (!block.getBlockPoolId().equals(getBlockPoolId())) {
LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+ getBlockPoolId());
return;
}
-
- synchronized (receivedBlockList) {
- synchronized (delHints) {
- receivedBlockList.add(block.getLocalBlock());
- delHints.add(delHint);
- receivedBlockList.notifyAll();
- }
+
+ synchronized (receivedAndDeletedBlockList) {
+ receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
+ .getLocalBlock(), delHint));
+ pendingReceivedRequests++;
+ receivedAndDeletedBlockList.notifyAll();
}
}
+ void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+ if (block == null) {
+ throw new IllegalArgumentException("Block is null");
+ }
+
+ if (!block.getBlockPoolId().equals(getBlockPoolId())) {
+ LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+ + getBlockPoolId());
+ return;
+ }
+
+ synchronized (receivedAndDeletedBlockList) {
+ receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
+ .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
+ }
+ }
+
/**
* Report the list blocks to the Namenode
@@ -350,8 +368,9 @@ class BPOfferService implements Runnable
// Send block report
long brSendStartTime = now();
- cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
- .getBlockListAsLongs());
+ StorageBlockReport[] report = { new StorageBlockReport(
+ bpRegistration.getStorageID(), bReport.getBlockListAsLongs()) };
+ cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), report);
// Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime;
@@ -383,11 +402,11 @@ class BPOfferService implements Runnable
DatanodeCommand [] sendHeartBeat() throws IOException {
- return bpNamenode.sendHeartbeat(bpRegistration,
- dn.data.getCapacity(),
- dn.data.getDfsUsed(),
- dn.data.getRemaining(),
- dn.data.getBlockPoolUsed(getBlockPoolId()),
+ // reports number of failed volumes
+ StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
+ false, dn.data.getCapacity(), dn.data.getDfsUsed(),
+ dn.data.getRemaining(), dn.data.getBlockPoolUsed(getBlockPoolId())) };
+ return bpNamenode.sendHeartbeat(bpRegistration, report,
dn.xmitsInProgress.get(),
dn.getXceiverCount(), dn.data.getNumFailedVolumes());
}
@@ -433,7 +452,7 @@ class BPOfferService implements Runnable
if(upgradeManager != null)
upgradeManager.shutdownUpgrade();
shouldServiceRun = false;
- RPC.stopProxy(bpNamenode);
+ IOUtils.cleanup(LOG, bpNamenode);
dn.shutdownBlockPool(this);
}
@@ -442,7 +461,8 @@ class BPOfferService implements Runnable
* forever calling remote NameNode functions.
*/
private void offerService() throws Exception {
- LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+ LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+ + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
+ dnConf.blockReportInterval + "msec" + " Initial delay: "
+ dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ dnConf.heartBeatInterval);
@@ -480,8 +500,11 @@ class BPOfferService implements Runnable
}
}
}
-
- reportReceivedBlocks();
+ if (pendingReceivedRequests > 0
+ || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
+ reportReceivedDeletedBlocks();
+ lastDeletedReport = startTime;
+ }
DatanodeCommand cmd = blockReport();
processCommand(cmd);
@@ -497,10 +520,10 @@ class BPOfferService implements Runnable
//
long waitTime = dnConf.heartBeatInterval -
(System.currentTimeMillis() - lastHeartbeat);
- synchronized(receivedBlockList) {
- if (waitTime > 0 && receivedBlockList.size() == 0) {
+ synchronized(receivedAndDeletedBlockList) {
+ if (waitTime > 0 && pendingReceivedRequests == 0) {
try {
- receivedBlockList.wait(waitTime);
+ receivedAndDeletedBlockList.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
@@ -553,7 +576,8 @@ class BPOfferService implements Runnable
while (shouldRun()) {
try {
// Use returned registration from namenode with updated machine name.
- bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+ bpRegistration = bpNamenode.registerDatanode(bpRegistration,
+ new DatanodeStorage[0]);
break;
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
@@ -699,7 +723,7 @@ class BPOfferService implements Runnable
}
break;
case DatanodeProtocol.DNA_FINALIZE:
- String bp = ((DatanodeCommand.Finalize) cmd).getBlockPoolId();
+ String bp = ((FinalizeCommand) cmd).getBlockPoolId();
assert getBlockPoolId().equals(bp) :
"BP " + getBlockPoolId() + " received DNA_FINALIZE " +
"for other block pool " + bp;
@@ -764,12 +788,12 @@ class BPOfferService implements Runnable
}
@VisibleForTesting
- DatanodeProtocol getBpNamenode() {
+ DatanodeProtocolClientSideTranslatorPB getBpNamenode() {
return bpNamenode;
}
@VisibleForTesting
- void setBpNamenode(DatanodeProtocol bpNamenode) {
+ void setBpNamenode(DatanodeProtocolClientSideTranslatorPB bpNamenode) {
this.bpNamenode = bpNamenode;
}
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Sun Feb 26 23:32:06 2012
@@ -55,6 +55,7 @@ class DNConf {
final long readaheadLength;
final long heartBeatInterval;
final long blockReportInterval;
+ final long deleteReportInterval;
final long initialBlockReportDelay;
final int writePacketSize;
@@ -105,6 +106,7 @@ class DNConf {
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
+ this.deleteReportInterval = 100 * heartBeatInterval;
// do we need to sync block file contents to disk when blockfile is closed?
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sun Feb 26 23:32:06 2012
@@ -100,8 +100,16 @@ import org.apache.hadoop.hdfs.protocol.R
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -124,11 +132,13 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -154,6 +164,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
+import com.google.protobuf.BlockingService;
/**********************************************************
@@ -385,7 +396,7 @@ public class DataNode extends Configured
private List<ServicePlugin> plugins;
// For InterDataNodeProtocol
- public Server ipcServer;
+ public RPC.Server ipcServer;
private SecureResources secureResources = null;
private AbstractList<File> dataDirs;
@@ -507,11 +518,26 @@ public class DataNode extends Configured
private void initIpcServer(Configuration conf) throws IOException {
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
conf.get("dfs.datanode.ipc.address"));
- ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
- ipcAddr.getPort(),
- conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
- DFS_DATANODE_HANDLER_COUNT_DEFAULT),
- false, conf, blockPoolTokenSecretManager);
+
+ // Add all the RPC protocols that the Datanode implements
+ RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator =
+ new ClientDatanodeProtocolServerSideTranslatorPB(this);
+ BlockingService service = ClientDatanodeProtocolService
+ .newReflectiveBlockingService(clientDatanodeProtocolXlator);
+ ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr
+ .getHostName(), ipcAddr.getPort(), conf.getInt(
+ DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT),
+ false, conf, blockPoolTokenSecretManager);
+
+ InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
+ new InterDatanodeProtocolServerSideTranslatorPB(this);
+ service = InterDatanodeProtocolService
+ .newReflectiveBlockingService(interDatanodeProtocolXlator);
+ DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
+ ipcServer);
+
// set service-level authorization security policy
if (conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
@@ -634,6 +660,17 @@ public class DataNode extends Configured
}
}
+ // calls specific to BP
+ protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+ BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+ if (bpos != null) {
+ bpos.notifyNamenodeDeletedBlock(block);
+ } else {
+ LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+ + block.getBlockPoolId());
+ }
+ }
+
public void reportBadBlocks(ExtendedBlock block) throws IOException{
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos == null || bpos.bpNamenode == null) {
@@ -952,15 +989,13 @@ public class DataNode extends Configured
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
}
- UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+ final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
try {
return loginUgi
.doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
public InterDatanodeProtocol run() throws IOException {
- return (InterDatanodeProtocol) RPC.getProxy(
- InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
- addr, UserGroupInformation.getCurrentUser(), conf,
- NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+ return new InterDatanodeProtocolTranslatorPB(addr, loginUgi,
+ conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
});
} catch (InterruptedException ie) {
@@ -1207,7 +1242,7 @@ public class DataNode extends Configured
//inform NameNodes
for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
- DatanodeProtocol nn = bpos.bpNamenode;
+ DatanodeProtocolClientSideTranslatorPB nn = bpos.bpNamenode;
try {
nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
} catch(IOException e) {
@@ -1241,7 +1276,8 @@ public class DataNode extends Configured
private void transferBlock( ExtendedBlock block,
DatanodeInfo xferTargets[]
) throws IOException {
- DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
+ DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
+ .getBlockPoolId());
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
if (!data.isValidBlock(block)) {
@@ -1819,7 +1855,7 @@ public class DataNode extends Configured
return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
}
- /** {@inheritDoc} */
+ @Override
public long getProtocolVersion(String protocol, long clientVersion
) throws IOException {
if (protocol.equals(InterDatanodeProtocol.class.getName())) {
@@ -1852,7 +1888,7 @@ public class DataNode extends Configured
this.rInfo = rInfo;
}
- /** {@inheritDoc} */
+ @Override
public String toString() {
return "block:" + rInfo + " node:" + id;
}
@@ -1909,7 +1945,8 @@ public class DataNode extends Configured
* @return Namenode corresponding to the bpid
* @throws IOException
*/
- public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
+ public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
+ throws IOException {
BPOfferService bpos = blockPoolManager.get(bpid);
if(bpos == null || bpos.bpNamenode == null) {
throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
@@ -1921,7 +1958,8 @@ public class DataNode extends Configured
void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
ExtendedBlock block = rBlock.getBlock();
- DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
+ DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
+ .getBlockPoolId());
long recoveryId = rBlock.getNewGenerationStamp();
if (LOG.isDebugEnabled()) {
@@ -2036,7 +2074,6 @@ public class DataNode extends Configured
}
// ClientDataNodeProtocol implementation
- /** {@inheritDoc} */
@Override // ClientDataNodeProtocol
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
checkWriteAccess(block);
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sun Feb 26 23:32:06 2012
@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
@@ -1122,7 +1121,7 @@ class FSDataset implements FSDatasetInte
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
}
- asyncDiskService = new FSDatasetAsyncDiskService(roots);
+ asyncDiskService = new FSDatasetAsyncDiskService(this, roots);
registerMBean(storage.getStorageID());
}
@@ -1202,8 +1201,8 @@ class FSDataset implements FSDatasetInte
File getBlockFile(String bpid, Block b) throws IOException {
File f = validateBlockFile(bpid, b);
if(f == null) {
- if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
- InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
}
throw new IOException("Block " + b + " is not valid.");
}
@@ -1964,8 +1963,8 @@ class FSDataset implements FSDatasetInte
datanode.checkDiskError();
}
- if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
- InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("b=" + b + ", f=" + f);
}
return null;
}
@@ -2049,15 +2048,19 @@ class FSDataset implements FSDatasetInte
volumeMap.remove(bpid, invalidBlks[i]);
}
File metaFile = DatanodeUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp());
-
+
// Delete the block asynchronously to make sure we can do it fast enough
- asyncDiskService.deleteAsync(v, bpid, f, metaFile,
- invalidBlks[i].toString());
+ asyncDiskService.deleteAsync(v, f, metaFile,
+ new ExtendedBlock(bpid, invalidBlks[i]));
}
if (error) {
throw new IOException("Error in deleting blocks.");
}
}
+
+ public void notifyNamenodeDeletedBlock(ExtendedBlock block){
+ datanode.notifyNamenodeDeletedBlock(block);
+ }
@Override // {@link FSDatasetInterface}
public synchronized boolean contains(final ExtendedBlock block) {
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Sun Feb 26 23:32:06 2012
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
/*
* This class is a container of multiple thread pools, each for a volume,
@@ -47,6 +49,8 @@ import org.apache.commons.logging.LogFac
*/
class FSDatasetAsyncDiskService {
+ final FSDataset dataset;
+
public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
// ThreadPool core pool size
@@ -70,8 +74,8 @@ class FSDatasetAsyncDiskService {
*
* @param volumes The roots of the data volumes.
*/
- FSDatasetAsyncDiskService(File[] volumes) {
-
+ FSDatasetAsyncDiskService(FSDataset dataset, File[] volumes) {
+ this.dataset = dataset;
// Create one ThreadPool per volume
for (int v = 0 ; v < volumes.length; v++) {
final File vol = volumes[v];
@@ -147,13 +151,12 @@ class FSDatasetAsyncDiskService {
* Delete the block file and meta file from the disk asynchronously, adjust
* dfsUsed statistics accordingly.
*/
- void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile,
- File metaFile, String blockName) {
- DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
- + " for deletion");
- ReplicaFileDeleteTask deletionTask =
- new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile,
- blockName);
+ void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
+ ExtendedBlock block) {
+ DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
+ + " file " + blockFile + " for deletion");
+ ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
+ volume, blockFile, metaFile, block);
execute(volume.getCurrentDir(), deletionTask);
}
@@ -161,19 +164,19 @@ class FSDatasetAsyncDiskService {
* as decrement the dfs usage of the volume.
*/
static class ReplicaFileDeleteTask implements Runnable {
+ final FSDataset dataset;
final FSDataset.FSVolume volume;
- final String blockPoolId;
final File blockFile;
final File metaFile;
- final String blockName;
+ final ExtendedBlock block;
- ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid,
- File blockFile, File metaFile, String blockName) {
+ ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
+ File metaFile, ExtendedBlock block) {
+ this.dataset = dataset;
this.volume = volume;
- this.blockPoolId = bpid;
this.blockFile = blockFile;
this.metaFile = metaFile;
- this.blockName = blockName;
+ this.block = block;
}
FSDataset.FSVolume getVolume() {
@@ -183,9 +186,9 @@ class FSDatasetAsyncDiskService {
@Override
public String toString() {
// Called in AsyncDiskService.execute for displaying error messages.
- return "deletion of block " + blockPoolId + " " + blockName
- + " with block file " + blockFile + " and meta file " + metaFile
- + " from volume " + volume;
+ return "deletion of block " + block.getBlockPoolId() + " "
+ + block.getLocalBlock().toString() + " with block file " + blockFile
+ + " and meta file " + metaFile + " from volume " + volume;
}
@Override
@@ -193,12 +196,15 @@ class FSDatasetAsyncDiskService {
long dfsBytes = blockFile.length() + metaFile.length();
if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
DataNode.LOG.warn("Unexpected error trying to delete block "
- + blockPoolId + " " + blockName + " at file " + blockFile
- + ". Ignored.");
+ + block.getBlockPoolId() + " " + block.getLocalBlock().toString()
+ + " at file " + blockFile + ". Ignored.");
} else {
- volume.decDfsUsed(blockPoolId, dfsBytes);
- DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName
- + " at file " + blockFile);
+ if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
+ dataset.notifyNamenodeDeletedBlock(block);
+ }
+ volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
+ DataNode.LOG.info("Deleted block " + block.getBlockPoolId() + " "
+ + block.getLocalBlock().toString() + " at file " + blockFile);
}
}
};
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sun Feb 26 23:32:06 2012
@@ -257,7 +257,7 @@ public interface FSDatasetInterface exte
this.checksumIn = checksumIn;
}
- /** {@inheritDoc} */
+ @Override
public void close() {
IOUtils.closeStream(dataIn);
IOUtils.closeStream(checksumIn);
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Sun Feb 26 23:32:06 2012
@@ -18,18 +18,21 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
+import java.util.zip.Checksum;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
* Extension of FSImage for the backup node.
@@ -77,6 +80,8 @@ public class BackupImage extends FSImage
* {@see #freezeNamespaceAtNextRoll()}
*/
private boolean stopApplyingEditsOnNextRoll = false;
+
+ private FSNamesystem namesystem;
/**
* Construct a backup image.
@@ -88,6 +93,10 @@ public class BackupImage extends FSImage
storage.setDisablePreUpgradableLayoutCheck(true);
bnState = BNState.DROP_UNTIL_NEXT_ROLL;
}
+
+ void setNamesystem(FSNamesystem fsn) {
+ this.namesystem = fsn;
+ }
/**
* Analyze backup storage directories for consistency.<br>
@@ -136,7 +145,7 @@ public class BackupImage extends FSImage
* and create empty edits.
*/
void saveCheckpoint() throws IOException {
- saveNamespace();
+ saveNamespace(namesystem);
}
/**
@@ -219,7 +228,7 @@ public class BackupImage extends FSImage
}
lastAppliedTxId += numTxns;
- getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
+ namesystem.dir.updateCountForINodeWithQuota(); // inefficient!
} finally {
backupInputStream.clear();
}
@@ -257,11 +266,18 @@ public class BackupImage extends FSImage
new FSImageTransactionalStorageInspector();
storage.inspectStorageDirs(inspector);
- LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
- target - 1);
-
- logLoadPlan.doRecovery();
- loadEdits(logLoadPlan.getEditsFiles());
+
+ editLog.recoverUnclosedStreams();
+ Iterable<EditLogInputStream> editStreamsAll
+ = editLog.selectInputStreams(lastAppliedTxId, target - 1);
+ // remove inprogress
+ List<EditLogInputStream> editStreams = Lists.newArrayList();
+ for (EditLogInputStream s : editStreamsAll) {
+ if (s.getFirstTxId() != editLog.getCurSegmentTxId()) {
+ editStreams.add(s);
+ }
+ }
+ loadEdits(editStreams, namesystem);
}
// now, need to load the in-progress file
@@ -271,7 +287,24 @@ public class BackupImage extends FSImage
return false; // drop lock and try again to load local logs
}
- EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
+ EditLogInputStream stream = null;
+ Collection<EditLogInputStream> editStreams
+ = getEditLog().selectInputStreams(
+ getEditLog().getCurSegmentTxId(),
+ getEditLog().getCurSegmentTxId());
+
+ for (EditLogInputStream s : editStreams) {
+ if (s.getFirstTxId() == getEditLog().getCurSegmentTxId()) {
+ stream = s;
+ }
+ break;
+ }
+ if (stream == null) {
+ LOG.warn("Unable to find stream starting with " + editLog.getCurSegmentTxId()
+ + ". This indicates that there is an error in synchronization in BackupImage");
+ return false;
+ }
+
try {
long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
@@ -285,7 +318,7 @@ public class BackupImage extends FSImage
"expected to load " + remainingTxns + " but loaded " +
numLoaded + " from " + stream;
} finally {
- IOUtils.closeStream(stream);
+ FSEditLog.closeAllStreams(editStreams);
}
LOG.info("Successfully synced BackupNode with NameNode at txnid " +
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Sun Feb 26 23:32:06 2012
@@ -57,12 +57,31 @@ class BackupJournalManager implements Jo
throws IOException {
}
+ @Override
+ public long getNumberOfTransactions(long fromTxnId)
+ throws IOException, CorruptionException {
+ // This JournalManager is never used for input. Therefore it cannot
+ // return any transactions
+ return 0;
+ }
+
+ @Override
+ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
+ // This JournalManager is never used for input. Therefore it cannot
+ // return any transactions
+ throw new IOException("Unsupported operation");
+ }
+
+ @Override
+ public void recoverUnfinalizedSegments() throws IOException {
+ }
+
public boolean matchesRegistration(NamenodeRegistration bnReg) {
return bnReg.getAddress().equals(this.bnReg.getAddress());
}
@Override
- public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
- return null;
+ public String toString() {
+ return "BackupJournalManager";
}
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sun Feb 26 23:32:06 2012
@@ -28,16 +28,24 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.BlockingService;
/**
* BackupNode.
@@ -61,7 +69,7 @@ public class BackupNode extends NameNode
private static final String BN_SERVICE_RPC_ADDRESS_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
/** Name-node proxy */
- NamenodeProtocol namenode;
+ NamenodeProtocolTranslatorPB namenode;
/** Name-node RPC address */
String nnRpcAddress;
/** Name-node HTTP address */
@@ -123,6 +131,7 @@ public class BackupNode extends NameNode
protected void loadNamesystem(Configuration conf) throws IOException {
BackupImage bnImage = new BackupImage(conf);
this.namesystem = new FSNamesystem(conf, bnImage);
+ bnImage.setNamesystem(namesystem);
bnImage.recoverCreateRead();
}
@@ -175,7 +184,9 @@ public class BackupNode extends NameNode
}
}
// Stop the RPC client
- RPC.stopProxy(namenode);
+ if (namenode != null) {
+ IOUtils.cleanup(LOG, namenode);
+ }
namenode = null;
// Stop the checkpoint manager
if(checkpointManager != null) {
@@ -186,12 +197,19 @@ public class BackupNode extends NameNode
super.stop();
}
- static class BackupNodeRpcServer extends NameNodeRpcServer implements JournalProtocol {
+ static class BackupNodeRpcServer extends NameNodeRpcServer implements
+ JournalProtocol {
private final String nnRpcAddress;
private BackupNodeRpcServer(Configuration conf, BackupNode nn)
throws IOException {
super(conf, nn);
+ JournalProtocolServerSideTranslatorPB journalProtocolTranslator =
+ new JournalProtocolServerSideTranslatorPB(this);
+ BlockingService service = JournalProtocolService
+ .newReflectiveBlockingService(journalProtocolTranslator);
+ DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service,
+ this.clientRpcServer);
nnRpcAddress = nn.nnRpcAddress;
}
@@ -200,9 +218,8 @@ public class BackupNode extends NameNode
throws IOException {
if (protocol.equals(JournalProtocol.class.getName())) {
return JournalProtocol.versionID;
- } else {
- return super.getProtocolVersion(protocol, clientVersion);
}
+ return super.getProtocolVersion(protocol, clientVersion);
}
/////////////////////////////////////////////////////
@@ -244,7 +261,7 @@ public class BackupNode extends NameNode
verifyRequest(nnReg);
if(!nnRpcAddress.equals(nnReg.getAddress()))
throw new IOException("Journal request from unexpected name-node: "
- + nnReg.getAddress() + " expecting " + rpcAddress);
+ + nnReg.getAddress() + " expecting " + clientRpcAddress);
getBNImage().journal(firstTxId, numTxns, records);
}
@@ -278,9 +295,8 @@ public class BackupNode extends NameNode
private NamespaceInfo handshake(Configuration conf) throws IOException {
// connect to name node
InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true);
- this.namenode =
- (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
- NamenodeProtocol.versionID, nnAddress, conf);
+ this.namenode = new NamenodeProtocolTranslatorPB(nnAddress, conf,
+ UserGroupInformation.getCurrentUser());
this.nnRpcAddress = getHostPortString(nnAddress);
this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
// get version and id info from the name-node
@@ -293,7 +309,9 @@ public class BackupNode extends NameNode
LOG.info("Problem connecting to server: " + nnAddress);
try {
Thread.sleep(1000);
- } catch (InterruptedException ie) {}
+ } catch (InterruptedException ie) {
+ LOG.warn("Encountered exception ", e);
+ }
}
}
return nsInfo;
@@ -342,7 +360,9 @@ public class BackupNode extends NameNode
LOG.info("Problem connecting to name-node: " + nnRpcAddress);
try {
Thread.sleep(1000);
- } catch (InterruptedException ie) {}
+ } catch (InterruptedException ie) {
+ LOG.warn("Encountered exception ", e);
+ }
}
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Sun Feb 26 23:32:06 2012
@@ -37,9 +37,7 @@ public class CheckpointSignature extends
implements WritableComparable<CheckpointSignature> {
private static final String FIELD_SEPARATOR = ":";
private static final int NUM_FIELDS = 7;
-
String blockpoolID = "";
-
long mostRecentCheckpointTxId;
long curSegmentTxId;
@@ -67,6 +65,14 @@ public class CheckpointSignature extends
blockpoolID = fields[i++];
}
+ public CheckpointSignature(StorageInfo info, String blockpoolID,
+ long mostRecentCheckpointTxId, long curSegmentTxId) {
+ super(info);
+ this.blockpoolID = blockpoolID;
+ this.mostRecentCheckpointTxId = mostRecentCheckpointTxId;
+ this.curSegmentTxId = curSegmentTxId;
+ }
+
/**
* Get the cluster id from CheckpointSignature
* @return the cluster id
@@ -83,6 +89,14 @@ public class CheckpointSignature extends
return blockpoolID;
}
+ public long getMostRecentCheckpointTxId() {
+ return mostRecentCheckpointTxId;
+ }
+
+ public long getCurSegmentTxId() {
+ return curSegmentTxId;
+ }
+
/**
* Set the block pool id of CheckpointSignature.
*
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sun Feb 26 23:32:06 2012
@@ -224,7 +224,7 @@ class Checkpointer extends Daemon {
LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
- bnImage.reloadFromImageFile(file);
+ bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
}
lastApplied = bnImage.getLastAppliedTxId();
@@ -238,11 +238,11 @@ class Checkpointer extends Daemon {
backupNode.nnHttpAddress, log, bnStorage);
}
- rollForwardByApplyingLogs(manifest, bnImage);
+ rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
}
long txid = bnImage.getLastAppliedTxId();
- bnImage.saveFSImageInAllDirs(txid);
+ bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
bnStorage.writeAll();
if(cpCmd.needToReturnImage()) {
@@ -272,19 +272,21 @@ class Checkpointer extends Daemon {
static void rollForwardByApplyingLogs(
RemoteEditLogManifest manifest,
- FSImage dstImage) throws IOException {
+ FSImage dstImage,
+ FSNamesystem dstNamesystem) throws IOException {
NNStorage dstStorage = dstImage.getStorage();
- List<File> editsFiles = Lists.newArrayList();
+ List<EditLogInputStream> editsStreams = Lists.newArrayList();
for (RemoteEditLog log : manifest.getLogs()) {
File f = dstStorage.findFinalizedEditsFile(
log.getStartTxId(), log.getEndTxId());
if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
- editsFiles.add(f);
- }
+ editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(),
+ log.getEndTxId()));
+ }
}
LOG.info("Checkpointer about to load edits from " +
- editsFiles.size() + " file(s).");
- dstImage.loadEdits(editsFiles);
+ editsStreams.size() + " stream(s).");
+ dstImage.loadEdits(editsStreams, dstNamesystem);
}
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java Sun Feb 26 23:32:06 2012
@@ -40,7 +40,7 @@ public class ContentSummaryServlet exten
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
- /** {@inheritDoc} */
+ @Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
final Configuration conf =
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Sun Feb 26 23:32:06 2012
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import com.google.common.base.Preconditions;
/**
@@ -122,4 +123,14 @@ class EditLogBackupInputStream extends E
reader = null;
this.version = 0;
}
+
+ @Override
+ public long getFirstTxId() throws IOException {
+ return HdfsConstants.INVALID_TXID;
+ }
+
+ @Override
+ public long getLastTxId() throws IOException {
+ return HdfsConstants.INVALID_TXID;
+ }
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Sun Feb 26 23:32:06 2012
@@ -22,11 +22,11 @@ import java.net.InetSocketAddress;
import java.util.Arrays;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
/**
@@ -40,7 +40,7 @@ import org.apache.hadoop.net.NetUtils;
class EditLogBackupOutputStream extends EditLogOutputStream {
static int DEFAULT_BUFFER_SIZE = 256;
- private JournalProtocol backupNode; // RPC proxy to backup node
+ private JournalProtocolTranslatorPB backupNode; // RPC proxy to backup node
private NamenodeRegistration bnRegistration; // backup node registration
private NamenodeRegistration nnRegistration; // active node registration
private EditsDoubleBuffer doubleBuf;
@@ -57,8 +57,7 @@ class EditLogBackupOutputStream extends
Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
try {
this.backupNode =
- RPC.getProxy(JournalProtocol.class,
- JournalProtocol.versionID, bnAddress, new HdfsConfiguration());
+ new JournalProtocolTranslatorPB(bnAddress, new HdfsConfiguration());
} catch(IOException e) {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
@@ -105,14 +104,14 @@ class EditLogBackupOutputStream extends
throw new IOException("BackupEditStream has " + size +
" records still to be flushed and cannot be closed.");
}
- RPC.stopProxy(backupNode); // stop the RPC threads
+ IOUtils.cleanup(Storage.LOG, backupNode); // stop the RPC threads
doubleBuf.close();
doubleBuf = null;
}
@Override
public void abort() throws IOException {
- RPC.stopProxy(backupNode);
+ IOUtils.cleanup(Storage.LOG, backupNode);
doubleBuf = null;
}
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Sun Feb 26 23:32:06 2012
@@ -27,6 +27,7 @@ import java.io.DataInputStream;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import com.google.common.annotations.VisibleForTesting;
@@ -37,12 +38,15 @@ import com.google.common.annotations.Vis
class EditLogFileInputStream extends EditLogInputStream {
private final File file;
private final FileInputStream fStream;
+ final private long firstTxId;
+ final private long lastTxId;
private final int logVersion;
private final FSEditLogOp.Reader reader;
private final FSEditLogLoader.PositionTrackingInputStream tracker;
/**
* Open an EditLogInputStream for the given file.
+ * The file is pretransactional, so has no txids
* @param name filename to open
* @throws LogHeaderCorruptException if the header is either missing or
* appears to be corrupt/truncated
@@ -51,6 +55,21 @@ class EditLogFileInputStream extends Edi
*/
EditLogFileInputStream(File name)
throws LogHeaderCorruptException, IOException {
+ this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID);
+ }
+
+ /**
+ * Open an EditLogInputStream for the given file.
+ * @param name filename to open
+ * @param firstTxId first transaction found in file
+ * @param lastTxId last transaction id found in file
+ * @throws LogHeaderCorruptException if the header is either missing or
+ * appears to be corrupt/truncated
+ * @throws IOException if an actual IO error occurs while reading the
+ * header
+ */
+ EditLogFileInputStream(File name, long firstTxId, long lastTxId)
+ throws LogHeaderCorruptException, IOException {
file = name;
fStream = new FileInputStream(name);
@@ -65,6 +84,18 @@ class EditLogFileInputStream extends Edi
}
reader = new FSEditLogOp.Reader(in, logVersion);
+ this.firstTxId = firstTxId;
+ this.lastTxId = lastTxId;
+ }
+
+ @Override
+ public long getFirstTxId() throws IOException {
+ return firstTxId;
+ }
+
+ @Override
+ public long getLastTxId() throws IOException {
+ return lastTxId;
}
@Override // JournalStream
@@ -116,7 +147,8 @@ class EditLogFileInputStream extends Edi
// If it's missing its header, this is equivalent to no transactions
FSImage.LOG.warn("Log at " + file + " has no valid header",
corrupt);
- return new FSEditLogLoader.EditLogValidation(0, 0);
+ return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID,
+ HdfsConstants.INVALID_TXID);
}
try {
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Sun Feb 26 23:32:06 2012
@@ -83,7 +83,6 @@ class EditLogFileOutputStream extends Ed
return JournalType.FILE;
}
- /** {@inheritDoc} */
@Override
void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op);
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Sun Feb 26 23:32:06 2012
@@ -28,6 +28,17 @@ import java.io.IOException;
* into the #{@link EditLogOutputStream}.
*/
abstract class EditLogInputStream implements JournalStream, Closeable {
+ /**
+ * @return the first transaction which will be found in this stream
+ */
+ public abstract long getFirstTxId() throws IOException;
+
+ /**
+ * @return the last transaction which will be found in this stream
+ */
+ public abstract long getLastTxId() throws IOException;
+
+
/**
* Close the stream.
* @throws IOException if an error occurred while closing
Modified: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1293964&r1=1293963&r2=1293964&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sun Feb 26 23:32:06 2012
@@ -57,9 +57,10 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.util.ByteArray;
+import com.google.common.base.Preconditions;
+
/*************************************************
* FSDirectory stores the filesystem directory state.
* It handles writing/loading values to disk, and logging
@@ -73,6 +74,7 @@ public class FSDirectory implements Clos
INodeDirectoryWithQuota rootDir;
FSImage fsImage;
+ private final FSNamesystem namesystem;
private volatile boolean ready = false;
private static final long UNKNOWN_DISK_SPACE = -1;
private final int maxComponentLength;
@@ -114,15 +116,9 @@ public class FSDirectory implements Clos
*/
private final NameCache<ByteArray> nameCache;
- /** Access an existing dfs name directory. */
- FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
- this(new FSImage(conf), ns, conf);
- }
-
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
this.dirLock = new ReentrantReadWriteLock(true); // fair
this.cond = dirLock.writeLock().newCondition();
- fsImage.setFSNamesystem(ns);
rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
ns.createFsOwnerPermissions(new FsPermission((short)0755)),
Integer.MAX_VALUE, UNKNOWN_DISK_SPACE);
@@ -146,10 +142,11 @@ public class FSDirectory implements Clos
NameNode.LOG.info("Caching file names occuring more than " + threshold
+ " times ");
nameCache = new NameCache<ByteArray>(threshold);
+ namesystem = ns;
}
private FSNamesystem getFSNamesystem() {
- return fsImage.getFSNamesystem();
+ return namesystem;
}
private BlockManager getBlockManager() {
@@ -157,33 +154,11 @@ public class FSDirectory implements Clos
}
/**
- * Load the filesystem image into memory.
- *
- * @param startOpt Startup type as specified by the user.
- * @throws IOException If image or editlog cannot be read.
+ * Notify that loading of this FSDirectory is complete, and
+ * it is ready for use
*/
- void loadFSImage(StartupOption startOpt)
- throws IOException {
- // format before starting up if requested
- if (startOpt == StartupOption.FORMAT) {
- fsImage.format(fsImage.getStorage().determineClusterId());// reuse current id
-
- startOpt = StartupOption.REGULAR;
- }
- boolean success = false;
- try {
- if (fsImage.recoverTransitionRead(startOpt)) {
- fsImage.saveNamespace();
- }
- fsImage.openEditLog();
-
- fsImage.setCheckpointDirectories(null, null);
- success = true;
- } finally {
- if (!success) {
- fsImage.close();
- }
- }
+ void imageLoadComplete() {
+ Preconditions.checkState(!ready, "FSDirectory already loaded");
writeLock();
try {
setReady(true);