You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/18 11:06:38 UTC
svn commit: r423017 - in /lucene/hadoop/trunk/src:
java/org/apache/hadoop/dfs/ java/org/apache/hadoop/ipc/
java/org/apache/hadoop/mapred/ test/org/apache/hadoop/dfs/
test/org/apache/hadoop/fs/ test/org/apache/hadoop/mapred/
Author: cutting
Date: Tue Jul 18 02:06:36 2006
New Revision: 423017
URL: http://svn.apache.org/viewvc?rev=423017&view=rev
Log:
HADOOP-364. Fix some problems introduced by HADOOP-252. In particular, fix things when RPC clients start before daemons, plus other improvements to RPC versioning. Contributed by Owen.
Modified:
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Jul 18 02:06:36 2006
@@ -118,10 +118,11 @@
// get storage info and lock the data dir
storage = new DataStorage( datadir );
// connect to name node
- this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class,
- DatanodeProtocol.versionID,
- nameNodeAddr,
- conf);
+ this.namenode = (DatanodeProtocol)
+ RPC.waitForProxy(DatanodeProtocol.class,
+ DatanodeProtocol.versionID,
+ nameNodeAddr,
+ conf);
// find free port
ServerSocket ss = null;
int tmpPort = conf.getInt("dfs.datanode.port", 50010);
@@ -170,20 +171,7 @@
* @throws IOException
*/
private void register() throws IOException {
- while (shouldRun) {
- try {
- dnRegistration = namenode.register( dnRegistration );
- break;
- } catch( ConnectException se ) { // namenode has not been started
- LOG.info("Namenode not available yet, Zzzzz...");
- } catch( SocketTimeoutException te ) { // namenode is busy
- LOG.info("Problem connecting to Namenode: " +
- StringUtils.stringifyException(te));
- }
- try {
- Thread.sleep(10 * 1000);
- } catch (InterruptedException ie) {}
- }
+ dnRegistration = namenode.register( dnRegistration );
if( storage.getStorageID().equals("") ) {
storage.setStorageID( dnRegistration.getStorageID());
storage.write();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Tue Jul 18 02:06:36 2006
@@ -22,12 +22,15 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import java.io.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.*;
/** A simple RPC mechanism.
@@ -163,22 +166,89 @@
}
}
+ /**
+ * A version mismatch for the RPC protocol.
+ * @author Owen O'Malley
+ */
+ public static class VersionMismatch extends IOException {
+ private String interfaceName;
+ private long clientVersion;
+ private long serverVersion;
+
+ /**
+ * Create a version mismatch exception
+ * @param interfaceName the name of the protocol whose versions mismatch
+ * @param clientVersion the client's version of the protocol
+ * @param serverVersion the server's version of the protocol
+ */
+ public VersionMismatch(String interfaceName, long clientVersion,
+ long serverVersion) {
+ super("Protocol " + interfaceName + " version mismatch. (client = " +
+ clientVersion + ", server = " + serverVersion + ")");
+ this.interfaceName = interfaceName;
+ this.clientVersion = clientVersion;
+ this.serverVersion = serverVersion;
+ }
+
+ /**
+ * Get the interface name
+ * @return the java class name
+ * (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
+ */
+ public String getInterfaceName() {
+ return interfaceName;
+ }
+
+ /**
+ * Get the client's preferred version
+ */
+ public long getClientVersion() {
+ return clientVersion;
+ }
+
+ /**
+ * Get the server's agreed-to version.
+ */
+ public long getServerVersion() {
+ return serverVersion;
+ }
+ }
+
+ public static VersionedProtocol waitForProxy(Class protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf
+ ) throws IOException {
+ while (true) {
+ try {
+ return getProxy(protocol, clientVersion, addr, conf);
+ } catch( ConnectException se ) { // namenode has not been started
+ LOG.info("Server at " + addr + " not available yet, Zzzzz...");
+ } catch( SocketTimeoutException te ) { // namenode is busy
+ LOG.info("Problem connecting to server: " + addr);
+ }
+ try {
+ Thread.sleep(10*1000);
+ } catch (InterruptedException ie) {
+ // IGNORE
+ }
+ }
+ }
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
public static VersionedProtocol getProxy(Class protocol, long clientVersion,
- InetSocketAddress addr, Configuration conf)
- throws RemoteException {
+ InetSocketAddress addr, Configuration conf) throws IOException {
VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(),
new Class[] { protocol },
new Invoker(addr, conf));
- long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
+ long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+ clientVersion);
if (serverVersion == clientVersion) {
return proxy;
} else {
- throw new RemoteException(protocol.getName(),
- "RPC Server and Client Versions Mismatched. SID:"+serverVersion+
- " CID:"+clientVersion);
+ throw new VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java Tue Jul 18 02:06:36 2006
@@ -16,7 +16,7 @@
package org.apache.hadoop.ipc;
-import org.apache.hadoop.io.UTF8;
+import java.io.IOException;
/**
* Superclass of all protocols that use Hadoop RPC.
@@ -25,8 +25,13 @@
* @author milindb
*/
public interface VersionedProtocol {
+
/**
- * Return protocol version corresponding to protocol interface
+ * Return protocol version corresponding to protocol interface.
+ * @param protocol The classname of the protocol interface
+ * @param clientVersion The version of the protocol that the client speaks
+ * @return the version that the server will speak
*/
- public long getProtocolVersion(String protocol, long clientVersion);
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jul 18 02:06:36 2006
@@ -61,6 +61,9 @@
boolean shuttingDown = false;
TreeMap tasks = null;
+ /**
+ * Map from taskId -> TaskInProgress.
+ */
TreeMap runningTasks = null;
int mapTotal = 0;
int reduceTotal = 0;
@@ -163,8 +166,10 @@
this.mapOutputFile.cleanupStorage();
this.justStarted = true;
- this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class,
- InterTrackerProtocol.versionID, jobTrackAddr, this.fConf);
+ this.jobClient = (InterTrackerProtocol)
+ RPC.waitForProxy(InterTrackerProtocol.class,
+ InterTrackerProtocol.versionID,
+ jobTrackAddr, this.fConf);
this.running = true;
}
@@ -1139,7 +1144,8 @@
JobConf conf=new JobConf();
new TaskTracker(conf).run();
} catch (IOException e) {
- LOG.warn( "Can not start task tracker because "+e.getMessage());
+ LOG.warn( "Can not start task tracker because "+
+ StringUtils.stringifyException(e));
System.exit(-1);
}
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Tue Jul 18 02:06:36 2006
@@ -24,6 +24,18 @@
class NameNodeRunner implements Runnable {
private NameNode node;
+ public boolean isUp() {
+ if (node == null) {
+ return false;
+ }
+ try {
+ long[] sizes = node.getStats();
+ return sizes[0] != 0;
+ } catch (IOException ie) {
+ return false;
+ }
+ }
+
/**
* Create the name node and run it.
*/
@@ -82,8 +94,11 @@
/**
* Create the config and start up the servers.
+ * @param dataNodeFirst should the datanode be brought up before the namenode?
*/
- public MiniDFSCluster(int namenodePort, Configuration conf) throws IOException {
+ public MiniDFSCluster(int namenodePort,
+ Configuration conf,
+ boolean dataNodeFirst) throws IOException {
this.conf = conf;
conf.set("fs.default.name",
"localhost:"+ Integer.toString(namenodePort));
@@ -98,17 +113,21 @@
NameNode.format(conf);
nameNode = new NameNodeRunner();
nameNodeThread = new Thread(nameNode);
- nameNodeThread.start();
- try { // let namenode get started
- Thread.sleep(2000);
- } catch(InterruptedException e) {
- }
dataNode = new DataNodeRunner();
dataNodeThread = new Thread(dataNode);
- dataNodeThread.start();
- try { // let daemons get started
- Thread.sleep(2000);
- } catch(InterruptedException e) {
+ if (dataNodeFirst) {
+ dataNodeThread.start();
+ nameNodeThread.start();
+ } else {
+ nameNodeThread.start();
+ dataNodeThread.start();
+ }
+ while (!nameNode.isUp()) {
+ try { // let daemons get started
+ System.out.println("waiting for dfs minicluster to start");
+ Thread.sleep(2000);
+ } catch(InterruptedException e) {
+ }
}
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestLocalDFS.java Tue Jul 18 02:06:36 2006
@@ -38,7 +38,7 @@
*/
public void testWorkingDirectory() throws IOException {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(65312, conf);
+ MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
FileSystem fileSys = cluster.getFileSystem();
try {
Path orig_path = fileSys.getWorkingDirectory();
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Tue Jul 18 02:06:36 2006
@@ -173,7 +173,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65314, conf);
+ cluster = new MiniDFSCluster(65314, conf, false);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles(namenode, "/srcdat");
@@ -195,7 +195,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65316, conf);
+ cluster = new MiniDFSCluster(65316, conf, false);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
@@ -217,7 +217,7 @@
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- cluster = new MiniDFSCluster(65318, conf);
+ cluster = new MiniDFSCluster(65318, conf, false);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles(namenode, "/srcdat");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Jul 18 02:06:36 2006
@@ -130,7 +130,8 @@
public MiniMRCluster(int jobTrackerPort,
int taskTrackerPort,
int numTaskTrackers,
- String namenode) throws IOException {
+ String namenode,
+ boolean taskTrackerFirst) throws IOException {
this.jobTrackerPort = jobTrackerPort;
this.taskTrackerPort = taskTrackerPort;
this.numTaskTrackers = numTaskTrackers;
@@ -151,10 +152,8 @@
pw.close();
jobTracker = new JobTrackerRunner();
jobTrackerThread = new Thread(jobTracker);
- jobTrackerThread.start();
- try { // let jobTracker get started
- Thread.sleep(2000);
- } catch(InterruptedException e) {
+ if (!taskTrackerFirst) {
+ jobTrackerThread.start();
}
for (int idx = 0; idx < numTaskTrackers; idx++) {
TaskTrackerRunner taskTracker = new TaskTrackerRunner();
@@ -163,6 +162,9 @@
taskTrackerList.add(taskTracker);
taskTrackerThreadList.add(taskTrackerThread);
}
+ if (taskTrackerFirst) {
+ jobTrackerThread.start();
+ }
try { // let taskTrackers get started
Thread.sleep(2000);
} catch(InterruptedException e) {
@@ -201,7 +203,7 @@
public static void main(String[] args) throws IOException {
System.out.println("Bringing up Jobtracker and tasktrackers.");
- MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local");
+ MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local", false);
System.out.println("JobTracker and TaskTrackers are up.");
mr.shutdown();
System.out.println("JobTracker and TaskTrackers brought down.");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java Tue Jul 18 02:06:36 2006
@@ -29,7 +29,7 @@
public void testBringUp() throws IOException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(50000, 50010, 1, "local");
+ mr = new MiniMRCluster(50000, 50010, 1, "local", false);
} finally {
if (mr != null) { mr.shutdown(); }
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Jul 18 02:06:36 2006
@@ -32,7 +32,7 @@
public void testWithLocal() throws IOException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(60030, 60040, 2, "local");
+ mr = new MiniMRCluster(60030, 60040, 2, "local", false);
double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
double error = Math.abs(Math.PI - estimate);
assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=423017&r1=423016&r2=423017&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Jul 18 02:06:36 2006
@@ -39,10 +39,10 @@
FileSystem fileSys = null;
try {
Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(65314, conf);
+ dfs = new MiniDFSCluster(65314, conf, true);
fileSys = dfs.getFileSystem();
namenode = fileSys.getName();
- mr = new MiniMRCluster(50050, 50060, 4, namenode);
+ mr = new MiniMRCluster(50050, 50060, 4, namenode, true);
double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:50050", namenode);
double error = Math.abs(Math.PI - estimate);
assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));