You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/03/27 22:17:26 UTC
svn commit: r523062 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Author: cutting
Date: Tue Mar 27 13:17:25 2007
New Revision: 523062
URL: http://svn.apache.org/viewvc?view=rev&rev=523062
Log:
HADOOP-1153. Fix HDFS daemons to correctly stop their threads. Contributed by Konstantin.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Mar 27 13:17:25 2007
@@ -45,6 +45,9 @@
by probing for free ports, improving test reliability.
(Arun C Murthy via cutting)
+14. HADOOP-1153. Fix HDFS daemons to correctly stop their threads.
+ (Konstantin Shvachko via cutting)
+
Release 0.12.2 - 2007-03-23
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 27 13:17:25 2007
@@ -110,7 +110,7 @@
FSDataset data;
DatanodeRegistration dnRegistration;
private String networkLoc;
- boolean shouldRun = true;
+ volatile boolean shouldRun = true;
Vector receivedBlockList = new Vector();
int xmitsInProgress = 0;
Daemon dataXceiveServer = null;
@@ -318,7 +318,7 @@
* @throws IOException
*/
private void register() throws IOException {
- while( true ) {
+ while( shouldRun ) {
try {
// reset name to machineName. Mainly for web interface.
dnRegistration.name = machineName + ":" + dnRegistration.getPort();
@@ -342,15 +342,22 @@
* Returns only after shutdown is complete.
*/
public void shutdown() {
- try {
- infoServer.stop();
- } catch (Exception e) {
+ if (infoServer != null) {
+ try {
+ infoServer.stop();
+ } catch (Exception e) {
+ }
}
this.shouldRun = false;
- ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
- try {
- this.storage.closeAll();
- } catch (IOException ie) {
+ if (dataXceiveServer != null) {
+ ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
+ this.dataXceiveServer.interrupt();
+ }
+ if (storage != null) {
+ try {
+ this.storage.closeAll();
+ } catch (IOException ie) {
+ }
}
}
@@ -539,7 +546,6 @@
* Hadoop IPC mechanism.
*/
class DataXceiveServer implements Runnable {
- boolean shouldListen = true;
ServerSocket ss;
public DataXceiveServer(ServerSocket ss) {
this.ss = ss;
@@ -549,7 +555,7 @@
*/
public void run() {
try {
- while (shouldListen) {
+ while (shouldRun) {
Socket s = ss.accept();
//s.setSoTimeout(READ_TIMEOUT);
data.checkDataDir();
@@ -566,7 +572,8 @@
}
}
public void kill() {
- this.shouldListen = false;
assert shouldRun == false :
"shouldRun should be set to false before killing";
try {
this.ss.close();
} catch (IOException iex) {
@@ -1071,16 +1078,16 @@
LOG.info("Finishing DataNode in: "+data);
}
- private static ArrayList dataNodeList = new ArrayList();
- private static ArrayList dataNodeThreadList = new ArrayList();
+ private static ArrayList<DataNode> dataNodeList = new ArrayList<DataNode>();
+ private static ArrayList<Thread> dataNodeThreadList = new ArrayList<Thread>();
/** Start datanode daemon.
*/
public static void run(Configuration conf, String networkLoc) throws IOException {
String[] dataDirs = conf.getStrings("dfs.data.dir");
DataNode dn = makeInstance(networkLoc, dataDirs, conf);
- dataNodeList.add(dn);
if (dn != null) {
+ dataNodeList.add(dn);
Thread t = new Thread(dn, "DataNode: [" +
StringUtils.arrayToString(dataDirs) + "]");
t.setDaemon(true); // needed for JUnit testing
@@ -1090,15 +1097,14 @@
}
/**
- * Shut down all datanodes that where started via the run(conf) method.
+ * Shut down all datanodes that were started via the
+ * run(conf,networkLoc) method.
* Returns only after shutdown is complete.
*/
public static void shutdownAll(){
- if(!dataNodeList.isEmpty()){
- for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) {
- DataNode dataNode = (DataNode) iterator.next();
- dataNode.shutdown();
- }
+ while (!dataNodeList.isEmpty()) {
+ dataNodeList.remove(0).shutdown();
+ dataNodeThreadList.remove(0).interrupt();
}
}
@@ -1113,12 +1119,7 @@
Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
try {
t.join();
- } catch (InterruptedException e) {
- if (Thread.currentThread().isInterrupted()) {
- // did someone knock?
- return;
- }
- }
+ } catch (InterruptedException e) {}
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Mar 27 13:17:25 2007
@@ -174,7 +174,7 @@
Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
Daemon replthread = null; // Replication thread
- boolean fsRunning = true;
+ volatile boolean fsRunning = true;
long systemStart = 0;
// The maximum number of replicates we should allow for a single block
@@ -302,20 +302,22 @@
* them to finish, but a short timeout returns control back to caller.
*/
public void close() {
- synchronized (this) {
fsRunning = false;
- }
try {
- pendingReplications.stop();
- infoServer.stop();
- hbthread.join(3000);
- replthread.join(3000);
- dnthread.join(3000);
+ if (pendingReplications != null) pendingReplications.stop();
+ if (infoServer != null) infoServer.stop();
+ if (hbthread != null) hbthread.interrupt();
+ if (replthread != null) replthread.interrupt();
+ if (dnthread != null) dnthread.interrupt();
+ if (smmthread != null) smmthread.interrupt();
} catch (InterruptedException ie) {
} finally {
// using finally to ensure we also wait for lease daemon
try {
- lmthread.join(3000);
+ if (lmthread != null) {
+ lmthread.interrupt();
+ lmthread.join(3000);
+ }
} catch (InterruptedException ie) {
} finally {
try {
@@ -3710,7 +3712,7 @@
/**
*/
public void run() {
- while( ! safeMode.canLeave() ) {
+ while (fsRunning && !safeMode.canLeave()) {
try {
Thread.sleep(recheckInterval);
} catch (InterruptedException ie) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Tue Mar 27 13:17:25 2007
@@ -38,7 +38,7 @@
private Map<Block, PendingBlockInfo> pendingReplications;
private ArrayList<Block> timedOutItems;
Daemon timerThread = null;
- private boolean fsRunning = true;
+ private volatile boolean fsRunning = true;
//
// It might take anywhere between 5 to 10 minutes before
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Tue Mar 27 13:17:25 2007
@@ -250,13 +250,7 @@
writeConfigFile(localFileSys, excludeFile, null);
MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
- // Now wait for 15 seconds to give datanodes chance to register
- // themselves and to report heartbeat
- try {
- Thread.sleep(15000L);
- } catch (InterruptedException e) {
- // nothing
- }
+ cluster.waitActive();
InetSocketAddress addr = new InetSocketAddress("localhost",
cluster.getNameNodePort());
DFSClient client = new DFSClient(addr, conf);