You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/03/12 20:43:06 UTC
svn commit: r752984 [1/2] - in /hadoop/core/trunk: conf/ ivy/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/http/
src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/net/ ...
Author: cdouglas
Date: Thu Mar 12 19:43:05 2009
New Revision: 752984
URL: http://svn.apache.org/viewvc?rev=752984&view=rev
Log:
Revert HADOOP-3628
Removed:
hadoop/core/trunk/src/core/org/apache/hadoop/util/MockService.java
hadoop/core/trunk/src/core/org/apache/hadoop/util/Service.java
hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java
hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java
hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerLifecycle.java
hadoop/core/trunk/src/test/org/apache/hadoop/net/TestDNS.java
hadoop/core/trunk/src/test/org/apache/hadoop/util/TestServiceLifecycle.java
Modified:
hadoop/core/trunk/conf/log4j.properties
hadoop/core/trunk/ivy/libraries.properties
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java
hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java
hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java
hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/core/trunk/src/test/hadoop-site.xml
hadoop/core/trunk/src/test/log4j.properties
hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
Modified: hadoop/core/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/log4j.properties?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/conf/log4j.properties (original)
+++ hadoop/core/trunk/conf/log4j.properties Thu Mar 12 19:43:05 2009
@@ -36,10 +36,8 @@
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
-log4j.appender.console.immediateFlush=true
log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t] %p %c{2} %x: %m%n
-#log4j.appender.console.layout.ConversionPattern=%-4r %-5p %c %x - %m%n
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
#
# TaskLog Appender
Modified: hadoop/core/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/ivy/libraries.properties?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/ivy/libraries.properties (original)
+++ hadoop/core/trunk/ivy/libraries.properties Thu Mar 12 19:43:05 2009
@@ -35,7 +35,7 @@
hsqldb.version=1.8.0.10
#ivy.version=2.0.0-beta2
-ivy.version=2.0.0
+ivy.version=2.0.0-rc2
jasper.version=5.5.12
#not able to figureout the version of jsp & jsp-api version to get it resolved throught ivy
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Thu Mar 12 19:43:05 2009
@@ -71,7 +71,7 @@
* it to succeed. Then program is launched with insufficient memory and
* is expected to be a failure.
*/
- public void testCommandLine() throws Exception {
+ public void testCommandLine() {
if (StreamUtil.isCygwin()) {
return;
}
@@ -88,9 +88,11 @@
fs.delete(outputPath, true);
assertFalse("output not cleaned up", fs.exists(outputPath));
mr.waitUntilIdle();
+ } catch(IOException e) {
+ fail(e.toString());
} finally {
- MiniDFSCluster.close(dfs);
- MiniMRCluster.close(mr);
+ mr.shutdown();
+ dfs.shutdown();
}
}
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java Thu Mar 12 19:43:05 2009
@@ -185,9 +185,6 @@
private Properties properties;
private Properties overlay;
private ClassLoader classLoader;
- private static final String ERROR_PARSING_CONF_FILE =
- "Error parsing configuration resource ";
-
{
classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
@@ -976,7 +973,7 @@
}
}
- protected synchronized Properties getProps() {
+ private synchronized Properties getProps() {
if (properties == null) {
properties = new Properties();
loadResources(properties, resources, quietmode);
@@ -1160,16 +1157,16 @@
}
} catch (IOException e) {
- LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
+ LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
} catch (DOMException e) {
- LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
+ LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
} catch (SAXException e) {
- LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
+ LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
} catch (ParserConfigurationException e) {
- LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
+ LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
}
}
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java Thu Mar 12 19:43:05 2009
@@ -22,7 +22,6 @@
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.URL;
-import java.net.BindException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -458,11 +457,7 @@
// then try the next port number.
if (ex instanceof BindException) {
if (!findPort) {
- BindException be = new BindException(
- "Port in use: " + listener.getHost()
- + ":" + listener.getPort());
- be.initCause(ex);
- throw be;
+ throw (BindException) ex;
}
} else {
LOG.info("HttpServer.start() threw a non Bind IOException");
@@ -494,14 +489,6 @@
}
/**
- * Test for the availability of the web server
- * @return true if the web server is started, false otherwise
- */
- public boolean isAlive() {
- return webServer.isStarted();
- }
-
- /**
* A very simple servlet to serve up a text representation of the current
* stack traces. It both returns the stacks to the caller and logs them.
* Currently the stack traces are done sequentially rather than exactly the
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java Thu Mar 12 19:43:05 2009
@@ -284,9 +284,8 @@
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
- * @throws IOException if the connection attempt was unsuccessful
*/
- private synchronized void setupIOstreams() throws IOException {
+ private synchronized void setupIOstreams() {
if (socket != null || shouldCloseConnection.get()) {
return;
}
@@ -309,7 +308,7 @@
/* The max number of retries is 45,
* which amounts to 20s*45 = 15 minutes retries.
*/
- handleConnectionFailure(timeoutFailures++, maxRetries, toe);
+ handleConnectionFailure(timeoutFailures++, 45, toe);
} catch (IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
@@ -328,7 +327,6 @@
} catch (IOException e) {
markClosed(e);
close();
- throw e;
}
}
@@ -360,7 +358,7 @@
// throw the exception if the maximum number of retries is reached
if (curRetries >= maxRetries) {
- throw wrapException(remoteId.getAddress(), ioe);
+ throw ioe;
}
// otherwise back off and retry
@@ -369,7 +367,7 @@
} catch (InterruptedException ignored) {}
LOG.info("Retrying connect to server: " + server +
- ". Already tried " + curRetries + " time(s) out of "+ maxRetries);
+ ". Already tried " + curRetries + " time(s).");
}
/* Write the header for each connection
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java Thu Mar 12 19:43:05 2009
@@ -301,7 +301,7 @@
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
- public static VersionedProtocol waitForProxy(Class protocol,
+ static VersionedProtocol waitForProxy(Class protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf,
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java Thu Mar 12 19:43:05 2009
@@ -18,15 +18,10 @@
package org.apache.hadoop.net;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
-import java.net.Inet4Address;
-import java.net.Inet6Address;
import java.util.Enumeration;
import java.util.Vector;
@@ -43,29 +38,6 @@
*
*/
public class DNS {
- private static final String LOCALHOST = "localhost";
- private static final Log LOG = LogFactory.getLog(DNS.class);
-
- /**
- * Returns the hostname associated with the specified IP address by the
- * provided nameserver.
- *
- * @param address The address to reverse lookup
- * @param nameserver The host name of a reachable DNS server; can be null
- * @return The host name associated with the provided IP
- * @throws NamingException If a NamingException is encountered
- * @throws IllegalArgumentException if the protocol is unsupported
- */
- public static String reverseDns(InetAddress address, String nameserver)
- throws NamingException {
- if (address instanceof Inet4Address) {
- return reverseDns((Inet4Address) address, nameserver);
- } else if (address instanceof Inet6Address) {
- return reverseDns((Inet6Address) address, nameserver);
- } else {
- throw new IllegalArgumentException("Unsupported Protocol " + address);
- }
- }
/**
* Returns the hostname associated with the specified IP address by the
@@ -73,76 +45,31 @@
*
* @param hostIp
* The address to reverse lookup
- * @param nameserver
- * The host name of a reachable DNS server; can be null
+ * @param ns
+ * The host name of a reachable DNS server
* @return The host name associated with the provided IP
* @throws NamingException
* If a NamingException is encountered
*/
- private static String reverseDns(Inet4Address hostIp, String nameserver)
+ public static String reverseDns(InetAddress hostIp, String ns)
throws NamingException {
//
// Builds the reverse IP lookup form
// This is formed by reversing the IP numbers and appending in-addr.arpa
//
- byte[] address32 = hostIp.getAddress();
- String reverseIP;
- reverseIP = ""+(address32[3]&0xff) + '.'
- + (address32[2] & 0xff) + '.'
- + (address32[1] & 0xff) + '.'
- + (address32[0] & 0xff) + '.'
- + "in-addr.arpa";
-/*
- String hostAddress = hostIp.getHostAddress();
- String[] parts = hostAddress.split("\\.");
- if (parts.length < 4) {
- throw new IllegalArgumentException("Unable to determine IPv4 address of "
- + hostAddress);
- }
- reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
+ String[] parts = hostIp.getHostAddress().split("\\.");
+ String reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
+ parts[0] + ".in-addr.arpa";
-*/
- return retrievePTRRecord(nameserver, reverseIP);
- }
-
- /**
- * Retrieve the PTR record from a DNS entry; if given the right
- * reverse IP this will resolve both IPv4 and IPv6 addresses
- * @param nameserver name server to use; can be null
- * @param reverseIP reverse IP address to look up
- * @return the PTR record of the host
- * @throws NamingException if the record can not be found.
- */
- private static String retrievePTRRecord(String nameserver, String reverseIP)
- throws NamingException {
DirContext ictx = new InitialDirContext();
- try {
- // Use "dns:///" if the default nameserver is to be used
- Attributes attribute = ictx.getAttributes("dns://"
- + ((nameserver == null) ? "" : nameserver) +
+ Attributes attribute =
+ ictx.getAttributes("dns://" // Use "dns:///" if the default
+ + ((ns == null) ? "" : ns) +
+ // nameserver is to be used
"/" + reverseIP, new String[] { "PTR" });
- return attribute.get("PTR").get().toString();
- } finally {
- ictx.close();
- }
- }
-
- /**
- * Returns the hostname associated with the specified IP address by the
- * provided nameserver.
- *
- * @param address
- * The address to reverse lookup
- * @param nameserver
- * The host name of a reachable DNS server; can be null
- * @return The host name associated with the provided IP
- * @throws NamingException
- * If a NamingException is encountered
- */
- private static String reverseDns(Inet6Address address, String nameserver)
- throws NamingException {
- throw new IllegalArgumentException("Reverse DNS lookup of IPv6 is currently unsupported: " + address);
+ ictx.close();
+
+ return attribute.get("PTR").get().toString();
}
/**
@@ -163,23 +90,21 @@
try {
NetworkInterface netIF = NetworkInterface.getByName(strInterface);
if (netIF == null)
- return new String[] {getLocalHostIPAddress()};
+ return new String[] { InetAddress.getLocalHost()
+ .getHostAddress() };
else {
Vector<String> ips = new Vector<String>();
- Enumeration<InetAddress> e = netIF.getInetAddresses();
-
- while (e.hasMoreElements()) {
- ips.add((e.nextElement()).getHostAddress());
- }
- return ips.toArray(new String[ips.size()]);
+ Enumeration e = netIF.getInetAddresses();
+ while (e.hasMoreElements())
+ ips.add(((InetAddress) e.nextElement()).getHostAddress());
+ return ips.toArray(new String[] {});
}
} catch (SocketException e) {
- return new String[] {getLocalHostIPAddress()};
+ return new String[] { InetAddress.getLocalHost().getHostAddress() };
}
}
-
- /**
+ /**
* Returns the first available IP address associated with the provided
* network interface
*
@@ -205,98 +130,26 @@
* The DNS host name
* @return A string vector of all host names associated with the IPs tied to
* the specified interface
- * @throws UnknownHostException if the hostname cannot be determined
+ * @throws UnknownHostException
*/
public static String[] getHosts(String strInterface, String nameserver)
throws UnknownHostException {
String[] ips = getIPs(strInterface);
Vector<String> hosts = new Vector<String>();
- for (String ip : ips) {
- InetAddress ipAddr = null;
+ for (int ctr = 0; ctr < ips.length; ctr++)
try {
- ipAddr = InetAddress.getByName(ip);
- hosts.add(reverseDns(ipAddr, nameserver));
- } catch (UnknownHostException e) {
- if (LOG.isDebugEnabled())
- LOG.debug("Unable to resolve hostname of " + ip + ": " + e, e);
- } catch (NamingException e) {
- if(LOG.isDebugEnabled())
- LOG.debug("Unable to reverse DNS of host" + ip
- + (ipAddr != null ? (" and address "+ipAddr): "")
- + " : " + e, e);
- } catch (IllegalArgumentException e) {
- if (LOG.isDebugEnabled())
- LOG.debug("Unable to resolve IP address of " + ip
- + (ipAddr != null ? (" and address " + ipAddr) : "")
- + " : " + e, e);
+ hosts.add(reverseDns(InetAddress.getByName(ips[ctr]),
+ nameserver));
+ } catch (Exception e) {
}
- }
- if (hosts.isEmpty()) {
- return new String[] { getLocalHostname() };
- } else {
- return hosts.toArray(new String[hosts.size()]);
- }
+ if (hosts.size() == 0)
+ return new String[] { InetAddress.getLocalHost().getCanonicalHostName() };
+ else
+ return hosts.toArray(new String[] {});
}
/**
- * The cached hostname -initially null.
- */
-
- private static volatile String cachedHostname;
-
- /**
- * The cached address hostname -initially null.
- */
- private static volatile String cachedHostAddress;
-
- /**
- * Determine the local hostname; retrieving it from cache if it is known
- * If we cannot determine our host name, return "localhost"
- * This code is not synchronized; if more than one thread calls it while
- * it is determining the address, the last one to exit the loop wins.
- * However, as both threads are determining and caching the same value, it
- * should be moot.
- * @return the local hostname or "localhost"
- */
- private static String getLocalHostname() {
- if(cachedHostname == null) {
- try {
- cachedHostname = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e) {
- LOG.info("Unable to determine local hostname "
- + "-falling back to \""+LOCALHOST+"\"", e);
- cachedHostname = LOCALHOST;
- }
- }
- return cachedHostname;
- }
-
- /**
- * Get the IPAddress of the local host as a string. This may be a loop back
- * value.
- * This code is not synchronized; if more than one thread calls it while
- * it is determining the address, the last one to exit the loop wins.
- * However, as both threads are determining and caching the same value, it
- * should be moot.
- * @return the IPAddress of the localhost
- * @throws UnknownHostException if not even "localhost" resolves.
- */
- private static String getLocalHostIPAddress() throws UnknownHostException {
- if (cachedHostAddress == null) {
- try {
- InetAddress localHost = InetAddress.getLocalHost();
- cachedHostAddress = localHost.getHostAddress();
- } catch (UnknownHostException e) {
- LOG.info("Unable to determine local IP Address "
- + "-falling back to loopback address", e);
- cachedHostAddress = InetAddress.getByName(LOCALHOST).getHostAddress();
- }
- }
- return cachedHostAddress;
- }
-
- /**
* Returns all the host names associated by the default nameserver with the
* address bound to the specified network interface
*
@@ -305,7 +158,7 @@
* @return The list of host names associated with IPs bound to the network
* interface
* @throws UnknownHostException
- * If one is encountered while querying the default interface
+ * If one is encountered while querying the deault interface
*
*/
public static String[] getHosts(String strInterface)
@@ -324,17 +177,15 @@
* @return The default host names associated with IPs bound to the network
* interface
* @throws UnknownHostException
- * If one is encountered while querying the default interface
+ * If one is encountered while querying the deault interface
*/
public static String getDefaultHost(String strInterface, String nameserver)
throws UnknownHostException {
- if ("default".equals(strInterface)) {
- return getLocalHostname();
- }
+ if (strInterface.equals("default"))
+ return InetAddress.getLocalHost().getCanonicalHostName();
- if ("default".equals(nameserver)) {
+ if (nameserver != null && nameserver.equals("default"))
return getDefaultHost(strInterface);
- }
String[] hosts = getHosts(strInterface, nameserver);
return hosts[0];
@@ -345,12 +196,11 @@
* nameserver with the address bound to the specified network interface
*
* @param strInterface
- * The name of the network interface to query (e.g. eth0).
- * Must not be null.
+ * The name of the network interface to query (e.g. eth0)
* @return The default host name associated with IPs bound to the network
* interface
* @throws UnknownHostException
- * If one is encountered while querying the default interface
+ * If one is encountered while querying the deault interface
*/
public static String getDefaultHost(String strInterface)
throws UnknownHostException {
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java Thu Mar 12 19:43:05 2009
@@ -26,7 +26,6 @@
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
-import java.net.ConnectException;
import java.nio.channels.SocketChannel;
import java.util.Map.Entry;
import java.util.regex.Pattern;
@@ -181,12 +180,9 @@
String oldAddr = conf.get(oldBindAddressName);
String oldPort = conf.get(oldPortName);
String newAddrPort = conf.get(newBindAddressName);
- if (oldAddr == null && oldPort == null && newAddrPort != null) {
+ if (oldAddr == null && oldPort == null) {
return newAddrPort;
}
- if (newAddrPort == null) {
- throw new IllegalArgumentException("No value for " + newBindAddressName);
- }
String[] newAddrPortParts = newAddrPort.split(":",2);
if (newAddrPortParts.length != 2) {
throw new IllegalArgumentException("Invalid address/port: " +
@@ -399,18 +395,14 @@
if (socket == null || endpoint == null || timeout < 0) {
throw new IllegalArgumentException("Illegal argument for connect()");
}
- try {
- SocketChannel ch = socket.getChannel();
-
- if (ch == null) {
- // let the default implementation handle it.
- socket.connect(endpoint, timeout);
- } else {
- SocketIOWithTimeout.connect(ch, endpoint, timeout);
- }
- } catch (ConnectException e) {
- throw (ConnectException) new ConnectException(
- e + " connecting to " + endpoint).initCause(e);
+
+ SocketChannel ch = socket.getChannel();
+
+ if (ch == null) {
+ // let the default implementation handle it.
+ socket.connect(endpoint, timeout);
+ } else {
+ SocketIOWithTimeout.connect(ch, endpoint, timeout);
}
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Mar 12 19:43:05 2009
@@ -1384,7 +1384,7 @@
synchronized void openInfo() throws IOException {
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
if (newInfo == null) {
- throw new FileNotFoundException("Cannot open HDFS file " + src);
+ throw new IOException("Cannot open filename " + src);
}
if (locatedBlocks != null) {
@@ -3084,9 +3084,7 @@
* resources associated with this stream.
*/
private synchronized void closeInternal() throws IOException {
- if ( !clientRunning ) {
- return;
- }
+ checkOpen();
isClosed();
try {
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java Thu Mar 12 19:43:05 2009
@@ -24,7 +24,6 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
-import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
@@ -550,10 +549,8 @@
/**
* Unlock storage.
- * Does nothing if there is no acquired lock
- * @throws IOException for IO problems
- * @throws ClosedChannelException if the channel owning the lock is
- * already closed.
+ *
+ * @throws IOException
*/
public void unlock() throws IOException {
if (this.lock == null)
@@ -706,22 +703,11 @@
/**
* Unlock all storage directories.
- * @throws IOException on a failure to unlock
+ * @throws IOException
*/
public void unlockAll() throws IOException {
- IOException ioe = null;
- for (StorageDirectory storageDir : storageDirs) {
- try {
- storageDir.unlock();
- } catch (IOException e) {
- LOG.warn("Failed to unlock " + storageDir.getRoot() + " : " + e, e);
- if (ioe != null) {
- ioe = e;
- }
- }
- }
- if (ioe != null) {
- throw ioe;
+ for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
+ it.next().unlock();
}
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Mar 12 19:43:05 2009
@@ -43,6 +43,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -90,7 +91,6 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.Service;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@@ -123,7 +123,7 @@
* information to clients or other DataNodes that might be interested.
*
**********************************************************/
-public class DataNode extends Service
+public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
public static final Log LOG = LogFactory.getLog(DataNode.class);
@@ -186,7 +186,7 @@
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
int writePacketSize = 0;
- private AbstractList<File> dataDirs;
+
public DataBlockScanner blockScanner = null;
public Daemon blockScannerThread = null;
@@ -206,57 +206,20 @@
/**
* Create the DataNode given a configuration and an array of dataDirs.
* 'dataDirs' is where the blocks are stored.
- * This constructor does not start the node, merely initialize it
- * @param conf configuration to use
- * @param dataDirs list of directories that may be used for data
- * @throws IOException for historical reasons
*/
DataNode(Configuration conf,
AbstractList<File> dataDirs) throws IOException {
super(conf);
datanodeObject = this;
- this.dataDirs = dataDirs;
- }
-
- /////////////////////////////////////////////////////
- // Lifecycle
- /////////////////////////////////////////////////////
-
- /**
- * Start any work (in separate threads)
- *
- * @throws IOException for any startup failure
- */
- @Override
- public void innerStart() throws IOException {
- startDataNode(getConf(), dataDirs);
- }
- /**
- * {@inheritDoc}.
- *
- * This implementation checks for the name system being non-null and live
- *
- * @throws IOException for any ping failure
- * @throws LivenessException if the IPC server is not defined @param status the initial status
- */
- @Override
- public void innerPing(ServiceStatus status) throws IOException {
- if (ipcServer == null) {
- status.addThrowable(new LivenessException("No IPC Server running"));
- }
- if (dnRegistration == null) {
- status.addThrowable(new LivenessException("Not registered to a namenode"));
+ try {
+ startDataNode(conf, dataDirs);
+ } catch (IOException ie) {
+ shutdown();
+ throw ie;
}
}
- /**
- * Shut down this instance of the datanode. Returns only after shutdown is
- * complete.
- */
- public void shutdown() {
- closeQuietly();
- }
/**
* This method starts the data node with the specified conf.
@@ -281,7 +244,7 @@
conf.get("dfs.datanode.dns.nameserver","default"));
}
InetSocketAddress nameNodeAddr = NameNode.getAddress(conf);
- DataNode.nameNodeAddr = nameNodeAddr;
+
this.socketTimeout = conf.getInt("dfs.socket.timeout",
HdfsConstants.READ_TIMEOUT);
this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
@@ -367,6 +330,7 @@
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
}
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
+ DataNode.nameNodeAddr = nameNodeAddr;
//initialize periodic block scanner
String reason = null;
@@ -393,9 +357,6 @@
int tmpInfoPort = infoSocAddr.getPort();
this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
tmpInfoPort == 0, conf);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
- }
if (conf.getBoolean("dfs.https.enable", false)) {
boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
@@ -403,9 +364,6 @@
Configuration sslConf = new Configuration(false);
sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
"ssl-server.xml"));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
- }
this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
}
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
@@ -462,10 +420,6 @@
} catch (InterruptedException ie) {}
}
}
- if(!shouldRun) {
- throw new IOException("Datanode shut down during handshake with NameNode "
- + getNameNodeAddr());
- }
String errorMsg = null;
// verify build version
if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
@@ -566,14 +520,10 @@
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
*/
- protected void register() throws IOException {
+ private void register() throws IOException {
if (dnRegistration.getStorageID().equals("")) {
setNewStorageID(dnRegistration);
}
- //if we are LIVE, move into the STARTED state, as registration implies that
- //the node is no longer LIVE
- enterState(ServiceState.LIVE, ServiceState.STARTED);
- //spin until the server is up.
while(shouldRun) {
try {
// reset name to machineName. Mainly for web interface.
@@ -602,8 +552,7 @@
+ dnRegistration.getStorageID()
+ ". Expecting " + storage.getStorageID());
}
- //at this point the DataNode is now live.
- enterLiveState();
+
// random short delay - helps scatter the BR from all DNs
scheduleBlockReport(initialBlockReportDelay);
}
@@ -614,25 +563,18 @@
* This method can only be called by the offerService thread.
* Otherwise, deadlock might occur.
*/
- @Override
- protected void innerClose() throws IOException {
- synchronized (this) {
- //disable the should run flag first, so that everything out there starts
- //to shut down
- shouldRun = false;
- //shut down the infoserver
- if (infoServer != null) {
- try {
- infoServer.stop();
- } catch (Exception e) {
- LOG.debug("Ignoring exception when shutting down the infoserver", e);
- }
- }
- //shut down the IPC server
- if (ipcServer != null) {
- ipcServer.stop();
+ public void shutdown() {
+ if (infoServer != null) {
+ try {
+ infoServer.stop();
+ } catch (Exception e) {
+ LOG.warn("Exception shutting down DataNode", e);
}
}
+ if (ipcServer != null) {
+ ipcServer.stop();
+ }
+ this.shouldRun = false;
if (dataXceiverServer != null) {
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
@@ -660,10 +602,9 @@
RPC.stopProxy(namenode); // stop the RPC threads
- if(upgradeManager != null) {
+ if(upgradeManager != null)
upgradeManager.shutdownUpgrade();
- }
- if (blockScannerThread != null) {
+ if (blockScannerThread != null) {
blockScannerThread.interrupt();
try {
blockScannerThread.join(3600000L); // wait for at most 1 hour
@@ -672,10 +613,8 @@
}
if (storage != null) {
try {
- storage.unlockAll();
+ this.storage.unlockAll();
} catch (IOException ie) {
- LOG.warn("Ignoring exception when unlocking storage: "+ie,
- ie);
}
}
if (dataNodeThread != null) {
@@ -866,13 +805,14 @@
if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
- LOG.warn("DataNode is shutting down: " + re, re);
+ LOG.warn("DataNode is shutting down: " +
+ StringUtils.stringifyException(re));
shutdown();
return;
}
- LOG.warn(re, re);
+ LOG.warn(StringUtils.stringifyException(re));
} catch (IOException e) {
- LOG.warn(e, e);
+ LOG.warn(StringUtils.stringifyException(e));
}
} // while (shouldRun)
} // offerService
@@ -1245,9 +1185,7 @@
startDistributedUpgradeIfNeeded();
offerService();
} catch (Exception ex) {
- LOG.error("Exception while in state " + getServiceState()
- + " and shouldRun=" + shouldRun + ": " + ex,
- ex);
+ LOG.error("Exception: " + StringUtils.stringifyException(ex));
if (shouldRun) {
try {
Thread.sleep(5000);
@@ -1327,52 +1265,33 @@
* @param conf Configuration instance to use.
* @return DataNode instance for given list of data dirs and conf, or null if
* no directory from this directory list can be created.
- * @throws IOException if problems occur when starting the data node
+ * @throws IOException
*/
public static DataNode makeInstance(String[] dataDirs, Configuration conf)
- throws IOException {
+ throws IOException {
ArrayList<File> dirs = new ArrayList<File>();
- StringBuffer invalid = new StringBuffer();
- for (String dataDir : dataDirs) {
- File data = new File(dataDir);
+ for (int i = 0; i < dataDirs.length; i++) {
+ File data = new File(dataDirs[i]);
try {
DiskChecker.checkDir(data);
dirs.add(data);
} catch(DiskErrorException e) {
- LOG.warn("Invalid directory in dfs.data.dir: " + e, e);
- invalid.append(dataDir);
- invalid.append(" ");
+ LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
}
}
- if (dirs.size() > 0) {
- DataNode dataNode = new DataNode(conf, dirs);
- Service.deploy(dataNode);
- return dataNode;
- } else {
- LOG.error("All directories in dfs.data.dir are invalid: " + invalid);
- return null;
- }
+ if (dirs.size() > 0)
+ return new DataNode(conf, dirs);
+ LOG.error("All directories in dfs.data.dir are invalid.");
+ return null;
}
-
- /**
- * {@inheritDoc}
- * @return the name of this service
- */
- @Override
- public String getServiceName() {
- return "DataNode";
- }
-
@Override
public String toString() {
- return "DataNode {" +
+ return "DataNode{" +
"data=" + data +
- (dnRegistration != null? (
- ", localName='" + dnRegistration.getName() + "'" +
- ", storageID='" + dnRegistration.getStorageID() + "'" ) : "") +
+ ", localName='" + dnRegistration.getName() + "'" +
+ ", storageID='" + dnRegistration.getStorageID() + "'" +
", xmitsInProgress=" + xmitsInProgress +
- ", state="+ getServiceState() +
"}";
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Mar 12 19:43:05 2009
@@ -185,9 +185,6 @@
}
File blockFiles[] = dir.listFiles();
- if (blockFiles == null) {
- throw new IllegalStateException("Not a valid directory: " + dir);
- }
for (int i = 0; i < blockFiles.length; i++) {
if (Block.isBlockFilename(blockFiles[i])) {
long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Mar 12 19:43:05 2009
@@ -395,7 +395,9 @@
*/
synchronized void processIOError(int index) {
if (editStreams == null || editStreams.size() <= 1) {
- processAllStorageInaccessible();
+ FSNamesystem.LOG.fatal(
+ "Fatal Error : All storage directories are inaccessible.");
+ Runtime.getRuntime().exit(-1);
}
assert(index < getNumStorageDirs());
assert(getNumStorageDirs() == editStreams.size());
@@ -415,43 +417,27 @@
//
fsimage.processIOError(parentStorageDir);
}
-
- /**
- * report inaccessible storage directories and trigger a fatal error
- */
- private void processAllStorageInaccessible() {
- processFatalError("Fatal Error: All storage directories are inaccessible.");
- }
-
- /**
- * Handle a fatal error
- * @param message message to include in any output
- */
- protected void processFatalError(String message) {
- FSNamesystem.LOG.fatal(message);
- Runtime.getRuntime().exit(-1);
- }
-
+
/**
* If there is an IO Error on any log operations on storage directory,
* remove any stream associated with that directory
*/
synchronized void processIOError(StorageDirectory sd) {
// Try to remove stream only if one should exist
- if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+ if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
return;
- }
if (editStreams == null || editStreams.size() <= 1) {
- processAllStorageInaccessible();
+ FSNamesystem.LOG.fatal(
+ "Fatal Error : All storage directories are inaccessible.");
+ Runtime.getRuntime().exit(-1);
}
for (int idx = 0; idx < editStreams.size(); idx++) {
- File parentStorageDir = ((EditLogFileOutputStream) editStreams
- .get(idx)).getFile()
- .getParentFile().getParentFile();
- if (parentStorageDir.getName().equals(sd.getRoot().getName())) {
+ File parentStorageDir = ((EditLogFileOutputStream)editStreams
+ .get(idx)).getFile()
+ .getParentFile().getParentFile();
+ if (parentStorageDir.getName().equals(sd.getRoot().getName()))
editStreams.remove(idx);
- }
- }
+ }
}
/**
@@ -472,8 +458,10 @@
}
}
if (j == numEditStreams) {
- processFatalError("Fatal Error: Unable to find sync log on which " +
- " IO error occured. ");
+ FSNamesystem.LOG.error("Unable to find sync log on which " +
+ " IO error occured. " +
+ "Fatal Error.");
+ Runtime.getRuntime().exit(-1);
}
processIOError(j);
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Mar 12 19:43:05 2009
@@ -449,59 +449,6 @@
}
/**
- * Test for a thread ref not being null or pointing to a dead thread
- * @param thread the thread to check
- * @return true if the thread is considered dead
- */
- private boolean isDead(Thread thread) {
- return thread == null || !thread.isAlive();
- }
-
- /**
- * Perform a cursory health check of the namesystem, particulary that it has
- * not been closed and that all threads are running.
- * @throws IOException for any health check
- */
- void ping() throws IOException {
- if (!fsRunning) {
- throw new IOException("Namesystem is not running");
- }
- boolean bad = false;
- StringBuilder sb = new StringBuilder();
- if (isDead(hbthread)) {
- bad = true;
- sb.append("[Heartbeat thread is dead]");
- }
- if (isDead(replthread)) {
- bad = true;
- sb.append("[Replication thread is dead]");
- }
- // this thread's liveness is only relevant in safe mode.
- if (safeMode!=null && isDead(smmthread)) {
- bad = true;
- sb.append("[SafeModeMonitor thread is dead while the name system is in safe mode]");
- }
- if (isDead(dnthread)) {
- bad = true;
- sb.append("[DecommissionedMonitor thread is dead]");
- }
- if (isDead(lmthread)) {
- bad = true;
- sb.append("[Lease monitor thread is dead]");
- }
- if (pendingReplications == null || !pendingReplications.isAlive()) {
- bad = true;
- sb.append("[Pending replication thread is dead]");
- }
- if (this != getFSNamesystem()) {
- bad = true;
- sb.append("[FSNamesystem not a singleton]");
- }
- if (bad) {
- throw new IOException(sb.toString());
- }
- }
- /**
* Close down this file system manager.
* Causes heartbeat and lease daemons to stop; waits briefly for
* them to finish, but a short timeout returns control back to caller.
@@ -523,10 +470,7 @@
lmthread.interrupt();
lmthread.join(3000);
}
- if(dir != null) {
- dir.close();
- dir = null;
- }
+ dir.close();
} catch (InterruptedException ie) {
} catch (IOException ie) {
LOG.error("Error closing FSDirectory", ie);
@@ -1308,13 +1252,9 @@
null,
blockSize);
if (targets.length < this.minReplication) {
- String message = "File " + src + " could only be replicated to " +
- targets.length + " nodes, instead of "
- + minReplication
- + ". ( there are " + heartbeats.size()
- + " live data nodes in the cluster)";
-
- throw new IOException(message);
+ throw new IOException("File " + src + " could only be replicated to " +
+ targets.length + " nodes, instead of " +
+ minReplication);
}
// Allocate a new block and record it in the INode.
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Mar 12 19:43:05 2009
@@ -45,7 +45,6 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Service;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
@@ -85,7 +84,7 @@
* NameNode implements the ClientProtocol interface, which allows
* clients to ask for DFS services. ClientProtocol is not
* designed for direct use by authors of DFS client code. End-users
- * should instead use the {@link FileSystem} class.
+ * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
*
* NameNode also implements the DatanodeProtocol interface, used by
* DataNode programs that actually store DFS data blocks. These
@@ -96,7 +95,7 @@
* secondary namenodes or rebalancing processes to get partial namenode's
* state, for example partial blocksMap etc.
**********************************************************/
-public class NameNode extends Service implements ClientProtocol, DatanodeProtocol,
+public class NameNode implements ClientProtocol, DatanodeProtocol,
NamenodeProtocol, FSConstants,
RefreshAuthorizationPolicyProtocol {
static{
@@ -134,6 +133,8 @@
/** HTTP server address */
private InetSocketAddress httpAddress = null;
private Thread emptier;
+ /** only used for testing purposes */
+ private boolean stopRequested = false;
/** Is service level authorization enabled? */
private boolean serviceAuthEnabled = false;
@@ -161,17 +162,7 @@
}
public static InetSocketAddress getAddress(Configuration conf) {
- URI fsURI = FileSystem.getDefaultUri(conf);
- if (fsURI == null) {
- throw new IllegalArgumentException(
- "No default filesystem URI in the configuration");
- }
- String auth = fsURI.getAuthority();
- if (auth == null) {
- throw new IllegalArgumentException(
- "No authority for the Filesystem URI " + fsURI);
- }
- return getAddress(auth);
+ return getAddress(FileSystem.getDefaultUri(conf).getAuthority());
}
public static URI getUri(InetSocketAddress namenode) {
@@ -184,7 +175,6 @@
* Initialize name-node.
*
* @param conf the configuration
- * @throws IOException for problems during initialization
*/
private void initialize(Configuration conf) throws IOException {
InetSocketAddress socAddr = NameNode.getAddress(conf);
@@ -271,7 +261,7 @@
}
/**
- * Create a NameNode.
+ * Start NameNode.
* <p>
* The name-node can be started with one of the following startup options:
* <ul>
@@ -290,49 +280,14 @@
* <code>zero</code> in the conf.
*
* @param conf confirguration
- * @throws IOException for backwards compatibility
+ * @throws IOException
*/
public NameNode(Configuration conf) throws IOException {
- super(conf);
- }
-
- /////////////////////////////////////////////////////
- // Service Lifecycle
- /////////////////////////////////////////////////////
-
- /**
- * This method does all the startup.
- * It is invoked from {@link #start()} when needed.
- *
- * @throws IOException for any problem.
- */
- @Override
- protected void innerStart() throws IOException {
- initialize(getConf());
- setServiceState(ServiceState.LIVE);
- }
-
- /**
- * {@inheritDoc}.
- *
- * This implementation checks for the name system being non-null and live
- * @throws IOException for any ping failure
- * @throws LivenessException if the name system is not running @param status
- */
- @Override
- public void innerPing(ServiceStatus status) throws IOException {
- if (namesystem == null) {
- status.addThrowable(new LivenessException("No name system"));
- } else {
- try {
- namesystem.ping();
- } catch (IOException e) {
- status.addThrowable(e);
- }
- }
- if (httpServer == null || !httpServer.isAlive()) {
- status.addThrowable(
- new IOException("NameNode HttpServer is not running"));
+ try {
+ initialize(conf);
+ } catch (IOException e) {
+ this.stop();
+ throw e;
}
}
@@ -342,81 +297,34 @@
*/
public void join() {
try {
- if (server != null) {
- server.join();
- }
+ this.server.join();
} catch (InterruptedException ie) {
}
}
/**
- * {@inheritDoc}
- * To shut down, this service stops all NameNode threads and waits for them
- * to finish. It also stops the metrics.
+ * Stop all NameNode threads and wait for all to finish.
*/
- @Override
- public synchronized void innerClose() throws IOException {
- LOG.info("Closing NameNode");
+ public void stop() {
+ if (stopRequested)
+ return;
+ stopRequested = true;
try {
- if (httpServer != null) {
- httpServer.stop();
- }
+ if (httpServer != null) httpServer.stop();
} catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e),e);
- }
- httpServer = null;
- if (namesystem != null) {
- namesystem.close();
- }
- if (emptier != null) {
- emptier.interrupt();
- emptier = null;
- }
- if (server != null) {
- server.stop();
- server = null;
+ LOG.error(StringUtils.stringifyException(e));
}
+ if(namesystem != null) namesystem.close();
+ if(emptier != null) emptier.interrupt();
+ if(server != null) server.stop();
if (myMetrics != null) {
myMetrics.shutdown();
}
if (namesystem != null) {
namesystem.shutdown();
- namesystem = null;
}
}
- /**
- * Retained for backwards compatibility.
- */
- public void stop() {
- closeQuietly();
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the name of this service
- */
- @Override
- public String getServiceName() {
- return "NameNode";
- }
-
- /**
- * The toString operator returns the super class name/id, and the state. This
- * gives all services a slightly useful message in a debugger or test report
- *
- * @return a string representation of the object.
- */
- @Override
- public String toString() {
- return getServiceName() + " instance " + super.toString() + " in state "
- + getServiceState()
- + (httpAddress != null ? (" at " + httpAddress + " , "): "")
- + (server != null ? (", IPC " + server.getListenerAddress()) : "");
- }
-
-
/////////////////////////////////////////////////////
// NamenodeProtocol
/////////////////////////////////////////////////////
@@ -1061,7 +969,6 @@
}
NameNode namenode = new NameNode(conf);
- deploy(namenode);
return namenode;
}
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java Thu Mar 12 19:43:05 2009
@@ -136,15 +136,6 @@
}
/**
- * Test for the replicator being alive.
- * @return true if the thread is running.
- */
- boolean isAlive() {
- Daemon daemon = timerThread;
- return daemon != null && daemon.isAlive();
- }
-
- /**
* An object that contains information about a block that
* is being replicated. It records the timestamp when the
* system started replicating the most recent copy of this
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Mar 12 19:43:05 2009
@@ -1243,26 +1243,11 @@
/**
* Utility that submits a job, then polls for progress until the job is
* complete.
- *
- * @param job the job configuration.
- * @return the job reference of the completed, successful, job
- * @throws IOException any IO problem, and job failure
- */
- public static RunningJob runJob(JobConf job) throws IOException {
- return runJob(job, -1);
- }
-
- /**
- * Utility that submits a job, then polls for progress until the job is
- * complete.
*
* @param job the job configuration.
- * @param timeout timeout in milliseconds; any value less than or equal to
- * zero means "do not time out"
- * @return the job reference of the completed, successful, job
- * @throws IOException any IO problem, and job failure
+ * @throws IOException
*/
- public static RunningJob runJob(JobConf job, long timeout) throws IOException {
+ public static RunningJob runJob(JobConf job) throws IOException {
JobClient jc = new JobClient(job);
boolean error = true;
RunningJob running = null;
@@ -1270,7 +1255,6 @@
final int MAX_RETRIES = 5;
int retries = MAX_RETRIES;
TaskStatusFilter filter;
- long endTime = timeout > 0 ? System.currentTimeMillis() + timeout : 0;
try {
filter = getTaskOutputFilter(job);
} catch(IllegalArgumentException e) {
@@ -1367,10 +1351,6 @@
LOG.info("Communication problem with server: " +
StringUtils.stringifyException(ie));
}
- //check for timeout
- if (endTime > 0 && endTime > System.currentTimeMillis()) {
- throw new IOException("Job execution timed out");
- }
}
if (!running.isSuccessful()) {
throw new IOException("Job failed!");
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Thu Mar 12 19:43:05 2009
@@ -89,11 +89,7 @@
public static void stopNotifier() {
running = false;
- //copy into a variable to deal with race conditions
- Thread notifier = thread;
- if (notifier != null) {
- notifier.interrupt();
- }
+ thread.interrupt();
}
private static JobEndStatusInfo createNotification(JobConf conf,
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Mar 12 19:43:05 2009
@@ -77,14 +77,13 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.Service;
/*******************************************************
* JobTracker is the central location for submitting and
* tracking MR jobs in a network environment.
*
*******************************************************/
-public class JobTracker extends Service implements MRConstants, InterTrackerProtocol,
+public class JobTracker implements MRConstants, InterTrackerProtocol,
JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
static{
@@ -111,11 +110,7 @@
public static enum State { INITIALIZING, RUNNING }
State state = State.INITIALIZING;
private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
- /**
- * Time in milliseconds to sleep while trying to start the job tracker:
- * {@value}
- */
- private static final int STARTUP_SLEEP_INTERVAL = 1000;
+
private DNSToSwitchMapping dnsToSwitchMapping;
private NetworkTopology clusterMap = new NetworkTopology();
private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -171,7 +166,7 @@
while (true) {
try {
result = new JobTracker(conf);
- deploy(result);
+ result.taskScheduler.setTaskTrackerManager(result);
break;
} catch (VersionMismatch e) {
throw e;
@@ -180,24 +175,19 @@
} catch (UnknownHostException e) {
throw e;
} catch (IOException e) {
- LOG.warn("Error starting tracker: " +
- e.getMessage(), e);
+ LOG.warn("Error starting tracker: " +
+ StringUtils.stringifyException(e));
}
- Thread.sleep(STARTUP_SLEEP_INTERVAL);
+ Thread.sleep(1000);
}
- if (result != null && result.isRunning()) {
+ if (result != null) {
JobEndNotifier.startNotifier();
}
return result;
}
- /**
- * This stops the tracker, the JobEndNotifier and moves the service into the
- * terminated state.
- *
- * @throws IOException for any trouble during closedown
- */
- public synchronized void stopTracker() throws IOException {
+ public void stopTracker() throws IOException {
+ JobEndNotifier.stopNotifier();
close();
}
@@ -1309,7 +1299,7 @@
// (hostname --> Node (NetworkTopology))
Map<String, Node> hostnameToNodeMap =
Collections.synchronizedMap(new TreeMap<String, Node>());
-
+
// Number of resolved entries
int numResolved;
@@ -1361,7 +1351,7 @@
);
// Used to provide an HTML view on Job, Task, and TaskTracker structures
- HttpServer infoServer;
+ final HttpServer infoServer;
int infoPort;
Server interTrackerServer;
@@ -1376,13 +1366,9 @@
private QueueManager queueManager;
/**
- * Create the JobTracker, based on the configuration
- * @param conf configuration to use
- * @throws IOException on problems initializing the tracker
+ * Start the JobTracker process, listen on the indicated port
*/
JobTracker(JobConf conf) throws IOException, InterruptedException {
- super(conf);
- this.conf = conf;
//
// Grab some static constants
//
@@ -1400,6 +1386,10 @@
AVERAGE_BLACKLIST_THRESHOLD =
conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
+ // This is a directory of temporary submission files. We delete it
+ // on startup, and can delete any files that we're done with
+ this.conf = conf;
+ JobConf jobConf = new JobConf(conf);
// Read the hosts/exclude files to restrict access to the jobtracker.
this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
@@ -1412,23 +1402,7 @@
= conf.getClass("mapred.jobtracker.taskScheduler",
JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
- taskScheduler.setTaskTrackerManager(this);
- }
- /**
- * This contains the startup logic moved out of the constructor.
- * It must never be called directly. Instead call {@link Service#start()} and
- * let service decide whether to invoke this method once and once only.
- *
- * Although most of the intialization work has been performed, the
- * JobTracker does not go live until {@link #offerService()} is called.
- * accordingly, JobTracker does not enter the Live state here.
- * @throws IOException for any startup problems
- */
- protected void innerStart() throws IOException {
- // This is a directory of temporary submission files. We delete it
- // on startup, and can delete any files that we're done with
- JobConf jobConf = new JobConf(conf);
// Set ports, start RPC servers, setup security policy etc.
InetSocketAddress addr = getAddress(conf);
this.localMachine = addr.getHostName();
@@ -1484,19 +1458,15 @@
trackerIdentifier = getDateFormat().format(new Date());
Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
- //this operation is synchronized to stop findbugs warning of inconsistent
- //access
- synchronized (this) {
- try {
- java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
- metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
- this.myInstrumentation = c.newInstance(this, jobConf);
- } catch(Exception e) {
- //Reflection can throw lots of exceptions -- handle them all by
- //falling back on the default.
- LOG.error("failed to initialize job tracker metrics", e);
- this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
- }
+ try {
+ java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+ metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+ this.myInstrumentation = c.newInstance(this, jobConf);
+ } catch(Exception e) {
+ //Reflection can throw lots of exceptions -- handle them all by
+ //falling back on the default.
+ LOG.error("failed to initialize job tracker metrics", e);
+ this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
}
@@ -1518,9 +1488,6 @@
// if we haven't contacted the namenode go ahead and do it
if (fs == null) {
fs = FileSystem.get(conf);
- if(fs == null) {
- throw new IllegalStateException("Unable to bind to the filesystem");
- }
}
// clean up the system dir, which will only work if hdfs is out of
// safe mode
@@ -1562,14 +1529,9 @@
((RemoteException)ie).getClassName())) {
throw ie;
}
- LOG.info("problem cleaning system directory: " + systemDir + ": " + ie, ie);
- }
- try {
- Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted during system directory cleanup ",
- e);
+ LOG.info("problem cleaning system directory: " + systemDir, ie);
}
+ Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
}
// Same with 'localDir' except it's always on the local disk.
jobConf.deleteLocalFiles(SUBDIR);
@@ -1591,11 +1553,7 @@
NetworkTopology.DEFAULT_HOST_LEVEL);
//initializes the job status store
- //this operation is synchronized to stop findbugs warning of inconsistent
- //access
- synchronized(this) {
- completedJobStatusStore = new CompletedJobStatusStore(conf, fs);
- }
+ completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
}
private static SimpleDateFormat getDateFormat() {
@@ -1660,16 +1618,9 @@
}
/**
- * Run forever.
- * Change the system state to indicate that we are live
- * @throws InterruptedException interrupted operations
- * @throws IOException IO Problems
+ * Run forever
*/
public void offerService() throws InterruptedException, IOException {
- if(!enterLiveState()) {
- //catch re-entrancy by returning early
- return;
- };
taskScheduler.start();
// Start the recovery after starting the scheduler
@@ -1686,139 +1637,79 @@
this.retireJobsThread.start();
expireLaunchingTaskThread.start();
- synchronized (this) {
- //this is synchronized to stop findbugs warning
- if (completedJobStatusStore.isActive()) {
- completedJobsStoreThread = new Thread(completedJobStatusStore,
- "completedjobsStore-housekeeper");
- completedJobsStoreThread.start();
- }
+ if (completedJobStatusStore.isActive()) {
+ completedJobsStoreThread = new Thread(completedJobStatusStore,
+ "completedjobsStore-housekeeper");
+ completedJobsStoreThread.start();
}
- LOG.info("Starting interTrackerServer");
// start the inter-tracker server once the jt is ready
this.interTrackerServer.start();
-
+ synchronized (this) {
+ state = State.RUNNING;
+ }
LOG.info("Starting RUNNING");
+
this.interTrackerServer.join();
LOG.info("Stopped interTrackerServer");
}
- /////////////////////////////////////////////////////
- // Service Lifecycle
- /////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
- *
- * @param status a status that can be updated with problems
- * @throws IOException for any problem
- */
- @Override
- public void innerPing(ServiceStatus status) throws IOException {
- if (infoServer == null || !infoServer.isAlive()) {
- status.addThrowable(
- new IOException("TaskTracker HttpServer is not running on port "
- + infoPort));
- }
- if (interTrackerServer == null) {
- status.addThrowable(
- new IOException("InterTrackerServer is not running"));
- }
- }
-
- /**
- * This service shuts down by stopping the
- * {@link JobEndNotifier} and then closing down the job
- * tracker
- *
- * @throws IOException exceptions which will be logged
- */
- @Override
- protected void innerClose() throws IOException {
- JobEndNotifier.stopNotifier();
- closeJobTracker();
- }
-
- /**
- * Close down all the Job tracker threads, and the
- * task scheduler.
- * This was package scoped, but has been made private so that
- * it does not get used. Callers should call {@link #close()} to
- * stop a JobTracker
- * @throws IOException if problems occur
- */
- private void closeJobTracker() throws IOException {
- if (infoServer != null) {
+ void close() throws IOException {
+ if (this.infoServer != null) {
LOG.info("Stopping infoServer");
try {
- infoServer.stop();
+ this.infoServer.stop();
} catch (Exception ex) {
LOG.warn("Exception shutting down JobTracker", ex);
}
}
- if (interTrackerServer != null) {
+ if (this.interTrackerServer != null) {
LOG.info("Stopping interTrackerServer");
- interTrackerServer.stop();
+ this.interTrackerServer.stop();
+ }
+ if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
+ LOG.info("Stopping expireTrackers");
+ this.expireTrackersThread.interrupt();
+ try {
+ this.expireTrackersThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ }
+ if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
+ LOG.info("Stopping retirer");
+ this.retireJobsThread.interrupt();
+ try {
+ this.retireJobsThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
}
- retireThread("expireTrackersThread", expireTrackersThread);
- retireThread("retirer", retireJobsThread);
if (taskScheduler != null) {
taskScheduler.terminate();
}
- retireThread("expireLaunchingTasks", expireLaunchingTaskThread);
- retireThread("completedJobsStore thread", completedJobsStoreThread);
- LOG.info("stopped all jobtracker services");
- }
-
- /**
- * Close the filesystem without raising an exception. At the end of this
- * method, fs==null.
- * Warning: closing the FS may make it unusable for other clients in the same JVM.
- */
- protected synchronized void closeTheFilesystemQuietly() {
- if (fs != null) {
+ if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
+ LOG.info("Stopping expireLaunchingTasks");
+ this.expireLaunchingTaskThread.interrupt();
try {
- fs.close();
- } catch (IOException e) {
- LOG.warn("When closing the filesystem: " + e, e);
+ this.expireLaunchingTaskThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
}
- fs = null;
}
- }
-
- /**
- * Retire a named thread if it is not null and still alive. The thread will be
- * interruped and then joined.
- *
- * @param name thread name for log messages
- * @param thread thread -can be null.
- * @return true if the thread was shut down; false implies this thread was
- * interrupted.
- */
- protected boolean retireThread(String name, Thread thread) {
- if (thread != null && thread.isAlive()) {
- LOG.info("Stopping " + name);
- thread.interrupt();
+ if (this.completedJobsStoreThread != null &&
+ this.completedJobsStoreThread.isAlive()) {
+ LOG.info("Stopping completedJobsStore thread");
+ this.completedJobsStoreThread.interrupt();
try {
- thread.join();
+ this.completedJobsStoreThread.join();
} catch (InterruptedException ex) {
- LOG.info("interruped during " + name + " shutdown", ex);
- return false;
+ ex.printStackTrace();
}
}
- return true;
- }
-
- /**
- * {@inheritDoc}
- *
- * @return the name of this service
- */
- @Override
- public String getServiceName() {
- return "JobTracker";
+ LOG.info("stopped all jobtracker services");
+ return;
}
///////////////////////////////////////////////////////
@@ -1956,7 +1847,7 @@
}
/**
- * Call {@link #removeTaskEntry(TaskAttemptID)} for each of the
+ * Call {@link #removeTaskEntry(String)} for each of the
* job's tasks.
* When the JobTracker is retiring the long-completed
* job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
@@ -2339,7 +2230,7 @@
return numTaskCacheLevels;
}
public int getNumResolvedTaskTrackers() {
- return taskTrackers.size();
+ return numResolved;
}
public int getNumberOfUniqueHosts() {
@@ -2866,7 +2757,6 @@
* Allocates a new JobId string.
*/
public synchronized JobID getNewJobId() throws IOException {
- verifyServiceState(ServiceState.LIVE);
return new JobID(getTrackerIdentifier(), nextJobId++);
}
@@ -2879,7 +2769,6 @@
* the JobTracker alone.
*/
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
- verifyServiceState(ServiceState.LIVE);
if(jobs.containsKey(jobId)) {
//job already running, don't start twice
return jobs.get(jobId).getStatus();
@@ -2962,10 +2851,6 @@
public synchronized ClusterStatus getClusterStatus(boolean detailed) {
synchronized (taskTrackers) {
- //backport the service state into the job tracker state
- State state = getServiceState() == ServiceState.LIVE ?
- State.RUNNING :
- State.INITIALIZING;
if (detailed) {
List<List<String>> trackerNames = taskTrackerNames();
return new ClusterStatus(trackerNames.get(0),
@@ -3282,10 +3167,6 @@
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
*/
public String getSystemDir() {
- if (fs == null) {
- throw new java.lang.IllegalStateException("Filesystem is null; "
- + "JobTracker is not live: " + this);
- }
Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
return fs.makeQualified(sysDir).toString();
}