You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/21 20:27:13 UTC
svn commit: r1204660 - in
/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/test/java/org/apache/hadoop/hdfs/server/common/
src/test/java/org/apache/hadoop/hdfs/server/data...
Author: todd
Date: Mon Nov 21 19:27:12 2011
New Revision: 1204660
URL: http://svn.apache.org/viewvc?rev=1204660&view=rev
Log:
HDFS-2566. Move BPOfferService to be a non-inner class. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Nov 21 19:27:12 2011
@@ -28,6 +28,8 @@ Release 0.23.1 - UNRELEASED
HDFS-2570. Add descriptions for dfs.*.https.address in hdfs-default.xml.
(eli)
+ HDFS-2566. Move BPOfferService to be a non-inner class. (todd)
+
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)
Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1204660&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Mon Nov 21 19:27:12 2011
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * A thread per namenode to perform:
+ * <ul>
+ * <li> Pre-registration handshake with namenode</li>
+ * <li> Registration with namenode</li>
+ * <li> Send periodic heartbeats to the namenode</li>
+ * <li> Handle commands received from the namenode</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+class BPOfferService implements Runnable {
+ // Shares the DataNode's logger instead of declaring a separate one.
+ static final Log LOG = DataNode.LOG;
+
+ // Address of the namenode this service talks to.
+ final InetSocketAddress nnAddr;
+
+ /**
+ * Information about the namespace that this service
+ * is registering with. This is assigned after
+ * the first phase of the handshake.
+ */
+ NamespaceInfo bpNSInfo;
+
+ /**
+ * The registration information for this block pool.
+ * This is assigned after the second phase of the
+ * handshake.
+ */
+ DatanodeRegistration bpRegistration;
+
+ // Timestamp (ms) that blockReport() compares against the configured
+ // block report interval to decide whether a report is due.
+ long lastBlockReport = 0;
+
+ // When true, blockReport() re-randomizes lastBlockReport after the
+ // next report it sends; set by scheduleBlockReport().
+ boolean resetBlockReportTime = true;
+
+ // Thread executing run(); created in start(), so null until then.
+ Thread bpThread;
+ // RPC proxy to the namenode; set during the handshake or by setNameNode().
+ DatanodeProtocol bpNamenode;
+ // Start time (ms) of the most recent heartbeat round in offerService().
+ private long lastHeartbeat = 0;
+ // Set to true by run() once connectToNNAndHandshake() has succeeded.
+ private volatile boolean initialized = false;
+ // Parallel lists: entry i of delHints belongs to entry i of
+ // receivedBlockList. Both monitors are taken (receivedBlockList first)
+ // whenever the lists are read or modified.
+ private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+ private final LinkedList<String> delHints = new LinkedList<String>();
+ // Cleared to make this service stop; see stop(), cleanUp() and DNA_SHUTDOWN.
+ private volatile boolean shouldServiceRun = true;
+ // Lazily created by getUpgradeManager().
+ UpgradeManagerDatanode upgradeManager = null;
+ // Owning datanode and its configuration snapshot.
+ private final DataNode dn;
+ private final DNConf dnConf;
+
+ /**
+  * Creates a service bound to a single namenode.
+  *
+  * @param nnAddr address of the namenode to connect to
+  * @param dn the datanode that owns this service; its DNConf is cached here
+  */
+ BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
+   this.nnAddr = nnAddr;
+   this.dn = dn;
+   this.dnConf = dn.getDnConf();
+ }
+
+ /**
+  * Tells whether the BP thread has finished connecting to and
+  * registering with its namenode.
+  *
+  * @return true once {@link #run()} has completed the handshake
+  */
+ public boolean isInitialized() {
+   return this.initialized;
+ }
+
+ /**
+  * Tells whether this service is still supposed to run and its thread is
+  * actually alive.
+  *
+  * @return false if the service was stopped, was never started, or its
+  *         thread has exited; true otherwise
+  */
+ public boolean isAlive() {
+   // bpThread is only assigned in start(); treat "not started" as not alive
+   // instead of failing with a NullPointerException.
+   final Thread thread = bpThread;
+   return shouldServiceRun && thread != null && thread.isAlive();
+ }
+
+ /**
+  * Returns the block pool ID learned from the namenode.
+  *
+  * @return the block pool ID, or null (after logging a warning with a
+  *         stack trace) if the namespace info has not been received yet
+  */
+ public String getBlockPoolId() {
+   if (bpNSInfo == null) {
+     LOG.warn("Block pool ID needed, but service not yet registered with NN",
+         new Exception("trace"));
+     return null;
+   }
+   return bpNSInfo.getBlockPoolID();
+ }
+
+ /**
+  * @return the namespace info obtained in the first handshake phase, or
+  *         null if that phase has not completed
+  */
+ public NamespaceInfo getNamespaceInfo() {
+   return this.bpNSInfo;
+ }
+
+ /**
+  * Describes this service for log messages. Before the namenode has been
+  * contacted the block pool ID is unknown, and if no block pool has
+  * connected yet the datanode's storage ID may be unknown as well.
+  */
+ @Override
+ public String toString() {
+   if (bpNSInfo != null) {
+     return "Block pool " + getBlockPoolId() +
+         " (storage id " + dn.getStorageId() +
+         ") registered with " + nnAddr;
+   }
+   // Not yet connected to our NN: fall back to placeholders.
+   String storageId = dn.getStorageId();
+   if (storageId == null || "".equals(storageId)) {
+     storageId = "unknown";
+   }
+   return "Block pool <registering> (storage id " + storageId +
+       ") connecting to " + nnAddr;
+ }
+
+ /** @return the address of the namenode this service is bound to */
+ InetSocketAddress getNNSocketAddress() {
+   return this.nnAddr;
+ }
+
+ /**
+  * Replaces the namenode proxy. Intended for unit tests that need to
+  * inject a spy NN.
+  *
+  * @param dnProtocol the proxy to use for all subsequent NN calls
+  */
+ @VisibleForTesting
+ void setNameNode(DatanodeProtocol dnProtocol) {
+   this.bpNamenode = dnProtocol;
+ }
+
+ /**
+  * Perform the first part of the handshake with the NameNode.
+  * This calls <code>versionRequest</code> to determine the NN's
+  * namespace and version info. It retries every 5 seconds until
+  * the NN responds or the DN is shutting down.
+  *
+  * @return the NamespaceInfo, or null if the service was told to stop
+  *         before the NN answered
+  * @throws IncorrectVersionException if the remote NN does not match
+  *         this DN's version
+  */
+ NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
+   NamespaceInfo nsInfo = null;
+   while (shouldRun()) {
+     try {
+       nsInfo = bpNamenode.versionRequest();
+       LOG.debug(this + " received versionRequest response: " + nsInfo);
+       break;
+     } catch (IOException e) {
+       // Covers both a busy namenode (SocketTimeoutException is an
+       // IOException) and an unavailable one; the handling is identical.
+       LOG.warn("Problem connecting to server: " + nnAddr);
+     }
+
+     // try again in 5 seconds
+     sleepAndLogInterrupts(5000, "requesting version info from NN");
+   }
+
+   if (nsInfo != null) {
+     checkNNVersion(nsInfo);
+   }
+   return nsInfo;
+ }
+
+ /**
+  * Verifies that the namenode's build version and layout version match
+  * this datanode's.
+  *
+  * @param nsInfo namespace info just returned by the namenode
+  * @throws IncorrectVersionException if either version differs
+  */
+ private void checkNNVersion(NamespaceInfo nsInfo)
+     throws IncorrectVersionException {
+   // build and layout versions should match
+   String nsBuildVer = nsInfo.getBuildVersion();
+   String stBuildVer = Storage.getBuildVersion();
+   if (!nsBuildVer.equals(stBuildVer)) {
+     LOG.warn("Data-node and name-node Build versions must be the same. " +
+         "Namenode build version: " + nsBuildVer + " Datanode " +
+         "build version: " + stBuildVer);
+     throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
+   }
+
+   if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
+     // Report the version from the nsInfo argument: the bpNSInfo field is
+     // still null while the first handshake is in progress, so reading it
+     // here would throw a NullPointerException instead of the intended
+     // IncorrectVersionException.
+     LOG.warn("Data-node and name-node layout versions must be the same." +
+         " Expected: "+ HdfsConstants.LAYOUT_VERSION +
+         " actual "+ nsInfo.getLayoutVersion());
+     throw new IncorrectVersionException(
+         nsInfo.getLayoutVersion(), "namenode");
+   }
+ }
+
+ /**
+ * Obtains an RPC proxy to the namenode and runs both handshake phases:
+ * fetch the namespace info, let the datanode initialize the block pool,
+ * then register with the namenode.
+ *
+ * @throws IOException if the proxy, block pool initialization or
+ * registration fails
+ */
+ private void connectToNNAndHandshake() throws IOException {
+ // get NN proxy
+ bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+ DatanodeProtocol.versionID, nnAddr, dn.getConf());
+
+ // First phase of the handshake with NN - get the namespace
+ // info.
+ // NOTE(review): retrieveNamespaceInfo() returns null when the service is
+ // asked to stop before the NN answers; bpNSInfo would then be null for
+ // the calls below (register() rejects that via its precondition check).
+ bpNSInfo = retrieveNamespaceInfo();
+
+ // Now that we know the namespace ID, etc, we can pass this to the DN.
+ // The DN can now initialize its local storage if we are the
+ // first BP to handshake, etc.
+ dn.initBlockPool(this);
+
+ // Second phase of the handshake with the NN.
+ register();
+ }
+
+ /**
+  * Arranges for the next block report to be sent either at the next
+  * heartbeat or after a random delay.
+  *
+  * @param delay upper bound (ms) for the random delay; a value of zero or
+  *        less schedules the report for the next heartbeat
+  */
+ void scheduleBlockReport(long delay) {
+   if (delay <= 0) {
+     // send at next heartbeat
+     lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
+   } else {
+     // send BR after random delay
+     lastBlockReport = System.currentTimeMillis()
+         - (dnConf.blockReportInterval
+             - DFSUtil.getRandom().nextInt((int) delay));
+   }
+   // reset future BRs for randomness
+   resetBlockReportTime = true;
+ }
+
+ void reportBadBlocks(ExtendedBlock block) {
+ DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
+ LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
+
+ try {
+ bpNamenode.reportBadBlocks(blocks);
+ } catch (IOException e){
+ /* One common reason is that NameNode could be in safe mode.
+ * Should we keep on retrying in that case?
+ */
+ LOG.warn("Failed to report bad block " + block + " to namenode : "
+ + " Exception", e);
+ }
+
+ }
+
+ /**
+ * Report received blocks and delete hints to the Namenode.
+ * A snapshot of both lists is taken under their locks, sent to the
+ * namenode without holding the locks, and the reported entries are then
+ * removed from the lists.
+ * @throws IOException if the blockReceived RPC fails; in that case the
+ * entries stay in the lists
+ */
+ private void reportReceivedBlocks() throws IOException {
+ //check if there are newly received blocks
+ Block [] blockArray=null;
+ String [] delHintArray=null;
+ // Lock order: receivedBlockList first, then delHints (same order as in
+ // notifyNamenodeReceivedBlock).
+ synchronized(receivedBlockList) {
+ synchronized(delHints){
+ int numBlocks = receivedBlockList.size();
+ if (numBlocks > 0) {
+ if(numBlocks!=delHints.size()) {
+ LOG.warn("Panic: receiveBlockList and delHints are not of " +
+ "the same length" );
+ }
+ //
+ // Send newly-received blockids to namenode
+ //
+ blockArray = receivedBlockList.toArray(new Block[numBlocks]);
+ delHintArray = delHints.toArray(new String[numBlocks]);
+ }
+ }
+ }
+ if (blockArray != null) {
+ if(delHintArray == null || delHintArray.length != blockArray.length ) {
+ LOG.warn("Panic: block array & delHintArray are not the same" );
+ }
+ // The RPC is made outside the locks so that new blocks can be queued
+ // while it is in flight.
+ bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
+ delHintArray);
+ synchronized(receivedBlockList) {
+ synchronized(delHints){
+ // Remove only what was reported; entries added meanwhile are kept.
+ for(int i=0; i<blockArray.length; i++) {
+ receivedBlockList.remove(blockArray[i]);
+ delHints.remove(delHintArray[i]);
+ }
+ }
+ }
+ }
+ }
+
+ /*
+  * Queues a newly received block (and its delete hint) for reporting to
+  * the namenode and wakes up the service thread.
+  *
+  * Informing the name node could take a long long time! Should we wait
+  * till namenode is informed before responding with success to the
+  * client? For now we don't.
+  */
+ void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+   if (block == null) {
+     throw new IllegalArgumentException("Block is null");
+   }
+   if (delHint == null) {
+     throw new IllegalArgumentException("delHint is null");
+   }
+
+   // Blocks of another pool are dropped with a warning.
+   if (!block.getBlockPoolId().equals(getBlockPoolId())) {
+     LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+         + getBlockPoolId());
+     return;
+   }
+
+   synchronized (receivedBlockList) {
+     synchronized (delHints) {
+       receivedBlockList.add(block.getLocalBlock());
+       delHints.add(delHint);
+       // offerService() waits on receivedBlockList between heartbeats.
+       receivedBlockList.notifyAll();
+     }
+   }
+ }
+
+
+ /**
+ * Report the list blocks to the Namenode, but only if more than the
+ * configured block report interval has passed since lastBlockReport.
+ * @return the command returned by the namenode for the report, or null
+ * if no report was due
+ * @throws IOException
+ */
+ DatanodeCommand blockReport() throws IOException {
+ // send block report if timer has expired.
+ DatanodeCommand cmd = null;
+ long startTime = now();
+ if (startTime - lastBlockReport > dnConf.blockReportInterval) {
+
+ // Create block report
+ long brCreateStartTime = now();
+ BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
+
+ // Send block report
+ long brSendStartTime = now();
+ cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
+ .getBlockListAsLongs());
+
+ // Log the block report processing stats from Datanode perspective
+ long brSendCost = now() - brSendStartTime;
+ long brCreateCost = brSendStartTime - brCreateStartTime;
+ dn.metrics.addBlockReport(brSendCost);
+ LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+ + " blocks took " + brCreateCost + " msec to generate and "
+ + brSendCost + " msecs for RPC and NN processing");
+
+ // If we have sent the first block report, then wait a random
+ // time before we start the periodic block reports.
+ if (resetBlockReportTime) {
+ lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
+ resetBlockReportTime = false;
+ } else {
+ /* say the last block report was at 8:20:14. The current report
+ * should have started around 9:20:14 (default 1 hour interval).
+ * If current time is :
+ * 1) normal like 9:20:18, next report should be at 10:20:14
+ * 2) unexpected like 11:35:43, next report should be at 12:20:14
+ */
+ // Integer division: advance by a whole number of intervals so the
+ // schedule keeps its original phase.
+ lastBlockReport += (now() - lastBlockReport) /
+ dnConf.blockReportInterval * dnConf.blockReportInterval;
+ }
+ LOG.info("sent block report, processed command:" + cmd);
+ }
+ return cmd;
+ }
+
+
+ /**
+ * Sends one heartbeat to the namenode carrying this datanode's capacity,
+ * usage, remaining space, this block pool's usage, transfer and xceiver
+ * counts and the number of failed volumes.
+ * @return the commands the namenode sent back with the heartbeat response
+ * @throws IOException if the RPC or gathering the statistics fails
+ */
+ DatanodeCommand [] sendHeartBeat() throws IOException {
+ return bpNamenode.sendHeartbeat(bpRegistration,
+ dn.data.getCapacity(),
+ dn.data.getDfsUsed(),
+ dn.data.getRemaining(),
+ dn.data.getBlockPoolUsed(getBlockPoolId()),
+ dn.xmitsInProgress.get(),
+ dn.getXceiverCount(), dn.data.getNumFailedVolumes());
+ }
+
+ //This must be called only by blockPoolManager
+ void start() {
+ if ((bpThread != null) && (bpThread.isAlive())) {
+ //Thread is started already
+ return;
+ }
+ bpThread = new Thread(this, formatThreadName());
+ bpThread.setDaemon(true); // needed for JUnit testing
+ bpThread.start();
+ }
+
+ private String formatThreadName() {
+ Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
+ return "DataNode: [" +
+ StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "] " +
+ " heartbeating to " + nnAddr;
+ }
+
+ /**
+  * Asks the service to stop and interrupts its thread, if one exists.
+  * This must be called only by blockPoolManager.
+  */
+ void stop() {
+   shouldServiceRun = false;
+   if (bpThread == null) {
+     return;
+   }
+   bpThread.interrupt();
+ }
+
+ /**
+  * Waits for the service thread to finish, if one exists. An interrupt
+  * while waiting ends the wait silently.
+  * This must be called only by blockPoolManager.
+  */
+ void join() {
+   if (bpThread == null) {
+     return;
+   }
+   try {
+     bpThread.join();
+   } catch (InterruptedException ie) {
+     // ignored: the caller simply stops waiting
+   }
+ }
+
+ /**
+  * Cleanup method to be called by the service thread before exiting:
+  * shuts down any upgrade in progress, marks the service as stopped,
+  * closes the namenode proxy and detaches this block pool from the
+  * datanode.
+  */
+ private synchronized void cleanUp() {
+   if (upgradeManager != null) {
+     upgradeManager.shutdownUpgrade();
+   }
+   shouldServiceRun = false;
+   RPC.stopProxy(bpNamenode);
+   dn.shutdownBlockPool(this);
+ }
+
+ /**
+ * Main loop for each BP thread. Run until shutdown,
+ * forever calling remote NameNode functions.
+ * Each iteration sends a heartbeat if one is due and processes the
+ * returned commands, reports newly received blocks, sends a block report
+ * if one is due, and then waits for the next heartbeat or for new work.
+ */
+ private void offerService() throws Exception {
+ LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+ + dnConf.blockReportInterval + "msec" + " Initial delay: "
+ + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
+ + dnConf.heartBeatInterval);
+
+ //
+ // Now loop for a long time....
+ //
+ while (shouldRun()) {
+ try {
+ long startTime = now();
+
+ //
+ // Every so often, send heartbeat or block-report
+ //
+ if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
+ //
+ // All heartbeat messages include following info:
+ // -- Datanode name
+ // -- data transfer port
+ // -- Total capacity
+ // -- Bytes remaining
+ //
+ // Recorded before the RPC, so a failed heartbeat is not retried
+ // until a full interval has passed.
+ lastHeartbeat = startTime;
+ if (!dn.areHeartbeatsDisabledForTests()) {
+ DatanodeCommand[] cmds = sendHeartBeat();
+ dn.metrics.addHeartbeat(now() - startTime);
+
+ long startProcessCommands = now();
+ // processCommand returns false after DNA_SHUTDOWN, which also clears
+ // shouldServiceRun; go straight back to the loop condition.
+ if (!processCommand(cmds))
+ continue;
+ long endProcessCommands = now();
+ if (endProcessCommands - startProcessCommands > 2000) {
+ LOG.info("Took " + (endProcessCommands - startProcessCommands) +
+ "ms to process " + cmds.length + " commands from NN");
+ }
+ }
+ }
+
+ reportReceivedBlocks();
+
+ DatanodeCommand cmd = blockReport();
+ processCommand(cmd);
+
+ // Now safe to start scanning the block pool
+ if (dn.blockScanner != null) {
+ dn.blockScanner.addBlockPool(this.getBlockPoolId());
+ }
+
+ //
+ // There is no work to do; sleep until heartbeat timer elapses,
+ // or work arrives, and then iterate again.
+ //
+ long waitTime = dnConf.heartBeatInterval -
+ (System.currentTimeMillis() - lastHeartbeat);
+ synchronized(receivedBlockList) {
+ // notifyNamenodeReceivedBlock() calls notifyAll() on this list, which
+ // ends the wait early when a new block arrives.
+ if (waitTime > 0 && receivedBlockList.size() == 0) {
+ try {
+ receivedBlockList.wait(waitTime);
+ } catch (InterruptedException ie) {
+ LOG.warn("BPOfferService for " + this + " interrupted");
+ }
+ }
+ } // synchronized
+ } catch(RemoteException re) {
+ String reClass = re.getClassName();
+ // These three remote exceptions end the service; any other
+ // RemoteException is logged and retried after a short sleep.
+ if (UnregisteredNodeException.class.getName().equals(reClass) ||
+ DisallowedDatanodeException.class.getName().equals(reClass) ||
+ IncorrectVersionException.class.getName().equals(reClass)) {
+ LOG.warn(this + " is shutting down", re);
+ shouldServiceRun = false;
+ return;
+ }
+ LOG.warn("RemoteException in offerService", re);
+ try {
+ long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ } catch (IOException e) {
+ // NOTE(review): no sleep on this path; the loop iterates again
+ // immediately.
+ LOG.warn("IOException in offerService", e);
+ }
+ } // while (shouldRun())
+ } // offerService
+
+ /**
+ * Register one bp with the corresponding NameNode
+ * <p>
+ * The bpDatanode needs to register with the namenode on startup in order
+ * 1) to report which storage it is serving now and
+ * 2) to receive a registrationID
+ *
+ * issued by the namenode to recognize registered datanodes.
+ * <p>
+ * Requires that the namespace info (bpNSInfo) has already been obtained.
+ * Socket timeouts are retried every second while the service should run;
+ * any other IOException is propagated.
+ *
+ * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+ * @throws IOException
+ */
+ void register() throws IOException {
+ Preconditions.checkState(bpNSInfo != null,
+ "register() should be called after handshake()");
+
+ // The handshake() phase loaded the block pool storage
+ // off disk - so update the bpRegistration object from that info
+ bpRegistration = dn.createBPRegistration(bpNSInfo);
+
+ LOG.info(this + " beginning handshake with NN");
+
+ while (shouldRun()) {
+ try {
+ // Use returned registration from namenode with updated machine name.
+ bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+ break;
+ } catch(SocketTimeoutException e) { // namenode is busy
+ LOG.info("Problem connecting to server: " + nnAddr);
+ sleepAndLogInterrupts(1000, "connecting to server");
+ }
+ }
+
+ // NOTE(review): this point is also reached when the loop ends because
+ // shouldRun() turned false, i.e. without a successful registration.
+ LOG.info("Block pool " + this + " successfully registered with NN");
+ dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+
+ // random short delay - helps scatter the BR from all DNs
+ scheduleBlockReport(dnConf.initialBlockReportDelay);
+ }
+
+
+ /**
+  * Sleeps for the given time; an interrupt ends the sleep early and is
+  * only logged.
+  *
+  * @param millis how long to sleep, in milliseconds
+  * @param stateString what the service was doing, for the log message
+  */
+ private void sleepAndLogInterrupts(int millis, String stateString) {
+   try {
+     Thread.sleep(millis);
+   } catch (InterruptedException interrupted) {
+     LOG.info("BPOfferService " + this +
+         " interrupted while " + stateString);
+   }
+ }
+
+ /**
+ * No matter what kind of exception we get, keep retrying to offerService().
+ * That's the loop that connects to the NameNode and provides basic DataNode
+ * functionality.
+ *
+ * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
+ * happen either at shutdown or due to refreshNamenodes.
+ *
+ * A failure of the initial handshake is not retried: it ends the thread.
+ * cleanUp() runs on every exit path.
+ */
+ @Override
+ public void run() {
+ LOG.info(this + " starting to offer service");
+
+ try {
+ // init stuff
+ try {
+ // setup storage
+ connectToNNAndHandshake();
+ } catch (IOException ioe) {
+ // Initial handshake, storage recovery or registration failed
+ // End BPOfferService thread
+ LOG.fatal("Initialization failed for block pool " + this, ioe);
+ return;
+ }
+
+ initialized = true; // bp is initialized;
+
+ while (shouldRun()) {
+ try {
+ startDistributedUpgradeIfNeeded();
+ offerService();
+ } catch (Exception ex) {
+ // Log, pause 5 seconds, then re-enter offerService().
+ LOG.error("Exception in BPOfferService for " + this, ex);
+ sleepAndLogInterrupts(5000, "offering service");
+ }
+ }
+ } catch (Throwable ex) {
+ // Anything not handled above (e.g. an Error) ends the thread.
+ LOG.warn("Unexpected exception in block pool " + this, ex);
+ } finally {
+ LOG.warn("Ending block pool service for: " + this);
+ cleanUp();
+ }
+ }
+
+ /**
+  * @return true while neither this service nor the owning datanode has
+  *         been asked to stop
+  */
+ private boolean shouldRun() {
+   if (!shouldServiceRun) {
+     return false;
+   }
+   return dn.shouldRun();
+ }
+
+ /**
+  * Processes datanode commands in order. An IOException from one command
+  * is logged and the remaining commands are still processed.
+  *
+  * @param cmds an array of datanode commands, may be null
+  * @return false as soon as one command reports that no further
+  *         processing should happen, true otherwise
+  */
+ private boolean processCommand(DatanodeCommand[] cmds) {
+   if (cmds == null) {
+     return true;
+   }
+   for (DatanodeCommand command : cmds) {
+     try {
+       if (!processCommand(command)) {
+         return false;
+       }
+     } catch (IOException ioe) {
+       LOG.warn("Error processing datanode Command", ioe);
+     }
+   }
+   return true;
+ }
+
+ /**
+ * Process a single command received from the namenode, dispatching on
+ * its action code.
+ *
+ * @param cmd the command to process; null is accepted and ignored
+ * @return true if further processing may be required or false otherwise.
+ * Only DNA_SHUTDOWN returns false.
+ * @throws IOException
+ */
+ private boolean processCommand(DatanodeCommand cmd) throws IOException {
+ if (cmd == null)
+ return true;
+ // NOTE(review): bcmd is null when cmd is not a BlockCommand; the
+ // DNA_TRANSFER and DNA_INVALIDATE cases dereference it unconditionally.
+ final BlockCommand bcmd =
+ cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+
+ switch(cmd.getAction()) {
+ case DatanodeProtocol.DNA_TRANSFER:
+ // Send a copy of a block to another datanode
+ dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+ dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
+ break;
+ case DatanodeProtocol.DNA_INVALIDATE:
+ //
+ // Some local block(s) are obsolete and can be
+ // safely garbage-collected.
+ //
+ Block toDelete[] = bcmd.getBlocks();
+ try {
+ if (dn.blockScanner != null) {
+ dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
+ }
+ // using global fsdataset
+ dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
+ } catch(IOException e) {
+ dn.checkDiskError();
+ throw e;
+ }
+ dn.metrics.incrBlocksRemoved(toDelete.length);
+ break;
+ case DatanodeProtocol.DNA_SHUTDOWN:
+ // shut down the data node
+ shouldServiceRun = false;
+ return false;
+ case DatanodeProtocol.DNA_REGISTER:
+ // namenode requested a registration - at start or if NN lost contact
+ LOG.info("DatanodeCommand action: DNA_REGISTER");
+ if (shouldRun()) {
+ // re-retrieve namespace info to make sure that, if the NN
+ // was restarted, we still match its version (HDFS-2120)
+ // The returned info is discarded; the call is made for its version
+ // check only.
+ retrieveNamespaceInfo();
+ // and re-register
+ register();
+ }
+ break;
+ case DatanodeProtocol.DNA_FINALIZE:
+ String bp = ((DatanodeCommand.Finalize) cmd).getBlockPoolId();
+ assert getBlockPoolId().equals(bp) :
+ "BP " + getBlockPoolId() + " received DNA_FINALIZE " +
+ "for other block pool " + bp;
+
+ dn.finalizeUpgradeForPool(bp);
+ break;
+ case UpgradeCommand.UC_ACTION_START_UPGRADE:
+ // start distributed upgrade here
+ processDistributedUpgradeCommand((UpgradeCommand)cmd);
+ break;
+ case DatanodeProtocol.DNA_RECOVERBLOCK:
+ dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+ break;
+ case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+ LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
+ if (dn.isBlockTokenEnabled) {
+ dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(),
+ ((KeyUpdateCommand) cmd).getExportedKeys());
+ }
+ break;
+ case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+ LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
+ long bandwidth =
+ ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
+ // Non-positive values are ignored.
+ if (bandwidth > 0) {
+ DataXceiverServer dxcs =
+ (DataXceiverServer) dn.dataXceiverServer.getRunnable();
+ dxcs.balanceThrottler.setBandwidth(bandwidth);
+ }
+ break;
+ default:
+ LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
+ }
+ return true;
+ }
+
+ /**
+  * Hands an upgrade command to this block pool's upgrade manager,
+  * creating the manager if necessary.
+  *
+  * @param comm the upgrade command received from the namenode
+  * @throws IOException if the upgrade manager fails to process it
+  */
+ private void processDistributedUpgradeCommand(UpgradeCommand comm)
+     throws IOException {
+   getUpgradeManager().processUpgradeCommand(comm);
+ }
+
+ /**
+  * Returns the upgrade manager for this block pool, creating it on
+  * first use.
+  */
+ synchronized UpgradeManagerDatanode getUpgradeManager() {
+   if (upgradeManager != null) {
+     return upgradeManager;
+   }
+   upgradeManager = new UpgradeManagerDatanode(dn, getBlockPoolId());
+   return upgradeManager;
+ }
+
+ /**
+ * Start distributed upgrade if it should be initiated by the data-node.
+ */
+ private void startDistributedUpgradeIfNeeded() throws IOException {
+ UpgradeManagerDatanode um = getUpgradeManager();
+
+ if(!um.getUpgradeState())
+ return;
+ um.setUpgradeState(false, um.getUpgradeVersion());
+ um.startUpgrade();
+ return;
+ }
+
+}
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Mon Nov 21 19:27:12 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Nov 21 19:27:12 2011
@@ -50,7 +50,6 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -61,7 +60,6 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
@@ -74,7 +72,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -94,7 +91,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,7 +99,6 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -117,31 +112,21 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer;
@@ -170,7 +155,6 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -393,8 +377,6 @@ public class DataNode extends Configured
private volatile String hostName; // Host name of this datanode
- private static String dnThreadName;
-
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@@ -666,701 +648,8 @@ public class DataNode extends Configured
this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
}
- /**
- * A thread per namenode to perform:
- * <ul>
- * <li> Pre-registration handshake with namenode</li>
- * <li> Registration with namenode</li>
- * <li> Send periodic heartbeats to the namenode</li>
- * <li> Handle commands received from the namenode</li>
- * </ul>
- */
- @InterfaceAudience.Private
- static class BPOfferService implements Runnable {
- final InetSocketAddress nnAddr;
-
- /**
- * Information about the namespace that this service
- * is registering with. This is assigned after
- * the first phase of the handshake.
- */
- NamespaceInfo bpNSInfo;
-
- /**
- * The registration information for this block pool.
- * This is assigned after the second phase of the
- * handshake.
- */
- DatanodeRegistration bpRegistration;
-
- long lastBlockReport = 0;
-
- boolean resetBlockReportTime = true;
-
- private Thread bpThread;
- private DatanodeProtocol bpNamenode;
- private long lastHeartbeat = 0;
- private volatile boolean initialized = false;
- private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
- private final LinkedList<String> delHints = new LinkedList<String>();
- private volatile boolean shouldServiceRun = true;
- UpgradeManagerDatanode upgradeManager = null;
- private final DataNode dn;
- private final DNConf dnConf;
-
- BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
- this.dn = dn;
- this.nnAddr = nnAddr;
- this.dnConf = dn.getDnConf();
- }
-
- /**
- * returns true if BP thread has completed initialization of storage
- * and has registered with the corresponding namenode
- * @return true if initialized
- */
- public boolean isInitialized() {
- return initialized;
- }
-
- public boolean isAlive() {
- return shouldServiceRun && bpThread.isAlive();
- }
-
- public String getBlockPoolId() {
- if (bpNSInfo != null) {
- return bpNSInfo.getBlockPoolID();
- } else {
- LOG.warn("Block pool ID needed, but service not yet registered with NN",
- new Exception("trace"));
- return null;
- }
- }
-
- public NamespaceInfo getNamespaceInfo() {
- return bpNSInfo;
- }
-
- public String toString() {
- if (bpNSInfo == null) {
- // If we haven't yet connected to our NN, we don't yet know our
- // own block pool ID.
- // If _none_ of the block pools have connected yet, we don't even
- // know the storage ID of this DN.
- String storageId = dn.getStorageId();
- if (storageId == null || "".equals(storageId)) {
- storageId = "unknown";
- }
- return "Block pool <registering> (storage id " + storageId +
- ") connecting to " + nnAddr;
- } else {
- return "Block pool " + getBlockPoolId() +
- " (storage id " + dn.getStorageId() +
- ") registered with " + nnAddr;
- }
- }
-
- private InetSocketAddress getNNSocketAddress() {
- return nnAddr;
- }
-
- /**
- * Used to inject a spy NN in the unit tests.
- */
- @VisibleForTesting
- void setNameNode(DatanodeProtocol dnProtocol) {
- bpNamenode = dnProtocol;
- }
-
- /**
- * Perform the first part of the handshake with the NameNode.
- * This calls <code>versionRequest</code> to determine the NN's
- * namespace and version info. It automatically retries until
- * the NN responds or the DN is shutting down.
- *
- * @return the NamespaceInfo
- * @throws IncorrectVersionException if the remote NN does not match
- * this DN's version
- */
- NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
- NamespaceInfo nsInfo = null;
- while (shouldRun()) {
- try {
- nsInfo = bpNamenode.versionRequest();
- LOG.debug(this + " received versionRequest response: " + nsInfo);
- break;
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.warn("Problem connecting to server: " + nnAddr);
- } catch(IOException e ) { // namenode is not available
- LOG.warn("Problem connecting to server: " + nnAddr);
- }
-
- // try again in five seconds
- sleepAndLogInterrupts(5000, "requesting version info from NN");
- }
-
- if (nsInfo != null) {
- checkNNVersion(nsInfo);
- }
- return nsInfo;
- }
-
- private void checkNNVersion(NamespaceInfo nsInfo)
- throws IncorrectVersionException {
- // build and layout versions should match
- String nsBuildVer = nsInfo.getBuildVersion();
- String stBuildVer = Storage.getBuildVersion();
- if (!nsBuildVer.equals(stBuildVer)) {
- LOG.warn("Data-node and name-node Build versions must be the same. " +
- "Namenode build version: " + nsBuildVer + "Datanode " +
- "build version: " + stBuildVer);
- throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
- }
-
- if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
- LOG.warn("Data-node and name-node layout versions must be the same." +
- " Expected: "+ HdfsConstants.LAYOUT_VERSION +
- " actual "+ bpNSInfo.getLayoutVersion());
- throw new IncorrectVersionException(
- bpNSInfo.getLayoutVersion(), "namenode");
- }
- }
-
- private void connectToNNAndHandshake() throws IOException {
- // get NN proxy
- bpNamenode =
- (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
- DatanodeProtocol.versionID, nnAddr, dn.getConf());
-
- // First phase of the handshake with NN - get the namespace
- // info.
- bpNSInfo = retrieveNamespaceInfo();
-
- // Now that we know the namespace ID, etc, we can pass this to the DN.
- // The DN can now initialize its local storage if we are the
- // first BP to handshake, etc.
- dn.initBlockPool(this);
-
- // Second phase of the handshake with the NN.
- register();
- }
-
- /**
- * This method arranges for the data node to send the block report at
- * the next heartbeat.
- */
- void scheduleBlockReport(long delay) {
- if (delay > 0) { // send BR after random delay
- lastBlockReport = System.currentTimeMillis()
- - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
- } else { // send at next heartbeat
- lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
- }
- resetBlockReportTime = true; // reset future BRs for randomness
- }
-
- private void reportBadBlocks(ExtendedBlock block) {
- DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
- LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
-
- try {
- bpNamenode.reportBadBlocks(blocks);
- } catch (IOException e){
- /* One common reason is that NameNode could be in safe mode.
- * Should we keep on retrying in that case?
- */
- LOG.warn("Failed to report bad block " + block + " to namenode : "
- + " Exception", e);
- }
-
- }
-
- /**
- * Report received blocks and delete hints to the Namenode
- * @throws IOException
- */
- private void reportReceivedBlocks() throws IOException {
- //check if there are newly received blocks
- Block [] blockArray=null;
- String [] delHintArray=null;
- synchronized(receivedBlockList) {
- synchronized(delHints){
- int numBlocks = receivedBlockList.size();
- if (numBlocks > 0) {
- if(numBlocks!=delHints.size()) {
- LOG.warn("Panic: receiveBlockList and delHints are not of " +
- "the same length" );
- }
- //
- // Send newly-received blockids to namenode
- //
- blockArray = receivedBlockList.toArray(new Block[numBlocks]);
- delHintArray = delHints.toArray(new String[numBlocks]);
- }
- }
- }
- if (blockArray != null) {
- if(delHintArray == null || delHintArray.length != blockArray.length ) {
- LOG.warn("Panic: block array & delHintArray are not the same" );
- }
- bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
- delHintArray);
- synchronized(receivedBlockList) {
- synchronized(delHints){
- for(int i=0; i<blockArray.length; i++) {
- receivedBlockList.remove(blockArray[i]);
- delHints.remove(delHintArray[i]);
- }
- }
- }
- }
- }
-
- /*
- * Informing the name node could take a long long time! Should we wait
- * till namenode is informed before responding with success to the
- * client? For now we don't.
- */
- void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
- if(block==null || delHint==null) {
- throw new IllegalArgumentException(
- block==null?"Block is null":"delHint is null");
- }
-
- if (!block.getBlockPoolId().equals(getBlockPoolId())) {
- LOG.warn("BlockPool mismatch " + block.getBlockPoolId() +
- " vs. " + getBlockPoolId());
- return;
- }
-
- synchronized (receivedBlockList) {
- synchronized (delHints) {
- receivedBlockList.add(block.getLocalBlock());
- delHints.add(delHint);
- receivedBlockList.notifyAll();
- }
- }
- }
-
-
- /**
- * Report the list of blocks to the Namenode
- * @throws IOException
- */
- DatanodeCommand blockReport() throws IOException {
- // send block report if timer has expired.
- DatanodeCommand cmd = null;
- long startTime = now();
- if (startTime - lastBlockReport > dnConf.blockReportInterval) {
-
- // Create block report
- long brCreateStartTime = now();
- BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
-
- // Send block report
- long brSendStartTime = now();
- cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
- .getBlockListAsLongs());
-
- // Log the block report processing stats from Datanode perspective
- long brSendCost = now() - brSendStartTime;
- long brCreateCost = brSendStartTime - brCreateStartTime;
- dn.metrics.addBlockReport(brSendCost);
- LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
- + " blocks took " + brCreateCost + " msec to generate and "
- + brSendCost + " msecs for RPC and NN processing");
-
- // If we have sent the first block report, then wait a random
- // time before we start the periodic block reports.
- if (resetBlockReportTime) {
- lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
- resetBlockReportTime = false;
- } else {
- /* say the last block report was at 8:20:14. The current report
- * should have started around 9:20:14 (default 1 hour interval).
- * If current time is :
- * 1) normal like 9:20:18, next report should be at 10:20:14
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
- */
- lastBlockReport += (now() - lastBlockReport) /
- dnConf.blockReportInterval * dnConf.blockReportInterval;
- }
- LOG.info("sent block report, processed command:" + cmd);
- }
- return cmd;
- }
-
-
- DatanodeCommand [] sendHeartBeat() throws IOException {
- return bpNamenode.sendHeartbeat(bpRegistration,
- dn.data.getCapacity(),
- dn.data.getDfsUsed(),
- dn.data.getRemaining(),
- dn.data.getBlockPoolUsed(getBlockPoolId()),
- dn.xmitsInProgress.get(),
- dn.getXceiverCount(), dn.data.getNumFailedVolumes());
- }
-
- //This must be called only by blockPoolManager
- void start() {
- if ((bpThread != null) && (bpThread.isAlive())) {
- //Thread is started already
- return;
- }
- bpThread = new Thread(this, dnThreadName);
- bpThread.setDaemon(true); // needed for JUnit testing
- bpThread.start();
- }
-
- //This must be called only by blockPoolManager.
- void stop() {
- shouldServiceRun = false;
- if (bpThread != null) {
- bpThread.interrupt();
- }
- }
-
- //This must be called only by blockPoolManager
- void join() {
- try {
- if (bpThread != null) {
- bpThread.join();
- }
- } catch (InterruptedException ie) { }
- }
-
- //Cleanup method to be called by current thread before exiting.
- private synchronized void cleanUp() {
-
- if(upgradeManager != null)
- upgradeManager.shutdownUpgrade();
- shouldServiceRun = false;
- RPC.stopProxy(bpNamenode);
- dn.shutdownBlockPool(this);
- }
-
- /**
- * Main loop for each BP thread. Run until shutdown,
- * forever calling remote NameNode functions.
- */
- private void offerService() throws Exception {
- LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
- + dnConf.blockReportInterval + "msec" + " Initial delay: "
- + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
- + dnConf.heartBeatInterval);
-
- //
- // Now loop for a long time....
- //
- while (shouldRun()) {
- try {
- long startTime = now();
-
- //
- // Every so often, send heartbeat or block-report
- //
- if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
- //
- // All heartbeat messages include following info:
- // -- Datanode name
- // -- data transfer port
- // -- Total capacity
- // -- Bytes remaining
- //
- lastHeartbeat = startTime;
- if (!dn.heartbeatsDisabledForTests) {
- DatanodeCommand[] cmds = sendHeartBeat();
- dn.metrics.addHeartbeat(now() - startTime);
-
- long startProcessCommands = now();
- if (!processCommand(cmds))
- continue;
- long endProcessCommands = now();
- if (endProcessCommands - startProcessCommands > 2000) {
- LOG.info("Took " + (endProcessCommands - startProcessCommands) +
- "ms to process " + cmds.length + " commands from NN");
- }
- }
- }
-
- reportReceivedBlocks();
-
- DatanodeCommand cmd = blockReport();
- processCommand(cmd);
-
- // Now safe to start scanning the block pool
- if (dn.blockScanner != null) {
- dn.blockScanner.addBlockPool(this.getBlockPoolId());
- }
-
- //
- // There is no work to do; sleep until heartbeat timer elapses,
- // or work arrives, and then iterate again.
- //
- long waitTime = dnConf.heartBeatInterval -
- (System.currentTimeMillis() - lastHeartbeat);
- synchronized(receivedBlockList) {
- if (waitTime > 0 && receivedBlockList.size() == 0) {
- try {
- receivedBlockList.wait(waitTime);
- } catch (InterruptedException ie) {
- LOG.warn("BPOfferService for " + this + " interrupted");
- }
- }
- } // synchronized
- } catch(RemoteException re) {
- String reClass = re.getClassName();
- if (UnregisteredNodeException.class.getName().equals(reClass) ||
- DisallowedDatanodeException.class.getName().equals(reClass) ||
- IncorrectVersionException.class.getName().equals(reClass)) {
- LOG.warn(this + " is shutting down", re);
- shouldServiceRun = false;
- return;
- }
- LOG.warn("RemoteException in offerService", re);
- try {
- long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
- Thread.sleep(sleepTime);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- } catch (IOException e) {
- LOG.warn("IOException in offerService", e);
- }
- } // while (shouldRun())
- } // offerService
-
- /**
- * Register one bp with the corresponding NameNode
- * <p>
- * The bpDatanode needs to register with the namenode on startup in order
- * 1) to report which storage it is serving now and
- * 2) to receive a registrationID
- *
- * issued by the namenode to recognize registered datanodes.
- *
- * @see FSNamesystem#registerDatanode(DatanodeRegistration)
- * @throws IOException
- */
- void register() throws IOException {
- Preconditions.checkState(bpNSInfo != null,
- "register() should be called after handshake()");
-
- // The handshake() phase loaded the block pool storage
- // off disk - so update the bpRegistration object from that info
- bpRegistration = dn.createBPRegistration(bpNSInfo);
-
- LOG.info(this + " beginning handshake with NN");
-
- while (shouldRun()) {
- try {
- // Use returned registration from namenode with updated machine name.
- bpRegistration = bpNamenode.registerDatanode(bpRegistration);
- break;
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.info("Problem connecting to server: " + nnAddr);
- sleepAndLogInterrupts(1000, "connecting to server");
- }
- }
-
- LOG.info("Block pool " + this + " successfully registered with NN");
- dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-
- // random short delay - helps scatter the BR from all DNs
- scheduleBlockReport(dnConf.initialBlockReportDelay);
- }
-
-
- private void sleepAndLogInterrupts(int millis,
- String stateString) {
- try {
- Thread.sleep(millis);
- } catch (InterruptedException ie) {
- LOG.info("BPOfferService " + this +
- " interrupted while " + stateString);
- }
- }
-
- /**
- * No matter what kind of exception we get, keep retrying to offerService().
- * That's the loop that connects to the NameNode and provides basic DataNode
- * functionality.
- *
- * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
- * happen either at shutdown or due to refreshNamenodes.
- */
- @Override
- public void run() {
- LOG.info(this + " starting to offer service");
-
- try {
- // init stuff
- try {
- // setup storage
- connectToNNAndHandshake();
- } catch (IOException ioe) {
- // Initial handshake, storage recovery or registration failed
- // End BPOfferService thread
- LOG.fatal("Initialization failed for block pool " + this, ioe);
- return;
- }
-
- initialized = true; // bp is initialized;
-
- while (shouldRun()) {
- try {
- startDistributedUpgradeIfNeeded();
- offerService();
- } catch (Exception ex) {
- LOG.error("Exception in BPOfferService for " + this, ex);
- sleepAndLogInterrupts(5000, "offering service");
- }
- }
- } catch (Throwable ex) {
- LOG.warn("Unexpected exception in block pool " + this, ex);
- } finally {
- LOG.warn("Ending block pool service for: " + this);
- cleanUp();
- }
- }
-
- private boolean shouldRun() {
- return shouldServiceRun && dn.shouldRun();
- }
-
- /**
- * Process an array of datanode commands
- *
- * @param cmds an array of datanode commands
- * @return true if further processing may be required or false otherwise.
- */
- private boolean processCommand(DatanodeCommand[] cmds) {
- if (cmds != null) {
- for (DatanodeCommand cmd : cmds) {
- try {
- if (processCommand(cmd) == false) {
- return false;
- }
- } catch (IOException ioe) {
- LOG.warn("Error processing datanode Command", ioe);
- }
- }
- }
- return true;
- }
-
- /**
- *
- * @param cmd
- * @return true if further processing may be required or false otherwise.
- * @throws IOException
- */
- private boolean processCommand(DatanodeCommand cmd) throws IOException {
- if (cmd == null)
- return true;
- final BlockCommand bcmd =
- cmd instanceof BlockCommand? (BlockCommand)cmd: null;
-
- switch(cmd.getAction()) {
- case DatanodeProtocol.DNA_TRANSFER:
- // Send a copy of a block to another datanode
- dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
- dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
- break;
- case DatanodeProtocol.DNA_INVALIDATE:
- //
- // Some local block(s) are obsolete and can be
- // safely garbage-collected.
- //
- Block toDelete[] = bcmd.getBlocks();
- try {
- if (dn.blockScanner != null) {
- dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
- }
- // using global fsdataset
- dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
- } catch(IOException e) {
- dn.checkDiskError();
- throw e;
- }
- dn.metrics.incrBlocksRemoved(toDelete.length);
- break;
- case DatanodeProtocol.DNA_SHUTDOWN:
- // shut down the data node
- shouldServiceRun = false;
- return false;
- case DatanodeProtocol.DNA_REGISTER:
- // namenode requested a registration - at start or if NN lost contact
- LOG.info("DatanodeCommand action: DNA_REGISTER");
- if (shouldRun()) {
- // re-retrieve namespace info to make sure that, if the NN
- // was restarted, we still match its version (HDFS-2120)
- retrieveNamespaceInfo();
- // and re-register
- register();
- }
- break;
- case DatanodeProtocol.DNA_FINALIZE:
- dn.storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd)
- .getBlockPoolId());
- break;
- case UpgradeCommand.UC_ACTION_START_UPGRADE:
- // start distributed upgrade here
- processDistributedUpgradeCommand((UpgradeCommand)cmd);
- break;
- case DatanodeProtocol.DNA_RECOVERBLOCK:
- dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
- break;
- case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
- LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
- if (dn.isBlockTokenEnabled) {
- dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(),
- ((KeyUpdateCommand) cmd).getExportedKeys());
- }
- break;
- case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
- LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
- long bandwidth =
- ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
- if (bandwidth > 0) {
- DataXceiverServer dxcs =
- (DataXceiverServer) dn.dataXceiverServer.getRunnable();
- dxcs.balanceThrottler.setBandwidth(bandwidth);
- }
- break;
- default:
- LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
- }
- return true;
- }
-
- private void processDistributedUpgradeCommand(UpgradeCommand comm)
- throws IOException {
- UpgradeManagerDatanode upgradeManager = getUpgradeManager();
- upgradeManager.processUpgradeCommand(comm);
- }
-
- synchronized UpgradeManagerDatanode getUpgradeManager() {
- if(upgradeManager == null)
- upgradeManager =
- new UpgradeManagerDatanode(dn, getBlockPoolId());
-
- return upgradeManager;
- }
-
- /**
- * Start distributed upgrade if it should be initiated by the data-node.
- */
- private void startDistributedUpgradeIfNeeded() throws IOException {
- UpgradeManagerDatanode um = getUpgradeManager();
-
- if(!um.getUpgradeState())
- return;
- um.setUpgradeState(false, um.getUpgradeVersion());
- um.startUpgrade();
- return;
- }
-
+ boolean areHeartbeatsDisabledForTests() {
+ return this.heartbeatsDisabledForTests;
}
/**
@@ -1430,7 +719,7 @@ public class DataNode extends Configured
* sets the storage ID based on this registration.
* Also updates the block pool's state in the secret manager.
*/
- private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
+ synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
String blockPoolId)
throws IOException {
hostName = bpRegistration.getHost();
@@ -1487,7 +776,7 @@ public class DataNode extends Configured
/**
* Remove the given block pool from the block scanner, dataset, and storage.
*/
- private void shutdownBlockPool(BPOfferService bpos) {
+ void shutdownBlockPool(BPOfferService bpos) {
blockPoolManager.remove(bpos);
String bpId = bpos.getBlockPoolId();
@@ -1962,7 +1251,7 @@ public class DataNode extends Configured
}
}
- private void transferBlocks(String poolId, Block blocks[],
+ void transferBlocks(String poolId, Block blocks[],
DatanodeInfo xferTargets[][]) {
for (int i = 0; i < blocks.length; i++) {
try {
@@ -2254,9 +1543,7 @@ public class DataNode extends Configured
System.exit(-1);
}
Collection<URI> dataDirs = getStorageDirs(conf);
- dnThreadName = "DataNode: [" +
- StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "]";
- UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.setConfiguration(conf);
SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
DFS_DATANODE_USER_NAME_KEY);
return makeInstance(dataDirs, conf, resources);
@@ -2788,6 +2075,14 @@ public class DataNode extends Configured
}
}
+ /**
+ * Finalize a pending upgrade in response to DNA_FINALIZE.
+ * @param blockPoolId the block pool to finalize
+ */
+ void finalizeUpgradeForPool(String blockPoolId) throws IOException {
+ storage.finalizeUpgrade(blockPoolId);
+ }
+
// Determine a Datanode's streaming address
public static InetSocketAddress getStreamingAddr(Configuration conf) {
return NetUtils.createSocketAddr(
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java Mon Nov 21 19:27:12 2011
@@ -21,8 +21,6 @@ import static org.apache.hadoop.hdfs.pro
import java.io.IOException;
-import junit.framework.TestCase;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -37,9 +35,12 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
/**
*/
-public class TestDistributedUpgrade extends TestCase {
+public class TestDistributedUpgrade {
private static final Log LOG = LogFactory.getLog(TestDistributedUpgrade.class);
private Configuration conf;
private int testCounter = 0;
@@ -93,6 +94,7 @@ public class TestDistributedUpgrade exte
/**
*/
+ @Test(timeout=120000)
public void testDistributedUpgrade() throws Exception {
int numDirs = 1;
TestDFSUpgradeFromImage testImg = new TestDFSUpgradeFromImage();
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java Mon Nov 21 19:27:12 2011
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java Mon Nov 21 19:27:12 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java Mon Nov 21 19:27:12 2011
@@ -29,7 +29,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -49,7 +48,7 @@ public class TestDatanodeRegister {
DataNode mockDN = mock(DataNode.class);
Mockito.doReturn(true).when(mockDN).shouldRun();
- BPOfferService bpos = new DataNode.BPOfferService(INVALID_ADDR, mockDN);
+ BPOfferService bpos = new BPOfferService(INVALID_ADDR, mockDN);
NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java Mon Nov 21 19:27:12 2011
@@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.junit.Test;
/**