You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by bo...@apache.org on 2011/02/23 21:22:36 UTC
svn commit: r1073927 - in /hadoop/hdfs/branches/HDFS-1052: ./
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/common/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org...
Author: boryas
Date: Wed Feb 23 20:22:36 2011
New Revision: 1073927
URL: http://svn.apache.org/viewvc?rev=1073927&view=rev
Log:
HDFS-1634. Federation: Convert single threaded DataNode into per BlockPool thread model.
Added:
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
Modified:
hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Wed Feb 23 20:22:36 2011
@@ -22,6 +22,9 @@ Trunk (unreleased changes)
HDFS-1632. Federation: data node storage structure changes and
introduce block pool storage. (tanping via suresh)
+ HDFS-1634. Federation: Convert single threaded DataNode into
+ per BlockPool thread model.(boryas)
+
IMPROVEMENTS
HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Feb 23 20:22:36 2011
@@ -223,4 +223,6 @@ public class DFSConfigKeys extends Commo
public static final String DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+
+ public static final String DFS_FEDERATION_NAMENODES = "dfs.federation.namenodes.uri";
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Wed Feb 23 20:22:36 2011
@@ -130,4 +130,11 @@ public class StorageInfo implements Writ
}
clusterID = cid;
}
+
+  /**
+   * Renders the storage identity as one log-friendly line of the form
+   * {@code lv=<layoutVersion>;cid=<clusterID>;nsid=<namespaceID>;c=<cTime>}.
+   *
+   * @return the formatted description; never null
+   */
+  @Override
+  public String toString() {
+    return "lv=" + layoutVersion + ";cid=" + clusterID
+        + ";nsid=" + namespaceID + ";c=" + cTime;
+  }
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java Wed Feb 23 20:22:36 2011
@@ -66,11 +66,12 @@ public class BlockPoolStorage extends St
super(NodeType.DATA_NODE);
}
- BlockPoolStorage(int namespaceID, String bpID, long cTime) {
+ BlockPoolStorage(int namespaceID, String bpID, long cTime, String clusterId) {
super(NodeType.DATA_NODE);
this.namespaceID = namespaceID;
this.blockpoolID = bpID;
this.cTime = cTime;
+ this.clusterID = clusterId;
}
/**
@@ -508,4 +509,9 @@ public class BlockPoolStorage extends St
public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
return false;
}
+
+  /** Extends the inherited storage description with this block pool's ID. */
+  @Override
+  public String toString() {
+    StringBuilder description = new StringBuilder(super.toString());
+    description.append(";bpid=").append(blockpoolID);
+    return description.toString();
+  }
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Feb 23 20:22:36 2011
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -49,6 +50,9 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -126,11 +130,6 @@ import org.apache.hadoop.util.DiskChecke
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.mortbay.util.ajax.JSON;
-import java.lang.management.ManagementFactory;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
/**********************************************************
* DataNode is a class (and program) that stores a set of
* blocks for a DFS deployment. A single deployment can
@@ -165,7 +164,7 @@ import javax.management.ObjectName;
@InterfaceAudience.Private
public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,
- Runnable, DataNodeMXBean {
+ DataNodeMXBean {
public static final Log LOG = LogFactory.getLog(DataNode.class);
static{
@@ -196,23 +195,22 @@ public class DataNode extends Configured
return NetUtils.createSocketAddr(target);
}
+ BPOfferService[] nameNodeThreads;
+ private Map<String, BPOfferService> bpMapping =
+ new HashMap<String, BPOfferService>();
public DatanodeProtocol namenode = null;
public FSDatasetInterface data = null;
public DatanodeRegistration dnRegistration = null;
+ private String clusterId = null;
volatile boolean shouldRun = true;
- private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
- private LinkedList<String> delHints = new LinkedList<String>();
public final static String EMPTY_DEL_HINT = "";
AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
ThreadGroup threadGroup = null;
long blockReportInterval;
- //disallow the sending of BR before instructed to do so
- long lastBlockReport = 0;
boolean resetBlockReportTime = true;
long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
- long lastHeartbeat = 0;
long heartBeatInterval;
private DataStorage storage = null;
private HttpServer infoServer = null;
@@ -220,8 +218,7 @@ public class DataNode extends Configured
private InetSocketAddress nameNodeAddr;
private InetSocketAddress nameNodeAddrForClient;
private InetSocketAddress selfAddr;
- private static DataNode datanodeObject = null;
- private Thread dataNodeThread = null;
+ static DataNode datanodeObject = null;
String machineName;
private static String dnThreadName;
int socketTimeout;
@@ -243,8 +240,10 @@ public class DataNode extends Configured
// For InterDataNodeProtocol
public Server ipcServer;
- private SecureResources secureResources = null;
-
+ private SecureResources secureResources = null;
+ private AbstractList<File> dataDirs;
+ private Configuration conf;
+
/**
* Create the DataNode given a configuration and an array of dataDirs.
* 'dataDirs' is where the blocks are stored.
@@ -282,8 +281,14 @@ public class DataNode extends Configured
startDataNode(conf, dataDirs, namenode, resources);
} catch (IOException ie) {
shutdown();
- throw ie;
- }
+ throw ie;
+ }
+ }
+
+ /**
+ * Records the cluster ID the first time one is supplied; once a non-null
+ * value is stored, later calls leave it unchanged.
+ * NOTE(review): a later call with a different ID is silently ignored rather
+ * than reported - confirm that is intended.
+ *
+ * @param cid cluster ID reported by a namenode
+ */
+ private synchronized void setClusterId(String cid) {
+ if(clusterId==null) {
+ clusterId = cid;
+ }
}
private void initConfig(Configuration conf) throws UnknownHostException {
@@ -296,9 +301,10 @@ public class DataNode extends Configured
conf.get("dfs.datanode.dns.interface","default"),
conf.get("dfs.datanode.dns.nameserver","default"));
}
- this.nameNodeAddr = NameNode.getServiceAddress(conf, true);
- this.nameNodeAddrForClient = NameNode.getAddress(conf);
-
+
+ // TODO:FEDERATION this.nameNodeAddr = NameNode.getServiceAddress(conf, true);
+ // TODO:FEDERATION this.nameNodeAddrForClient = NameNode.getAddress(conf);
+
this.socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsConstants.READ_TIMEOUT);
this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
@@ -361,44 +367,6 @@ public class DataNode extends Configured
this.dnRegistration.setInfoPort(this.infoServer.getPort());
}
- private void initFsDataSet(Configuration conf, AbstractList<File> dataDirs)
- throws IOException {
- // get version and id info from the name-node
- NamespaceInfo nsInfo = handshake();
-
- StartupOption startOpt = getStartupOption(conf);
- assert startOpt != null : "Startup option must be set.";
-
-
- boolean simulatedFSDataset =
- conf.getBoolean("dfs.datanode.simulateddatastorage", false);
- if (simulatedFSDataset) {
- setNewStorageID(dnRegistration);
- dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
- dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
- dnRegistration.storageInfo.clusterID = nsInfo.clusterID;
- // it would have been better to pass storage as a parameter to
- // constructor below - need to augment ReflectionUtils used below.
- conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
- try {
- //Equivalent of following (can't do because Simulated is in test dir)
- // this.data = new SimulatedFSDataset(conf);
- this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
- Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
- } catch (ClassNotFoundException e) {
- throw new IOException(StringUtils.stringifyException(e));
- }
- } else { // real storage
- // read storage info, lock data dirs and transition fs state if necessary
- storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
- // adjust
- this.dnRegistration.setStorageInfo(storage);
- // initialize data node internal structure
- this.data = new FSDataset(storage, conf);
- }
- }
-
-
private void startPlugins(Configuration conf) {
plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
for (ServicePlugin p: plugins) {
@@ -474,6 +442,602 @@ public class DataNode extends Configured
this.threadGroup.setDaemon(true); // auto destroy when empty
}
+ // calls specific to BP
+  /**
+   * Routes a received-block notification to the BPOfferService registered
+   * for the block's pool. When no service is registered for that pool the
+   * notification is dropped and a warning is logged.
+   *
+   * @param block   the block that was received
+   * @param delHint delete hint forwarded along with the block
+   */
+  protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+    final BPOfferService owner = bpMapping.get(block.getPoolId());
+    if (owner == null) {
+      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+          + block.getPoolId());
+      return;
+    }
+    owner.notifyNamenodeReceivedBlock(block, delHint);
+  }
+
+ /**
+ * A thread per namenode to perform:
+ * <ul>
+ * <li> Pre-registration handshake with namenode</li>
+ * <li> Registration with namenode</li>
+ * <li> Send periodic heartbeats to the namenode</li>
+ * <li> Handle commands received from the namenode</li>
+ * </ul>
+ */
+ class BPOfferService implements Runnable {
+ final InetSocketAddress nn_addr;
+ DatanodeRegistration bpRegistration;
+ NamespaceInfo bpNSInfo;
+ long lastBlockReport = 0;
+ private Thread bpThread;
+ private DatanodeProtocol bpNamenode;
+ private String blockPoolId;
+ private long lastHeartbeat = 0;
+ private boolean initialized = false;
+ private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+ private final LinkedList<String> delHints = new LinkedList<String>();
+
+ /**
+ * Creates the service for one namenode. Only stores its arguments; the
+ * namenode proxy and the block pool ID are filled in later by setupBP().
+ * @param isa address of the namenode this service talks to
+ * @param bpRegistration registration object presented to that namenode
+ */
+ BPOfferService(InetSocketAddress isa, DatanodeRegistration bpRegistration) {
+ this.bpRegistration = bpRegistration;
+ this.nn_addr = isa;
+ }
+
+ /**
+ * returns true if BP thread has completed initialization
+ * (the flag is set by run() after setupBP() and register() both succeed)
+ * @return true if initialized
+ */
+ public boolean initialized() {
+ return initialized;
+ }
+
+ /**
+ * @return the block pool ID taken from the namenode's namespace info, or
+ * null if setNamespaceInfo() has not been called yet
+ */
+ public String getBlockPoolId() {
+ return blockPoolId;
+ }
+
+ /**
+ * Stores the namespace info, adopts its block pool ID, and registers this
+ * service in the enclosing DataNode's bpMapping under that ID.
+ * @param nsinfo namespace info returned by the namenode handshake
+ */
+ void setNamespaceInfo(NamespaceInfo nsinfo) {
+ bpNSInfo = nsinfo;
+ this.blockPoolId = nsinfo.getBlockPoolID();
+ bpMapping.put(blockPoolId, this);
+ }
+
+ /**
+ * Sets the RPC proxy this service uses to call its namenode.
+ * @param dnProtocol proxy to the namenode
+ */
+ void setNameNode(DatanodeProtocol dnProtocol) {
+ this.bpNamenode = dnProtocol;
+ }
+
+ /**
+ * Pre-registration handshake: asks the namenode for its namespace info
+ * and verifies that both sides run the same build version.
+ * <p>
+ * The version request is retried once a second for as long as it fails
+ * with a SocketTimeoutException and shouldRun is true.
+ *
+ * @return the namespace info returned by the namenode
+ * @throws IOException if the build versions differ, or if an RPC fails
+ * with anything other than a socket timeout
+ */
+ private NamespaceInfo handshake() throws IOException {
+ // NOTE(review): if shouldRun is cleared before a reply arrives, this
+ // default-constructed object is what the version check below inspects.
+ NamespaceInfo nsInfo = new NamespaceInfo();
+ while (shouldRun) {
+ try {
+ nsInfo = bpNamenode.versionRequest();
+ break;
+ } catch(SocketTimeoutException e) { // namenode is busy
+ LOG.info("Problem connecting to server: " + nn_addr);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {}
+ }
+ }
+ LOG.info("handshake: nsifno= " + nsInfo);
+ // TODO:FEDERATION on version mismatch datanode should continue
+ // to retry
+ // verify build version
+ if(! nsInfo.getBuildVersion().equals(Storage.getBuildVersion())) {
+ String errorMsg = "Incompatible build versions: namenode BV = "
+ + nsInfo.getBuildVersion() + "; datanode BV = "
+ + Storage.getBuildVersion();
+ LOG.fatal(errorMsg);
+ // best effort: tell the namenode why this datanode is giving up
+ try {
+ bpNamenode.errorReport( bpRegistration,
+ DatanodeProtocol.NOTIFY, errorMsg );
+ } catch( SocketTimeoutException e ) { // namenode is busy
+ LOG.info("Problem connecting to server: " + nn_addr);
+ }
+ throw new IOException( errorMsg );
+ }
+ // layout versions are only checked when assertions are enabled
+ assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+ "Data-node and name-node layout versions must be the same."
+ + "Expected: "+ FSConstants.LAYOUT_VERSION
+ + " actual "+ nsInfo.getLayoutVersion();
+ return nsInfo;
+ }
+
+
+ /**
+ * Connects this service to its namenode and prepares the registration:
+ * obtains the RPC proxy, performs the handshake, records the namespace
+ * info and cluster ID, and fills bpRegistration with storage information
+ * (from the simulated-dataset settings or from the on-disk storage).
+ *
+ * @param conf configuration used for the proxy and startup options
+ * @param dataDirs data directories passed to storage recovery
+ * @throws IOException if the proxy, handshake or storage recovery fails
+ */
+ void setupBP(Configuration conf, AbstractList<File> dataDirs)
+ throws IOException {
+ // get NN proxy
+ DatanodeProtocol dnp =
+ (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+ DatanodeProtocol.versionID, nn_addr, conf);
+ // NOTE(review): blockPoolId is only assigned by setNamespaceInfo() below,
+ // so on the first call this message logs "BP=null".
+ LOG.info("NN proxy created in BP="+blockPoolId + " for " + nn_addr);
+ setNameNode(dnp);
+
+ // handshake with NN
+ NamespaceInfo nsInfo = handshake();
+ LOG.info("received namespace info nsInfo=" + nsInfo);
+ setNamespaceInfo(nsInfo);
+ setClusterId(nsInfo.clusterID);
+
+ // setup storage..
+ StartupOption startOpt = getStartupOption(conf);
+ assert startOpt != null : "Startup option must be set.";
+
+ boolean simulatedFSDataset =
+ conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+ if (simulatedFSDataset) {
+ // simulated dataset: no on-disk storage, so copy identity fields by hand
+ bpRegistration.setStorageID(dnRegistration.getStorageID()); // same as mother DN
+ bpRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
+ bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
+ bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
+ //???? bpRegistration.storageInfo.blockpoolID = bpNSInfo.blockpoolID; // TODO:FEDERATION
+ } else {
+ // read storage info, lock data dirs and transition fs state if necessary
+ storage.recoverTransitionRead(blockPoolId, bpNSInfo, dataDirs, startOpt);
+ LOG.info("in setUp setting up storage: nsid=" + storage.namespaceID +
+ ";bpid=" + blockPoolId +
+ ";lv=" + storage.layoutVersion +
+ ";nsInfo=" + bpNSInfo);
+
+ // use BlockPoolStorage as storageInfo in registration.
+ bpRegistration.setStorageID(storage.getStorageID());
+ bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
+ //data.addStorage(blockPoolId, storage);
+ }
+ }
+
+    /**
+     * Back-dates lastBlockReport so that the next block report becomes due
+     * after a random part of {@code delay} milliseconds or, when
+     * {@code delay} is not positive, at the next heartbeat. Also sets
+     * resetBlockReportTime so that the following report is re-randomized.
+     *
+     * @param delay upper bound in milliseconds for the random delay
+     */
+    void scheduleBlockReport(long delay) {
+      if (delay <= 0) {
+        // no delay requested: make the report due at the next heartbeat
+        lastBlockReport = lastHeartbeat - blockReportInterval;
+      } else {
+        // spread reports out by a random offset below the requested delay
+        final long currentMillis = System.currentTimeMillis();
+        final int randomOffset = R.nextInt((int) delay);
+        lastBlockReport = currentMillis - (blockReportInterval - randomOffset);
+      }
+      resetBlockReportTime = true; // reset future BRs for randomness
+    }
+
+    /**
+     * Report received blocks and delete hints to the Namenode.
+     * <p>
+     * Pending entries are copied out under the list locks, sent in a single
+     * blockReceived call made without holding the locks, and removed from
+     * the pending lists only after that call returns normally. If the call
+     * throws, the entries stay queued and are sent again on the next pass.
+     * <p>
+     * The call uses this service's own bpRegistration, the same registration
+     * that register(), blockReport() and sendHeartBeat() use for this
+     * namenode, instead of the DataNode-wide dnRegistration.
+     *
+     * @throws IOException if the RPC to the namenode fails
+     */
+    private void reportReceivedBlocks() throws IOException {
+      // check if there are newly received blocks
+      Block[] blockArray = null;
+      String[] delHintArray = null;
+      synchronized (receivedBlockList) {
+        synchronized (delHints) {
+          int numBlocks = receivedBlockList.size();
+          if (numBlocks > 0) {
+            if (numBlocks != delHints.size()) {
+              LOG.warn("Panic: receiveBlockList and delHints are not of " +
+                  "the same length" );
+            }
+            //
+            // Send newly-received blockids to namenode
+            //
+            blockArray = receivedBlockList.toArray(new Block[numBlocks]);
+            delHintArray = delHints.toArray(new String[numBlocks]);
+          }
+        }
+      }
+      if (blockArray == null) {
+        return; // nothing pending
+      }
+
+      if (delHintArray == null || delHintArray.length != blockArray.length) {
+        LOG.warn("Panic: block array & delHintArray are not the same" );
+      }
+      bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
+          delHintArray);
+
+      // Remove only what was just sent; entries queued while the RPC was in
+      // flight stay in the lists for the next pass.
+      synchronized (receivedBlockList) {
+        synchronized (delHints) {
+          for (int i = 0; i < blockArray.length; i++) {
+            receivedBlockList.remove(blockArray[i]);
+            delHints.remove(delHintArray[i]);
+          }
+        }
+      }
+    }
+
+    /**
+     * Queues a received block and its delete hint for the next
+     * reportReceivedBlocks() pass and wakes the service thread waiting on
+     * receivedBlockList.
+     * <p>
+     * Informing the name node could take a long long time! Should we wait
+     * till namenode is informed before responding with success to the
+     * client? For now we don't.
+     *
+     * @param block   the received block; must belong to this service's pool
+     * @param delHint delete hint to send with the block; must not be null
+     * @throws IllegalArgumentException if block or delHint is null
+     */
+    void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+      if (block == null || delHint == null) {
+        throw new IllegalArgumentException(
+            block == null ? "Block is null" : "delHint is null");
+      }
+
+      // Drop blocks that belong to a different pool. The previous check was
+      // inverted: it rejected blocks whose pool matched, so every block
+      // routed here through bpMapping was discarded and never reported.
+      if (!block.getPoolId().equals(blockPoolId)) {
+        LOG.warn("BlockPool mismatch " + block.getPoolId() +
+            " vs. " + blockPoolId);
+        return;
+      }
+
+      synchronized (receivedBlockList) {
+        synchronized (delHints) {
+          receivedBlockList.add(block.getLocalBlock());
+          delHints.add(delHint);
+          receivedBlockList.notifyAll();
+        }
+      }
+    }
+
+
+ /**
+ * Report the list blocks to the Namenode
+ * <p>
+ * A report is sent only when more than blockReportInterval milliseconds
+ * have passed since lastBlockReport; otherwise nothing is sent.
+ *
+ * @return the command returned by the namenode for the report, or null
+ * when no report was due
+ * @throws IOException
+ */
+ DatanodeCommand blockReport() throws IOException {
+ // send block report
+ DatanodeCommand cmd = null;
+ long startTime = now();
+ if (startTime - lastBlockReport > blockReportInterval) {
+ //
+ // Send latest block report if timer has expired.
+ // Get back a list of local block(s) that are obsolete
+ // and can be safely GC'ed.
+ //
+ long brStartTime = now();
+ BlockListAsLongs bReport = data.getBlockReport(/* TODO:FEDERATION pass blockPoolId*/);
+
+ // TODO:FEDERATION add support for pool ID
+ cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
+ .getBlockListAsLongs());
+ long brTime = now() - brStartTime;
+ myMetrics.blockReports.inc(brTime);
+ LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
+ " blocks got processed in " + brTime + " msecs");
+ //
+ // If we have sent the first block report, then wait a random
+ // time before we start the periodic block reports.
+ //
+ // NOTE(review): resetBlockReportTime is a field of the enclosing
+ // DataNode, so all BPOfferService instances share this one flag.
+ if (resetBlockReportTime) {
+ lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
+ resetBlockReportTime = false;
+ } else {
+ /* say the last block report was at 8:20:14. The current report
+ * should have started around 9:20:14 (default 1 hour interval).
+ * If current time is :
+ * 1) normal like 9:20:18, next report should be at 10:20:14
+ * 2) unexpected like 11:35:43, next report should be at 12:20:14
+ */
+ // integer division: advance by a whole number of intervals
+ lastBlockReport += (now() - lastBlockReport) /
+ blockReportInterval * blockReportInterval;
+ }
+ LOG.info("sent block report, processed command:" + cmd);
+ }
+ return cmd;
+ }
+
+
+    /**
+     * Sends one heartbeat to this service's namenode, carrying the dataset's
+     * capacity, used and remaining figures plus the current transfer and
+     * xceiver counts.
+     *
+     * @return the commands the namenode returned with the heartbeat reply
+     * @throws IOException if gathering the figures or the RPC fails
+     */
+    DatanodeCommand [] sendHeartBeat() throws IOException {
+      final long capacity = data.getCapacity();
+      final long dfsUsed = data.getDfsUsed();
+      final long remaining = data.getRemaining();
+      final int activeTransfers = xmitsInProgress.get();
+      final int xceivers = getXceiverCount();
+      return bpNamenode.sendHeartbeat(bpRegistration, capacity, dfsUsed,
+          remaining, activeTransfers, xceivers);
+    }
+
+
+ /**
+ * Main loop for each BP thread. Run until shutdown,
+ * forever calling remote NameNode functions.
+ * <p>
+ * Each pass sends a heartbeat if one is due, reports newly received
+ * blocks, sends a block report if one is due, starts the block scanner
+ * thread if needed, and then waits on receivedBlockList until the next
+ * heartbeat is due or a new block is queued.
+ * <p>
+ * Returns when shouldRun is cleared, or after calling shutdown() because
+ * the namenode rejected this datanode (unregistered, disallowed or
+ * incorrect version).
+ */
+ private void offerService() throws Exception {
+ LOG.info("For namenode " + nn_addr + " using BLOCKREPORT_INTERVAL of "
+ + blockReportInterval + "msec" + " Initial delay: "
+ + initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ + heartBeatInterval);
+
+ //
+ // Now loop for a long time....
+ //
+ while (shouldRun) {
+ try {
+ long startTime = now();
+
+ //
+ // Every so often, send heartbeat or block-report
+ //
+ if (startTime - lastHeartbeat > heartBeatInterval) {
+ //
+ // All heartbeat messages include following info:
+ // -- Datanode name
+ // -- data transfer port
+ // -- Total capacity
+ // -- Bytes remaining
+ //
+ lastHeartbeat = startTime;
+ // TODO:FEDERATION include some global DN stats..
+ DatanodeCommand[] cmds = sendHeartBeat();
+ myMetrics.heartbeats.inc(now() - startTime);
+ // false means a command asked to stop further processing this pass
+ if (!processCommand(cmds))
+ continue;
+ }
+
+ reportReceivedBlocks();
+
+ DatanodeCommand cmd = blockReport();
+ processCommand(cmd);
+
+ // start block scanner
+ if (blockScanner != null) {
+ synchronized(blockScanner) { // TODO:FEDERATION should be moved out of the per-BP thread
+ if(blockScannerThread == null && upgradeManager.isUpgradeCompleted()) {
+ LOG.info("Starting Periodic block scanner.");
+ blockScannerThread = new Daemon(blockScanner);
+ blockScannerThread.start();
+ }
+ }
+ }
+
+ //
+ // There is no work to do; sleep until heartbeat timer elapses,
+ // or work arrives, and then iterate again.
+ //
+ long waitTime = heartBeatInterval -
+ (System.currentTimeMillis() - lastHeartbeat);
+ synchronized(receivedBlockList) {
+ if (waitTime > 0 && receivedBlockList.size() == 0) {
+ try {
+ // woken early by notifyNamenodeReceivedBlock() via notifyAll()
+ receivedBlockList.wait(waitTime);
+ } catch (InterruptedException ie) {
+ }
+ }
+ } // synchronized
+ } catch(RemoteException re) {
+ String reClass = re.getClassName();
+ // these three rejections are treated as fatal for the datanode
+ if (UnregisteredNodeException.class.getName().equals(reClass) ||
+ DisallowedDatanodeException.class.getName().equals(reClass) ||
+ IncorrectVersionException.class.getName().equals(reClass)) {
+ LOG.warn("DataNode is shutting down: " +
+ StringUtils.stringifyException(re));
+ shutdown(); // TODO:FEDERATION - ??? what to do here
+ return;
+ }
+ LOG.warn(StringUtils.stringifyException(re));
+ // other remote errors: back off briefly, then retry the loop
+ try {
+ long sleepTime = Math.min(1000, heartBeatInterval);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ } // while (shouldRun)
+ } // offerService
+
+
+
+ /**
+ * Register one bp with the corresponding NameNode
+ * <p>
+ * The bpDatanode needs to register with the namenode on startup in order
+ * 1) to report which storage it is serving now and
+ * 2) to receive a registrationID
+ *
+ * issued by the namenode to recognize registered datanodes.
+ * <p>
+ * The registration call is retried once a second for as long as it fails
+ * with a SocketTimeoutException and shouldRun is true. Afterwards the
+ * storage ID is reconciled with the one the namenode returned, block token
+ * keys are installed if enabled, and the first block report is scheduled.
+ *
+ * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+ * @throws IOException if the namenode returns a storage ID that differs
+ * from the locally stored one, or if an RPC fails with anything
+ * other than a socket timeout
+ */
+ void register() throws IOException {
+ LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI="
+ + bpRegistration.storageInfo);
+
+ while(shouldRun) {
+ try {
+ // reset name to machineName. Mainly for web interface. Same for all DB
+ bpRegistration.name = machineName + ":" + bpRegistration.getPort();
+ LOG.info("bpReg before =" + bpRegistration.storageInfo +
+ ";sid=" + bpRegistration.storageID);
+ // the namenode's reply replaces the local registration object
+ bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+ LOG.info("bpReg after =" + bpRegistration.storageInfo +
+ ";sid=" + bpRegistration.storageID);
+ break;
+ } catch(SocketTimeoutException e) { // namenode is busy
+ LOG.info("Problem connecting to server: " + nn_addr);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {}
+ }
+ }
+
+
+ // TODO:FEDERATION - re-evaluate the following three checks!
+ assert ("".equals(storage.getStorageID())
+ && !"".equals(bpRegistration.getStorageID()))
+ || storage.getStorageID().equals(bpRegistration.getStorageID()) :
+ "New storageID can be assigned only if data-node is not formatted";
+
+ // empty local storage ID: adopt and persist the one from the namenode
+ if (storage.getStorageID().equals("")) {
+ storage.setStorageID(bpRegistration.getStorageID());
+ storage.writeAll();
+ LOG.info("New storage id " + bpRegistration.getStorageID()
+ + " is assigned to data-node " + bpRegistration.getName());
+ }
+ if(! storage.getStorageID().equals(bpRegistration.getStorageID())) {
+ throw new IOException("Inconsistent storage IDs. Name-node returned "
+ + bpRegistration.getStorageID()
+ + ". Expecting " + storage.getStorageID());
+ }
+
+ // NOTE(review): isBlockTokenInitialized / isBlockTokenEnabled are not
+ // declared in this class, so they appear to be DataNode-wide; only the
+ // first namenode to register decides them - confirm.
+ if (!isBlockTokenInitialized) {
+ /* first time registering with NN */
+ ExportedBlockKeys keys = bpRegistration.exportedKeys;
+ isBlockTokenEnabled = keys.isBlockTokenEnabled();
+ if (isBlockTokenEnabled) {
+ long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+ long blockTokenLifetime = keys.getTokenLifetime();
+ LOG.info("Block token params received from NN: keyUpdateInterval="
+ + blockKeyUpdateInterval / (60 * 1000)
+ + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ + " min(s)");
+ blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
+ }
+ isBlockTokenInitialized = true;
+ }
+
+ if (isBlockTokenEnabled) {
+ blockTokenSecretManager.setKeys(bpRegistration.exportedKeys);
+ // keys are now held by the secret manager; replace them in the
+ // registration with the dummy placeholder
+ bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
+ }
+
+ LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
+
+ // random short delay - helps scatter the BR from all DNs
+ scheduleBlockReport(initialBlockReportDelay);
+ }
+
+
+ /**
+ * Thread body for one block pool: sets up and registers the pool, then
+ * serves the namenode until shutdown.
+ * <p>
+ * No matter what kind of exception we get, keep retrying to offerService().
+ * That's the loop that connects to the NameNode and provides basic DataNode
+ * functionality.
+ *
+ * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
+ * <p>
+ * If setup or registration fails with an IOException, the storage is
+ * unlocked and this thread returns without marking the pool initialized
+ * and without shutting the datanode down.
+ */
+ public void run() {
+ LOG.info(bpRegistration + "In BPOfferService.run, data = " + data +
+ ";bp="+blockPoolId);
+
+ //init stuff
+ try {
+ // setup storage
+ setupBP(conf, dataDirs);
+ register();
+ } catch (IOException ioe) {
+ LOG.error(bpRegistration + ": Setup failed", ioe);
+ try {
+ // TODO:FEDERATION needs to unlock only this specific storage...
+ // and remove it....
+ storage.unlockAll();
+ } catch (Exception e) {
+ LOG.warn("failed to unlock storage for dn: " + bpRegistration, e);
+ }
+ // TODO:FEDERATION should be local only
+ //shutdown();
+ return;
+ }
+
+ initialized = true; // bp is initialized;
+
+ while (shouldRun) {
+ try {
+ // TODO:FEDERATION needs to be moved too
+ startDistributedUpgradeIfNeeded();
+ offerService();
+ } catch (Exception ex) {
+ LOG.error("Exception: " + StringUtils.stringifyException(ex));
+ // pause before retrying, unless shutdown has already been requested
+ if (shouldRun) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+
+ LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
+ // NOTE(review): this calls the enclosing DataNode's shutdown(), not a
+ // per-pool one - confirm against the FEDERATION TODOs in this class.
+ shutdown();
+ }
+
+    /**
+     * Runs every command in the given array, in order.
+     * <p>
+     * An IOException raised by a single command is logged and does not stop
+     * the remaining commands. Processing ends early as soon as one command
+     * reports that nothing further should be processed.
+     *
+     * @param cmds commands received from the namenode; may be null
+     * @return true if further processing may be required or false otherwise.
+     */
+    private boolean processCommand(DatanodeCommand[] cmds) {
+      if (cmds == null) {
+        return true;
+      }
+      for (int i = 0; i < cmds.length; i++) {
+        try {
+          boolean keepGoing = processCommand(cmds[i]);
+          if (!keepGoing) {
+            return false;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Error processing datanode Command", ioe);
+        }
+      }
+      return true;
+    }
+
+ /**
+ * Executes a single command received from the namenode, dispatching on
+ * the command's action code. Unknown actions are logged and ignored.
+ *
+ * @param cmd the command to execute; null is accepted and does nothing
+ * @return true if further processing may be required or false otherwise.
+ * Only DNA_SHUTDOWN returns false.
+ * @throws IOException if executing the command fails
+ */
+ private boolean processCommand(DatanodeCommand cmd) throws IOException {
+ if (cmd == null)
+ return true;
+ // NOTE(review): bcmd is null when cmd is not a BlockCommand; the
+ // DNA_TRANSFER and DNA_INVALIDATE cases dereference it without a check.
+ final BlockCommand bcmd =
+ cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+
+ switch(cmd.getAction()) {
+ case DatanodeProtocol.DNA_TRANSFER:
+ // Send a copy of a block to another datanode
+ transferBlocks(bcmd.getPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+ myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
+ break;
+ case DatanodeProtocol.DNA_INVALIDATE:
+ //
+ // Some local block(s) are obsolete and can be
+ // safely garbage-collected.
+ //
+ Block toDelete[] = bcmd.getBlocks();
+ try {
+ if (blockScanner != null) {
+ blockScanner.deleteBlocks(bcmd.getPoolId(), toDelete);
+ }
+ // using global fsdataset
+ data.invalidate(bcmd.getPoolId(), toDelete);
+ } catch(IOException e) {
+ // run the disk check before propagating the failure
+ checkDiskError();
+ throw e;
+ }
+ myMetrics.blocksRemoved.inc(toDelete.length);
+ break;
+ case DatanodeProtocol.DNA_SHUTDOWN:
+ // shut down the data node
+ shutdown(); //TODO:FEDERATION - we should not shutdown the whole datanode.
+ return false;
+ case DatanodeProtocol.DNA_REGISTER:
+ // namenode requested a registration - at start or if NN lost contact
+ LOG.info("DatanodeCommand action: DNA_REGISTER");
+ if (shouldRun) {
+ register();
+ }
+ break;
+ case DatanodeProtocol.DNA_FINALIZE:
+ // TODO:FEDERATION - global storage????? or per BP storage - add real BPID
+ // NOTE(review): the argument below is a placeholder string, not a block pool ID
+ storage.finalizeUpgrade("FAKE ID NEEDS TO BE REPLACED");
+ break;
+ case UpgradeCommand.UC_ACTION_START_UPGRADE:
+ // start distributed upgrade here
+ processDistributedUpgradeCommand((UpgradeCommand)cmd);
+ break;
+ case DatanodeProtocol.DNA_RECOVERBLOCK:
+ // TODO:FEDERATION - global storage????? or per BP storage
+ recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+ break;
+ case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+ LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
+ // key updates are ignored when block tokens are disabled
+ if (isBlockTokenEnabled) {
+ blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+ }
+ break;
+ default:
+ LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
+ }
+ return true;
+ }
+
+ }
+
/**
* This method starts the data node with the specified conf.
*
@@ -492,29 +1056,102 @@ public class DataNode extends Configured
throw new RuntimeException("Cannot start secure cluster without " +
"privileged resources.");
+ // settings global for all BPs in the Data Node
this.secureResources = resources;
- this.namenode = namenode;
+ this.dataDirs = dataDirs;
+ this.conf = conf;
+
storage = new DataStorage();
+ // global DN settings
initConfig(conf);
registerMXBean();
+ initFsDataSet(conf, dataDirs); // TODO:FEDERATION should this be moved to after at least one storage is created..
initDataXceiver(conf);
- initFsDataSet(conf, dataDirs);
- initBlockScanner(conf);
startInfoServer(conf);
-
+ initIpcServer(conf); // TODO:FEDERATION redirect the call appropriately
+
myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
- // TODO check what code removed here
- initIpcServer(conf);
+ // get all the NNs configured
+ nameNodeThreads = getAllNamenodes(conf);
+ }
+
+ private void initFsDataSet(Configuration conf, AbstractList<File> dataDirs)
+ throws IOException {
+ // get version and id info from the name-node
+ boolean simulatedFSDataset =
+ conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+
+ if (simulatedFSDataset) {
+
+ if(data == null) { // create FSDataset
+ setNewStorageID(dnRegistration);
+ conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY,
+ dnRegistration.getStorageID());
+
+ // it would have been better to pass storage as a parameter to
+ // constructor below - need to augment ReflectionUtils used below.
+
+ try {
+ //TODO:FEDERATION Equivalent of following (can't do because Simulated is in test dir)
+ if(data==null) {
+ data = (FSDatasetInterface) ReflectionUtils.newInstance(
+ Class.forName(
+ "org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
+ conf);
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IOException(StringUtils.stringifyException(e));
+ }
+
+ }
+ // TODO:FEDERATION do we need set it to the general dnRegistration?????
+ // TODO:FEDERATION do we need LV,NSid, cid,bpid for datanode version file?
+
+ } else {
+ if(data == null)
+ data = new FSDataset(storage, conf);
+ }
+ }
+
+
+ void postStartInit(Configuration conf, AbstractList<File> dataDirs)
+ throws IOException {
+
+ initBlockScanner(conf);
+
startPlugins(conf);
// BlockTokenSecretManager is created here, but it shouldn't be
// used until it is initialized in register().
- this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+ this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
}
/**
+ * For each configured namenode, create a BPOfferService object.
+ * The threads are started later (outside of the DataNode constructor).
+ * @param conf
+ * @throws IOException
+ */
+ private BPOfferService[] getAllNamenodes(Configuration conf)
+ throws IOException {
+ if(nameNodeThreads != null)
+ return nameNodeThreads; // already initialized
+
+ // get NNs addresses from the configuration
+ InetSocketAddress[] isas = NameNode.getNNAddresses(conf);
+
+ AbstractList<BPOfferService> al = new ArrayList<BPOfferService> (isas.length);
+ for(InetSocketAddress isa : isas) {
+ BPOfferService bpos = new BPOfferService(isa, dnRegistration);
+ al.add(bpos);
+ }
+ nameNodeThreads = new BPOfferService[isas.length];
+ return al.toArray(nameNodeThreads);
+ }
+
+ /**
* Determine the http server's effective addr
*/
public static InetSocketAddress getInfoAddr(Configuration conf) {
@@ -540,48 +1177,12 @@ public class DataNode extends Configured
return (socketWriteTimeout > 0) ?
SocketChannel.open().socket() : new Socket();
}
-
- private NamespaceInfo handshake() throws IOException {
- NamespaceInfo nsInfo = new NamespaceInfo();
- while (shouldRun) {
- try {
- nsInfo = namenode.versionRequest();
- break;
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.info("Problem connecting to server: " + getNameNodeAddr());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {}
- }
- }
- String errorMsg = null;
- // verify build version
- if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
- errorMsg = "Incompatible build versions: namenode BV = "
- + nsInfo.getBuildVersion() + "; datanode BV = "
- + Storage.getBuildVersion();
- LOG.fatal( errorMsg );
- try {
- namenode.errorReport( dnRegistration,
- DatanodeProtocol.NOTIFY, errorMsg );
- } catch( SocketTimeoutException e ) { // namenode is busy
- LOG.info("Problem connecting to server: " + getNameNodeAddr());
- }
- throw new IOException( errorMsg );
- }
- assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
- "Data-node and name-node layout versions must be the same."
- + "Expected: "+ FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
- return nsInfo;
- }
private static void setDataNode(DataNode node) {
datanodeObject = node;
}
- /** Return the DataNode object
- *
- */
+ /** Return the DataNode object */
public static DataNode getDataNode() {
return datanodeObject;
}
@@ -661,73 +1262,6 @@ public class DataNode extends Configured
dnReg.storageID = "DS-" + rand + "-"+ ip + "-" + dnReg.getPort() + "-" +
System.currentTimeMillis();
}
- /**
- * Register datanode
- * <p>
- * The datanode needs to register with the namenode on startup in order
- * 1) to report which storage it is serving now and
- * 2) to receive a registrationID
- * issued by the namenode to recognize registered datanodes.
- *
- * @see FSNamesystem#registerDatanode(DatanodeRegistration)
- * @throws IOException
- */
- private void register() throws IOException {
- if (dnRegistration.getStorageID().equals("")) {
- setNewStorageID(dnRegistration);
- }
- while(shouldRun) {
- try {
- // reset name to machineName. Mainly for web interface.
- dnRegistration.name = machineName + ":" + dnRegistration.getPort();
- dnRegistration = namenode.registerDatanode(dnRegistration);
- break;
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.info("Problem connecting to server: " + getNameNodeAddr());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {}
- }
- }
- assert ("".equals(storage.getStorageID())
- && !"".equals(dnRegistration.getStorageID()))
- || storage.getStorageID().equals(dnRegistration.getStorageID()) :
- "New storageID can be assigned only if data-node is not formatted";
- if (storage.getStorageID().equals("")) {
- storage.setStorageID(dnRegistration.getStorageID());
- storage.writeAll();
- LOG.info("New storage id " + dnRegistration.getStorageID()
- + " is assigned to data-node " + dnRegistration.getName());
- }
- if(! storage.getStorageID().equals(dnRegistration.getStorageID())) {
- throw new IOException("Inconsistent storage IDs. Name-node returned "
- + dnRegistration.getStorageID()
- + ". Expecting " + storage.getStorageID());
- }
-
- if (!isBlockTokenInitialized) {
- /* first time registering with NN */
- ExportedBlockKeys keys = dnRegistration.exportedKeys;
- this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
- if (isBlockTokenEnabled) {
- long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
- long blockTokenLifetime = keys.getTokenLifetime();
- LOG.info("Block token params received from NN: keyUpdateInterval="
- + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
- + blockTokenLifetime / (60 * 1000) + " min(s)");
- blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
- }
- isBlockTokenInitialized = true;
- }
-
- if (isBlockTokenEnabled) {
- blockTokenSecretManager.setKeys(dnRegistration.exportedKeys);
- dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
- }
-
- // random short delay - helps scatter the BR from all DNs
- scheduleBlockReport(initialBlockReportDelay);
- }
/**
* Shut down this instance of the datanode.
@@ -788,6 +1322,16 @@ public class DataNode extends Configured
}
}
+ // shutdown BPOS thread TODO:FEDERATION - review if this is enough
+ for(BPOfferService bpos : nameNodeThreads) {
+ if(bpos != null && bpos.bpThread!=null) {
+ try {
+ bpos.bpThread.interrupt();
+ bpos.bpThread.join();
+ } catch (InterruptedException ie) {}
+ }
+ }
+
RPC.stopProxy(namenode); // stop the RPC threads
if(upgradeManager != null)
@@ -806,13 +1350,6 @@ public class DataNode extends Configured
LOG.warn("Exception when unlocking storage: " + ie, ie);
}
}
- if (dataNodeThread != null) {
- dataNodeThread.interrupt();
- try {
- dataNodeThread.join();
- } catch (InterruptedException ie) {
- }
- }
if (data != null) {
data.shutdown();
}
@@ -869,7 +1406,7 @@ public class DataNode extends Configured
if(hasEnoughResource) {
- scheduleBlockReport(0);
+ scheduleAllBlockReport(0);
return; // do not shutdown
}
@@ -882,184 +1419,6 @@ public class DataNode extends Configured
return threadGroup == null ? 0 : threadGroup.activeCount();
}
- /**
- * Main loop for the DataNode. Runs until shutdown,
- * forever calling remote NameNode functions.
- */
- public void offerService() throws Exception {
-
- LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +
- " Initial delay: " + initialBlockReportDelay + "msec");
-
- //
- // Now loop for a long time....
- //
- while (shouldRun) {
- try {
- long startTime = now();
-
- //
- // Every so often, send heartbeat or block-report
- //
-
- if (startTime - lastHeartbeat > heartBeatInterval) {
- //
- // All heartbeat messages include following info:
- // -- Datanode name
- // -- data transfer port
- // -- Total capacity
- // -- Bytes remaining
- //
- lastHeartbeat = startTime;
- DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
- data.getCapacity(),
- data.getDfsUsed(),
- data.getRemaining(),
- xmitsInProgress.get(),
- getXceiverCount());
- myMetrics.heartbeats.inc(now() - startTime);
- //LOG.info("Just sent heartbeat, with name " + localName);
- if (!processCommand(cmds))
- continue;
- }
-
- reportReceivedBlocks();
-
- DatanodeCommand cmd = blockReport();
- processCommand(cmd);
-
- // start block scanner
- if (blockScanner != null && blockScannerThread == null &&
- upgradeManager.isUpgradeCompleted()) {
- LOG.info("Starting Periodic block scanner.");
- blockScannerThread = new Daemon(blockScanner);
- blockScannerThread.start();
- }
-
- //
- // There is no work to do; sleep until hearbeat timer elapses,
- // or work arrives, and then iterate again.
- //
- long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
- synchronized(receivedBlockList) {
- if (waitTime > 0 && receivedBlockList.size() == 0) {
- try {
- receivedBlockList.wait(waitTime);
- } catch (InterruptedException ie) {
- }
- }
- } // synchronized
- } catch(RemoteException re) {
- String reClass = re.getClassName();
- if (UnregisteredNodeException.class.getName().equals(reClass) ||
- DisallowedDatanodeException.class.getName().equals(reClass) ||
- IncorrectVersionException.class.getName().equals(reClass)) {
- LOG.warn("DataNode is shutting down: " +
- StringUtils.stringifyException(re));
- shutdown();
- return;
- }
- LOG.warn(StringUtils.stringifyException(re));
- try {
- long sleepTime = Math.min(1000, heartBeatInterval);
- Thread.sleep(sleepTime);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- } catch (IOException e) {
- LOG.warn(StringUtils.stringifyException(e));
- }
- } // while (shouldRun)
- } // offerService
-
- /**
- * Process an array of datanode commands
- *
- * @param cmds an array of datanode commands
- * @return true if further processing may be required or false otherwise.
- */
- private boolean processCommand(DatanodeCommand[] cmds) {
- if (cmds != null) {
- for (DatanodeCommand cmd : cmds) {
- try {
- if (processCommand(cmd) == false) {
- return false;
- }
- } catch (IOException ioe) {
- LOG.warn("Error processing datanode Command", ioe);
- }
- }
- }
- return true;
- }
-
- /**
- *
- * @param cmd
- * @return true if further processing may be required or false otherwise.
- * @throws IOException
- */
- private boolean processCommand(DatanodeCommand cmd) throws IOException {
- if (cmd == null)
- return true;
- final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
-
- switch(cmd.getAction()) {
- case DatanodeProtocol.DNA_TRANSFER:
- // Send a copy of a block to another datanode
- transferBlocks(bcmd.getPoolId(), bcmd.getBlocks(), bcmd.getTargets());
- myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
- break;
- case DatanodeProtocol.DNA_INVALIDATE:
- //
- // Some local block(s) are obsolete and can be
- // safely garbage-collected.
- //
- Block toDelete[] = bcmd.getBlocks();
- try {
- if (blockScanner != null) {
- blockScanner.deleteBlocks(bcmd.getPoolId(), toDelete);
- }
- data.invalidate(bcmd.getPoolId(), toDelete);
- } catch(IOException e) {
- checkDiskError();
- throw e;
- }
- myMetrics.blocksRemoved.inc(toDelete.length);
- break;
- case DatanodeProtocol.DNA_SHUTDOWN:
- // shut down the data node
- this.shutdown();
- return false;
- case DatanodeProtocol.DNA_REGISTER:
- // namenode requested a registration - at start or if NN lost contact
- LOG.info("DatanodeCommand action: DNA_REGISTER");
- if (shouldRun) {
- register();
- }
- break;
- case DatanodeProtocol.DNA_FINALIZE:
- storage.finalizeUpgrade(bcmd.getPoolId());
- break;
- case UpgradeCommand.UC_ACTION_START_UPGRADE:
- // start distributed upgrade here
- processDistributedUpgradeCommand((UpgradeCommand)cmd);
- break;
- case DatanodeProtocol.DNA_RECOVERBLOCK:
- recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
- break;
- case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
- LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
- if (isBlockTokenEnabled) {
- blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
- }
- break;
- default:
- LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
- }
- return true;
- }
-
// Distributed upgrade manager
UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
@@ -1068,92 +1427,6 @@ public class DataNode extends Configured
assert upgradeManager != null : "DataNode.upgradeManager is null.";
upgradeManager.processUpgradeCommand(comm);
}
-
- /**
- * Report received blocks and delete hints to the Namenode
- * @throws IOException
- */
- private void reportReceivedBlocks() throws IOException {
- //check if there are newly received blocks
- Block [] blockArray=null;
- String [] delHintArray=null;
- synchronized(receivedBlockList) {
- synchronized(delHints){
- int numBlocks = receivedBlockList.size();
- if (numBlocks > 0) {
- if(numBlocks!=delHints.size()) {
- LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
- }
- //
- // Send newly-received blockids to namenode
- //
- blockArray = receivedBlockList.toArray(new Block[numBlocks]);
- delHintArray = delHints.toArray(new String[numBlocks]);
- }
- }
- }
- if (blockArray != null) {
- if(delHintArray == null || delHintArray.length != blockArray.length ) {
- LOG.warn("Panic: block array & delHintArray are not the same" );
- }
- // TODO:FEDERATION add support for pool ID
- namenode.blockReceived(dnRegistration, "TODO", blockArray, delHintArray);
- synchronized(receivedBlockList) {
- synchronized(delHints){
- for(int i=0; i<blockArray.length; i++) {
- receivedBlockList.remove(blockArray[i]);
- delHints.remove(delHintArray[i]);
- }
- }
- }
- }
- }
-
- /**
- * Report the list blocks to the Namenode
- * @throws IOException
- */
- private DatanodeCommand blockReport() throws IOException {
- // send block report
- DatanodeCommand cmd = null;
- long startTime = now();
- if (startTime - lastBlockReport > blockReportInterval) {
- //
- // Send latest block report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- long brStartTime = now();
- BlockListAsLongs bReport = data.getBlockReport();
-
- // TODO:FEDERATION add support for pool ID
- cmd = namenode.blockReport(dnRegistration, "TODO", bReport
- .getBlockListAsLongs());
- long brTime = now() - brStartTime;
- myMetrics.blockReports.inc(brTime);
- LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
- " blocks got processed in " + brTime + " msecs");
- //
- // If we have sent the first block report, then wait a random
- // time before we start the periodic block reports.
- //
- if (resetBlockReportTime) {
- lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
- resetBlockReportTime = false;
- } else {
- /* say the last block report was at 8:20:14. The current report
- * should have started around 9:20:14 (default 1 hour interval).
- * If current time is :
- * 1) normal like 9:20:18, next report should be at 10:20:14
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
- */
- lastBlockReport += (now() - lastBlockReport) /
- blockReportInterval * blockReportInterval;
- }
- }
- return cmd;
- }
-
/**
* Start distributed upgrade if it should be initiated by the data-node.
*/
@@ -1220,29 +1493,6 @@ public class DataNode extends Configured
}
}
- /*
- * Informing the name node could take a long long time! Should we wait
- * till namenode is informed before responding with success to the
- * client? For now we don't.
- */
- protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
- if(block==null || delHint==null) {
- throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
- }
- synchronized (receivedBlockList) {
- synchronized (delHints) {
- // TODO:FEDERATION receivedBlockList should be per block pool
- // TODO:FEDERATION use ExtendedBlock
- receivedBlockList.add(block.getLocalBlock());
- delHints.add(delHint);
- receivedBlockList.notifyAll();
- }
- }
- }
-
-
-
-
/* ********************************************************************
Protocol when a client reads data from Datanode (Cur Ver: 9):
@@ -1412,64 +1662,41 @@ public class DataNode extends Configured
*/
void closeBlock(ExtendedBlock block, String delHint) {
myMetrics.blocksWritten.inc();
- notifyNamenodeReceivedBlock(block, delHint);
+ BPOfferService bpos = nameNodeThreads[0];
+ // TODO:FEDERATION - find the corresponding bp - for now, for compilation, pick the first one
+ bpos.notifyNamenodeReceivedBlock(new ExtendedBlock(block), delHint);
if (blockScanner != null) {
blockScanner.addBlock(block);
}
}
- /**
- * No matter what kind of exception we get, keep retrying to offerService().
- * That's the loop that connects to the NameNode and provides basic DataNode
- * functionality.
- *
- * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
+ /** Start the BPOfferService threads, dataXceiverServer and ipcServer,
+ * then run postStartInit(conf, dataDirs).
*/
- public void run() {
- LOG.info(dnRegistration + "In DataNode.run, data = " + data);
+ public void runDatanodeDaemon() throws IOException {
+ if (nameNodeThreads != null) {
+ // Start namenode threads
+ for(BPOfferService bp : nameNodeThreads) {
+ bp.bpThread = new Thread(bp, dnThreadName);
+ bp.bpThread.setDaemon(true); // needed for JUnit testing
+ bp.bpThread.start();
+ }
+ }
// start dataXceiveServer
dataXceiverServer.start();
ipcServer.start();
-
- while (shouldRun) {
- try {
- startDistributedUpgradeIfNeeded();
- offerService();
- } catch (Exception ex) {
- LOG.error("Exception: " + StringUtils.stringifyException(ex));
- if (shouldRun) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
-
- LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
- shutdown();
- }
-
- /** Start a single datanode daemon and wait for it to finish.
- * If this thread is specifically interrupted, it will stop waiting.
- */
- public static void runDatanodeDaemon(DataNode dn) throws IOException {
- if (dn != null) {
- //register datanode
- dn.register();
- dn.dataNodeThread = new Thread(dn, dnThreadName);
- dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
- dn.dataNodeThread.start();
- }
+
+ postStartInit(conf, dataDirs);
}
-
+
static boolean isDatanodeUp(DataNode dn) {
- return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
+ //return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
+ return true; // TODO:FEDERATION - put the right condition
}
/** Instantiate a single datanode object. This must be run by invoking
- * {@link DataNode#runDatanodeDaemon(DataNode)} subsequently.
+ * {@link DataNode#runDatanodeDaemon()} subsequently.
*/
public static DataNode instantiateDataNode(String args[],
Configuration conf) throws IOException {
@@ -1477,7 +1704,7 @@ public class DataNode extends Configured
}
/** Instantiate a single datanode object, along with its secure resources.
- * This must be run by invoking{@link DataNode#runDatanodeDaemon(DataNode)}
+ * This must be run by invoking {@link DataNode#runDatanodeDaemon()}
* subsequently.
*/
public static DataNode instantiateDataNode(String args [], Configuration conf,
@@ -1530,15 +1757,17 @@ public class DataNode extends Configured
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
DataNode dn = instantiateDataNode(args, conf, resources);
- runDatanodeDaemon(dn);
+ dn.runDatanodeDaemon();
return dn;
}
void join() {
- if (dataNodeThread != null) {
- try {
- dataNodeThread.join();
- } catch (InterruptedException e) {}
+ // TODO:FEDERATION do not ignore InterruptedException
+ for(BPOfferService bpos : nameNodeThreads) {
+ if(bpos.bpThread != null)
+ try {
+ bpos.bpThread.join();
+ } catch (InterruptedException e) {}
}
}
@@ -1646,16 +1875,13 @@ public class DataNode extends Configured
}
/**
- * This methods arranges for the data node to send the block report at the next heartbeat.
+ * This method arranges for the data node to send
+ * the block report at the next heartbeat.
*/
- public void scheduleBlockReport(long delay) {
- if (delay > 0) { // send BR after random delay
- lastBlockReport = System.currentTimeMillis()
- - ( blockReportInterval - R.nextInt((int)(delay)));
- } else { // send at next heartbeat
- lastBlockReport = lastHeartbeat - blockReportInterval;
+ public void scheduleAllBlockReport(long delay) {
+ for(BPOfferService bpos : nameNodeThreads) {
+ bpos.scheduleBlockReport(delay);
}
- resetBlockReportTime = true; // reset future BRs for randomness
}
@@ -2001,7 +2227,7 @@ public class DataNode extends Configured
@Override // DataNodeMXBean
public String getClusterId() {
- return this.storage.clusterID;
-}
+ return clusterId;
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Wed Feb 23 20:22:36 2011
@@ -87,6 +87,10 @@ public class DataStorage extends Storage
storageID = "";
}
+ public StorageInfo getBPStorage(String bpid) {
+ return bpStorageMap.get(bpid);
+ }
+
public DataStorage(StorageInfo storageInfo, String strgID) {
super(NodeType.DATA_NODE, storageInfo);
this.storageID = strgID;
@@ -118,7 +122,7 @@ public class DataStorage extends Storage
Collection<File> dataDirs,
StartupOption startOpt
) throws IOException {
- if (this.initilized) {
+ if (initilized) {
// DN storage has been initialized, no need to do anything
return;
}
@@ -169,7 +173,7 @@ public class DataStorage extends Storage
// 2. Do transitions
// Each storage directory is treated individually.
- // During sturtup some of them can upgrade or rollback
+ // During startup some of them can upgrade or rollback
// while others could be uptodate for the regular startup.
for(int idx = 0; idx < getNumStorageDirs(); idx++) {
doTransition(getStorageDir(idx), nsInfo, startOpt);
@@ -179,6 +183,12 @@ public class DataStorage extends Storage
"Data-node and name-node CTimes must be the same.";
}
+ // make sure we have storage id set - if not - generate new one
+ if(storageID.isEmpty()) {
+ DataNode.setNewStorageID(DataNode.datanodeObject.dnRegistration);
+ storageID = DataNode.datanodeObject.dnRegistration.storageID;
+ }
+
// 3. Update all storages. Some of them might have just been formatted.
this.writeAll();
@@ -210,7 +220,8 @@ public class DataStorage extends Storage
// mkdir for the list of BlockPoolStorage
makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolStorage bpStorage = new BlockPoolStorage(nsInfo.getNamespaceID(),
- bpID, nsInfo.getCTime());
+ bpID, nsInfo.getCTime(), nsInfo.getClusterID());
+
bpStorage.recoverTransitionRead(nsInfo, bpDataDirs, startOpt);
addBlockPoolStorage(bpID, bpStorage);
}
@@ -447,7 +458,7 @@ public class DataStorage extends Storage
// 3. Format BP and hard link blocks from previous directory
File curBpDir = getBpRoot(nsInfo.getBlockPoolID(), curDir);
BlockPoolStorage bpStorage = new BlockPoolStorage(nsInfo.getNamespaceID(),
- nsInfo.getBlockPoolID(), nsInfo.getCTime());
+ nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.format(new StorageDirectory(curBpDir), nsInfo);
linkAllBlocks(tmpDir, curBpDir);
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Feb 23 20:22:36 2011
@@ -881,6 +881,8 @@ public class FSDataset implements FSCons
}
volumes = new FSVolumeSet(volArray);
volumes.getVolumeMap(volumeMap);
+
+ // TODO:FEDERATION this needs to be moved to addStorage()
File[] roots = new File[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Feb 23 20:22:36 2011
@@ -722,6 +722,7 @@ public class FSImage extends Storage {
blockpoolID = bpid;
}
+ @Override
protected void getFields(Properties props,
StorageDirectory sd
) throws IOException {
@@ -774,6 +775,7 @@ public class FSImage extends Storage {
* @param sd storage directory
* @throws IOException
*/
+ @Override
protected void setFields(Properties props,
StorageDirectory sd
) throws IOException {
@@ -2338,7 +2340,7 @@ public class FSImage extends Storage {
U_STR.write(out);
}
- String getBlockPoolID() {
+ public String getBlockPoolID() {
return blockpoolID;
}
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Feb 23 20:22:36 2011
@@ -525,11 +525,12 @@ public class FSNamesystem implements FSC
}
NamespaceInfo getNamespaceInfo() {
- return new NamespaceInfo(dir.fsImage.getNamespaceID(),
- dir.fsImage.getClusterID(),
- dir.fsImage.getBlockPoolID(),
+ NamespaceInfo nsinfo = new NamespaceInfo(dir.fsImage.getNamespaceID(),
+ getClusterId(),
+ getBlockpoolId(),
dir.fsImage.getCTime(),
getDistributedUpgradeVersion());
+ return nsinfo;
}
/**
@@ -2502,7 +2503,7 @@ public class FSNamesystem implements FSC
* namespaceID and will continue serving the datanodes that has previously
* registered with the namenode without restarting the whole cluster.
*
- * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
+ * @see org.apache.hadoop.hdfs.server.datanode.DataNode
*/
public void registerDatanode(DatanodeRegistration nodeReg
) throws IOException {
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Feb 23 20:22:36 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.util.AbstractList;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -31,9 +33,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
@@ -46,12 +48,12 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -217,6 +219,51 @@ public class NameNode implements Namenod
public static InetSocketAddress getAddress(String address) {
return NetUtils.createSocketAddr(address, DEFAULT_PORT);
}
+
+ /**
+ * TODO:FEDERATION
+ * At this moment only fs.default style entries are supported.
+ * @param conf
+ * @return array of namenodes' addresses
+ */
+ public static InetSocketAddress [] getNNAddresses(Configuration conf)
+ throws IOException {
+ URI[] nns=getNameNodesURIs(conf);
+ if(nns == null) {
+ throw new IOException("Federation namnodes are not configured correctly");
+ }
+
+ InetSocketAddress [] isas = new InetSocketAddress[nns.length];
+ int i=0;
+ for(URI u : nns) {
+ isas[i++] = getAddress(u);
+ }
+ return isas;
+ }
+
+ /**
+ * TODO:FEDERATION
+ * get the list of namenodes from the configuration
+ * create URI for each one of them
+ * @param conf
+ * @return list of URIs of all configured NameNodes
+ */
+ public static URI [] getNameNodesURIs(Configuration conf) {
+ String [] nnURIs = conf.getStrings(DFSConfigKeys.DFS_FEDERATION_NAMENODES);
+ if(nnURIs == null) {
+ nnURIs = new String[] { conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)};
+ }
+
+ AbstractList<URI> nns = new ArrayList<URI>(nnURIs.length);
+ for(String uri : nnURIs) {
+ // name should be prepened with FileSystem.fixName(uri)
+ // TBD
+ nns.add(URI.create(uri));
+ }
+
+ URI[] r = new URI[nns.size()];
+ return nns.toArray(r);
+ }
/**
* Set the configuration property for the service rpc address
@@ -246,6 +293,15 @@ public class NameNode implements Namenod
public static InetSocketAddress getAddress(Configuration conf) {
URI filesystemURI = FileSystem.getDefaultUri(conf);
+ return getAddress(filesystemURI);
+ }
+
+
+ /**
+ * TODO:FEDERATION
+ * @param filesystemURI
+ */
+ public static InetSocketAddress getAddress(URI filesystemURI) {
String authority = filesystemURI.getAuthority();
if (authority == null) {
throw new IllegalArgumentException(String.format(
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Wed Feb 23 20:22:36 2011
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.BlockPoolStorage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
@@ -77,9 +77,8 @@ implements Writable, NodeRegistration {
this.ipcPort = ipcPort;
}
- public void setStorageInfo(DataStorage storage) {
+ public void setStorageInfo(StorageInfo storage) {
this.storageInfo = new StorageInfo(storage);
- this.storageID = storage.getStorageID();
}
public void setName(String name) {
@@ -108,6 +107,7 @@ implements Writable, NodeRegistration {
+ ", storageID=" + storageID
+ ", infoPort=" + infoPort
+ ", ipcPort=" + ipcPort
+ + ", storageInfo=" + storageInfo
+ ")";
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java Wed Feb 23 20:22:36 2011
@@ -94,4 +94,8 @@ public class NamespaceInfo extends Stora
distributedUpgradeVersion = in.readInt();
blockPoolID = WritableUtils.readString(in);
}
+
+  /**
+   * @return the superclass string form followed by ";bpid=" and the
+   *         block pool id of this namespace
+   */
+  @Override
+  public String toString(){
+    return super.toString() + ";bpid=" + blockPoolID;
+  }
}
Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Feb 23 20:22:36 2011
@@ -629,7 +629,7 @@ public class MiniDFSCluster {
StaticMapping.addNodeToRack(ipAddr + ":" + port,
racks[i-curDatanodesNum]);
}
- DataNode.runDatanodeDaemon(dn);
+ dn.runDatanodeDaemon();
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
}
curDatanodesNum += numDataNodes;
@@ -1137,7 +1137,7 @@ public class MiniDFSCluster {
String bpid = getNamesystem().getPoolId();
SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
sdataset.injectBlocks(bpid, blocksToInject);
- dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
+ dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
}
/**
Added: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java?rev=1073927&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java (added)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java Wed Feb 23 20:22:36 2011
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Starts one or more NameNodes plus a single DataNode and verifies that the
+ * DataNode registers with each NameNode through its per block pool
+ * {@link BPOfferService}.
+ */
+public class TestDataNodeMultipleRegistrations {
+  private static final Log LOG =
+    LogFactory.getLog(TestDataNodeMultipleRegistrations.class);
+
+  /** Upper bound on how long a test waits for the DataNode to come up. */
+  private static final long DN_STARTUP_TIMEOUT_MS = 60 * 1000L;
+  /** Pause between two consecutive DataNode readiness checks. */
+  private static final long DN_POLL_INTERVAL_MS = 1000L;
+
+  File common_base_dir;
+  String localHost;
+  Configuration conf;
+
+  /**
+   * Wipes the shared base directory and creates a fresh configuration that
+   * pins the DataNode host name to the loopback address.
+   *
+   * @throws Exception if the base directory cannot be removed
+   */
+  @Before
+  public void setUp() throws Exception {
+    common_base_dir = new File(MiniDFSCluster.getBaseDirectory());
+    if (common_base_dir.exists() && !FileUtil.fullyDelete(common_base_dir)) {
+      throw new IOException("cannot get directory ready:"
+          + common_base_dir.getAbsolutePath());
+    }
+
+    conf = new HdfsConfiguration();
+    // The tests always talk to the loopback address. The DNS lookup that
+    // used to precede this assignment was dropped because its result was
+    // overwritten right away.
+    localHost = "127.0.0.1";
+    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, localHost);
+  }
+
+  /**
+   * Formats and starts a NameNode whose storage directories live under a
+   * sub directory of the common base directory named after the port.
+   *
+   * @param conf configuration to use; it is modified in place
+   * @param nnPort port number, used here only to name the storage directory
+   * @return the NameNode returned by NameNode.createNameNode
+   * @throws IOException if formatting or starting the NameNode fails
+   */
+  NameNode startNameNode(Configuration conf, int nnPort) throws IOException {
+    // per nn base_dir
+    File base_dir = new File(common_base_dir, Integer.toString(nnPort));
+
+    boolean manageNameDfsDirs = true; // for now
+    boolean format = true; // for now
+    // disable service authorization
+    conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
+        false);
+
+    // Setup the NameNode configuration
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":0");
+    if (manageNameDfsDirs) {
+      String name = fileAsURI(new File(base_dir, "name1")) + ","
+          + fileAsURI(new File(base_dir, "name2"));
+      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, name);
+      String sname = fileAsURI(new File(base_dir, "namesecondary1")) + ","
+          + fileAsURI(new File(base_dir, "namesecondary2"));
+      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, sname);
+    }
+
+    // Format the NameNode before it is started
+    if (format) {
+      GenericTestUtils.formatNamenode(conf);
+    }
+
+    // Start the NameNode
+    String[] args = new String[] {};
+    return NameNode.createNameNode(args, conf);
+  }
+
+  /**
+   * Creates two fresh data directories under the common base directory,
+   * instantiates a DataNode on a copy of the given configuration and starts
+   * its daemon. The data directory setting is also written to the caller's
+   * configuration.
+   *
+   * @param conf base configuration; receives the data directory setting
+   * @return the started DataNode, never null
+   * @throws IOException if the directories cannot be prepared or the
+   *         DataNode cannot be instantiated
+   */
+  public DataNode startDataNode(Configuration conf) throws IOException {
+    Configuration dnConf = new HdfsConfiguration(conf);
+    boolean manageDfsDirs = true; // for now
+    File data_dir = new File(common_base_dir, "data");
+    if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
+      throw new IOException("Cannot remove data directory: " + data_dir);
+    }
+
+    if (manageDfsDirs) {
+      File dir1 = new File(data_dir, "data1");
+      File dir2 = new File(data_dir, "data2");
+      dir1.mkdirs();
+      dir2.mkdirs();
+      if (!dir1.isDirectory() || !dir2.isDirectory()) {
+        throw new IOException(
+            "Mkdirs failed to create directory for DataNode: " + dir1 + " or "
+                + dir2);
+      }
+      String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
+      dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+      conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+    }
+    LOG.debug("Starting DataNode " + " with "
+        + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+        + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+
+    String[] dnArgs = null; // for now
+    DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
+    if (dn == null) {
+      throw new IOException("Cannot start DataNode in "
+          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+    }
+
+    dn.runDatanodeDaemon();
+    return dn;
+  }
+
+  /**
+   * start multiple NNs and single DN and verifies per BP registrations and
+   * handshakes.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void test2NNRegistration() throws IOException {
+    NameNode nn1 = null;
+    NameNode nn2 = null;
+    DataNode dn = null;
+    try {
+      int nnPort = 9928;
+      String nnURL1 = "hdfs://" + localHost + ":" + nnPort;
+      FileSystem.setDefaultUri(conf, nnURL1);
+      nn1 = startNameNode(conf, nnPort);
+
+      nnPort = 9929;
+      String nnURL2 = "hdfs://" + localHost + ":" + nnPort;
+      FileSystem.setDefaultUri(conf, nnURL2);
+      nn2 = startNameNode(conf, nnPort);
+
+      Assert.assertNotNull("cannot create nn1", nn1);
+      Assert.assertNotNull("cannot create nn2", nn2);
+
+      String bpid1 = nn1.getFSImage().getBlockPoolID();
+      String bpid2 = nn2.getFSImage().getBlockPoolID();
+      String cid1 = nn1.getFSImage().getClusterID();
+      String cid2 = nn2.getFSImage().getClusterID();
+      int lv1 = nn1.getFSImage().getLayoutVersion();
+      int lv2 = nn2.getFSImage().getLayoutVersion();
+      int ns1 = nn1.getFSImage().namespaceID;
+      int ns2 = nn2.getFSImage().namespaceID;
+      // assertNotSame would compare the autoboxed Integer references and
+      // therefore could not detect equal ids; compare the values instead.
+      Assert.assertTrue("namespace ids should be different", ns1 != ns2);
+      LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1
+          + ";uri=" + nn1.getNameNodeAddress());
+      LOG.info("nn2: lv=" + lv2 + ";cid=" + cid2 + ";bpid=" + bpid2
+          + ";uri=" + nn2.getNameNodeAddress());
+
+      // now start the datanode...
+      String nns = nnURL1 + "," + nnURL2;
+      conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nns);
+      dn = startDataNode(conf);
+      Assert.assertNotNull("failed to create DataNode", dn);
+      waitDataNodeUp(dn);
+
+      logRegistrations(dn);
+
+      Assert.assertEquals("wrong number of BP services", 2,
+          dn.nameNodeThreads.length);
+      BPOfferService bpos1 = dn.nameNodeThreads[0];
+      BPOfferService bpos2 = dn.nameNodeThreads[1];
+
+      // JUnit convention: expected value first, actual value second
+      Assert.assertEquals("wrong nn address", nn1.getNameNodeAddress(),
+          bpos1.nn_addr);
+      Assert.assertEquals("wrong nn address", nn2.getNameNodeAddress(),
+          bpos2.nn_addr);
+      Assert.assertEquals("wrong bpid", bpid1, bpos1.getBlockPoolId());
+      Assert.assertEquals("wrong bpid", bpid2, bpos2.getBlockPoolId());
+      Assert.assertEquals("wrong cid", cid1, dn.getClusterId());
+      Assert.assertEquals("cid should be same", cid1, cid2);
+      Assert.assertEquals("namespace should be same", ns1,
+          bpos1.bpNSInfo.namespaceID);
+      Assert.assertEquals("namespace should be same", ns2,
+          bpos2.bpNSInfo.namespaceID);
+    } finally {
+      // always release the fixed ports, even when an assertion failed
+      shutdownAll(dn, nn1, nn2);
+    }
+  }
+
+  /**
+   * starts single nn and single dn and verifies registration and handshake
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testFedSingleNN() throws IOException {
+    NameNode nn1 = null;
+    DataNode dn = null;
+    try {
+      int nnPort = 9927;
+      String nnURL = "hdfs://" + localHost + ":" + nnPort;
+
+      FileSystem.setDefaultUri(conf, nnURL);
+      nn1 = startNameNode(conf, nnPort);
+      Assert.assertNotNull("cannot create nn1", nn1);
+
+      String bpid1 = nn1.getFSImage().getBlockPoolID();
+      String cid1 = nn1.getFSImage().getClusterID();
+      int lv1 = nn1.getFSImage().getLayoutVersion();
+      LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1
+          + ";uri=" + nn1.getNameNodeAddress());
+
+      // now start the datanode...
+      conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL);
+
+      dn = startDataNode(conf);
+      Assert.assertNotNull("failed to create DataNode", dn);
+
+      waitDataNodeUp(dn);
+      logRegistrations(dn);
+
+      // try block report
+      BPOfferService bpos1 = dn.nameNodeThreads[0];
+      bpos1.lastBlockReport = 0;
+      DatanodeCommand cmd = bpos1.blockReport();
+
+      Assert.assertNotNull("cmd is null", cmd);
+
+      // JUnit convention: expected value first, actual value second
+      Assert.assertEquals("wrong nn address", nn1.getNameNodeAddress(),
+          bpos1.nn_addr);
+      Assert.assertEquals("wrong bpid", bpid1, bpos1.getBlockPoolId());
+      Assert.assertEquals("wrong cid", cid1, dn.getClusterId());
+    } finally {
+      // always release the fixed port, even when an assertion failed
+      shutdownAll(dn, nn1);
+    }
+  }
+
+  /** Logs the registration details of every block pool service of the DN. */
+  private void logRegistrations(DataNode dn) {
+    for (BPOfferService bpos : dn.nameNodeThreads) {
+      LOG.info("reg: bpid=" + bpos.getBlockPoolId()
+          + "; name=" + bpos.bpRegistration.name
+          + "; sid=" + bpos.bpRegistration.storageID
+          + "; nna=" + bpos.nn_addr);
+    }
+  }
+
+  /**
+   * Shuts down the DataNode first and then every given NameNode; null
+   * entries are skipped so this is safe to call from a finally block.
+   */
+  private void shutdownAll(DataNode dn, NameNode... nns) {
+    try {
+      if (dn != null) {
+        dn.shutdown();
+      }
+    } finally {
+      for (NameNode nn : nns) {
+        shutdownNN(nn);
+      }
+    }
+  }
+
+  /** Stops the given NameNode and waits for it to finish; null is ignored. */
+  private void shutdownNN(NameNode nn) {
+    if (nn == null) {
+      return;
+    }
+    nn.stop();
+    nn.join();
+  }
+
+  /**
+   * @param dn DataNode to inspect
+   * @return true if the DataNode has at least one block pool service and
+   *         all of its services report initialized()
+   */
+  public boolean isDnUp(DataNode dn) {
+    BPOfferService[] services = dn.nameNodeThreads;
+    if (services == null || services.length == 0) {
+      return false;
+    }
+    for (BPOfferService bpos : services) {
+      if (!bpos.initialized()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Polls {@link #isDnUp(DataNode)} until it returns true. Fails the test
+   * instead of hanging forever if the DataNode is not up within
+   * {@link #DN_STARTUP_TIMEOUT_MS} or if the waiting thread is interrupted.
+   *
+   * @param dn DataNode to wait for
+   */
+  public void waitDataNodeUp(DataNode dn) {
+    long deadline = System.currentTimeMillis() + DN_STARTUP_TIMEOUT_MS;
+    while (!isDnUp(dn)) {
+      if (System.currentTimeMillis() >= deadline) {
+        Assert.fail("DataNode did not come up within "
+            + DN_STARTUP_TIMEOUT_MS + " ms");
+      }
+      try {
+        Thread.sleep(DN_POLL_INTERVAL_MS);
+      } catch (InterruptedException ie) {
+        // keep the interrupt visible to the caller and stop waiting
+        Thread.currentThread().interrupt();
+        Assert.fail("interrupted while waiting for the DataNode to come up");
+      }
+    }
+  }
+}