You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/10/02 17:24:27 UTC
svn commit: r1392967 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: CHANGES.txt
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java
Author: jlowe
Date: Tue Oct 2 15:24:26 2012
New Revision: 1392967
URL: http://svn.apache.org/viewvc?rev=1392967&view=rev
Log:
svn merge -c 1325010 FIXES: MAPREDUCE-4107. Fix tests in org.apache.hadoop.ipc.TestSocketFactory (Devaraj K via tgraves)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1392967&r1=1392966&r2=1392967&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Oct 2 15:24:26 2012
@@ -15,6 +15,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4674. Hadoop examples secondarysort has a typo
"secondarysrot" in the usage. (Robert Justice via eli)
+ MAPREDUCE-4107. Fix tests in org.apache.hadoop.ipc.TestSocketFactory
+ (Devaraj K via tgraves)
+
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java?rev=1392967&r1=1392966&r2=1392967&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java Tue Oct 2 15:24:26 2012
@@ -22,124 +22,149 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
-import junit.framework.TestCase;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.hadoop.net.StandardSocketFactory;
-import org.junit.Ignore;
+import org.junit.Assert;
+import org.junit.Test;
/**
* This class checks that RPCs can use specialized socket factories.
*/
-@Ignore
-public class TestSocketFactory extends TestCase {
+public class TestSocketFactory {
/**
- * Check that we can reach a NameNode or a JobTracker using a specific
+ * Check that we can reach a NameNode or Resource Manager using a specific
* socket factory
*/
+ @Test
public void testSocketFactory() throws IOException {
// Create a standard mini-cluster
Configuration sconf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(sconf).numDataNodes(1)
+ .build();
final int nameNodePort = cluster.getNameNodePort();
// Get a reference to its DFS directly
FileSystem fs = cluster.getFileSystem();
- assertTrue(fs instanceof DistributedFileSystem);
+ Assert.assertTrue(fs instanceof DistributedFileSystem);
DistributedFileSystem directDfs = (DistributedFileSystem) fs;
- // Get another reference via network using a specific socket factory
- Configuration cconf = new Configuration();
- FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/",
- nameNodePort + 10));
- cconf.set("hadoop.rpc.socket.factory.class.default",
- "org.apache.hadoop.ipc.DummySocketFactory");
- cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
- "org.apache.hadoop.ipc.DummySocketFactory");
- cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
- "org.apache.hadoop.ipc.DummySocketFactory");
+ Configuration cconf = getCustomSocketConfigs(nameNodePort);
fs = FileSystem.get(cconf);
- assertTrue(fs instanceof DistributedFileSystem);
+ Assert.assertTrue(fs instanceof DistributedFileSystem);
DistributedFileSystem dfs = (DistributedFileSystem) fs;
JobClient client = null;
- MiniMRCluster mr = null;
+ MiniMRYarnCluster miniMRYarnCluster = null;
try {
// This will test RPC to the NameNode only.
// could we test Client-DataNode connections?
Path filePath = new Path("/dir");
- assertFalse(directDfs.exists(filePath));
- assertFalse(dfs.exists(filePath));
+ Assert.assertFalse(directDfs.exists(filePath));
+ Assert.assertFalse(dfs.exists(filePath));
directDfs.mkdirs(filePath);
- assertTrue(directDfs.exists(filePath));
- assertTrue(dfs.exists(filePath));
+ Assert.assertTrue(directDfs.exists(filePath));
+ Assert.assertTrue(dfs.exists(filePath));
- // This will test TPC to a JobTracker
+ // This will test RPC to a Resource Manager
fs = FileSystem.get(sconf);
- mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
- final int jobTrackerPort = mr.getJobTrackerPort();
-
+ JobConf jobConf = new JobConf();
+ FileSystem.setDefaultUri(jobConf, fs.getUri().toString());
+ miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf);
JobConf jconf = new JobConf(cconf);
- jconf.set("mapred.job.tracker", String.format("localhost:%d",
- jobTrackerPort + 10));
- jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
+ jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ String rmAddress = jconf.get("yarn.resourcemanager.address");
+ String[] split = rmAddress.split(":");
+ jconf.set("yarn.resourcemanager.address", split[0] + ':'
+ + (Integer.parseInt(split[1]) + 10));
client = new JobClient(jconf);
JobStatus[] jobs = client.jobsToComplete();
- assertTrue(jobs.length == 0);
+ Assert.assertTrue(jobs.length == 0);
} finally {
- try {
- if (client != null)
- client.close();
- } catch (Exception ignored) {
- // nothing we can do
- ignored.printStackTrace();
- }
- try {
- if (dfs != null)
- dfs.close();
-
- } catch (Exception ignored) {
- // nothing we can do
- ignored.printStackTrace();
- }
- try {
- if (directDfs != null)
- directDfs.close();
-
- } catch (Exception ignored) {
- // nothing we can do
- ignored.printStackTrace();
- }
- try {
- if (cluster != null)
- cluster.shutdown();
-
- } catch (Exception ignored) {
- // nothing we can do
- ignored.printStackTrace();
- }
- if (mr != null) {
- try {
- mr.shutdown();
- } catch (Exception ignored) {
- ignored.printStackTrace();
- }
- }
+ closeClient(client);
+ closeDfs(dfs);
+ closeDfs(directDfs);
+ stopMiniMRYarnCluster(miniMRYarnCluster);
+ shutdownDFSCluster(cluster);
+ }
+ }
+
+ private MiniMRYarnCluster initAndStartMiniMRYarnCluster(JobConf jobConf) {
+ MiniMRYarnCluster miniMRYarnCluster;
+ miniMRYarnCluster = new MiniMRYarnCluster(this.getClass().getName(), 1);
+ miniMRYarnCluster.init(jobConf);
+ miniMRYarnCluster.start();
+ return miniMRYarnCluster;
+ }
+
+ private Configuration getCustomSocketConfigs(final int nameNodePort) {
+ // Get another reference via network using a specific socket factory
+ Configuration cconf = new Configuration();
+ FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/",
+ nameNodePort + 10));
+ cconf.set("hadoop.rpc.socket.factory.class.default",
+ "org.apache.hadoop.ipc.DummySocketFactory");
+ cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
+ "org.apache.hadoop.ipc.DummySocketFactory");
+ cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
+ "org.apache.hadoop.ipc.DummySocketFactory");
+ return cconf;
+ }
+
+ private void shutdownDFSCluster(MiniDFSCluster cluster) {
+ try {
+ if (cluster != null)
+ cluster.shutdown();
+
+ } catch (Exception ignored) {
+ // nothing we can do
+ ignored.printStackTrace();
+ }
+ }
+
+ private void stopMiniMRYarnCluster(MiniMRYarnCluster miniMRYarnCluster) {
+ try {
+ if (miniMRYarnCluster != null)
+ miniMRYarnCluster.stop();
+
+ } catch (Exception ignored) {
+ // nothing we can do
+ ignored.printStackTrace();
+ }
+ }
+
+ private void closeDfs(DistributedFileSystem dfs) {
+ try {
+ if (dfs != null)
+ dfs.close();
+
+ } catch (Exception ignored) {
+ // nothing we can do
+ ignored.printStackTrace();
+ }
+ }
+
+ private void closeClient(JobClient client) {
+ try {
+ if (client != null)
+ client.close();
+ } catch (Exception ignored) {
+ // nothing we can do
+ ignored.printStackTrace();
}
}
}
@@ -155,32 +180,27 @@ class DummySocketFactory extends Standar
public DummySocketFactory() {
}
- /* @inheritDoc */
@Override
public Socket createSocket() throws IOException {
return new Socket() {
@Override
- public void connect(SocketAddress addr, int timeout)
- throws IOException {
+ public void connect(SocketAddress addr, int timeout) throws IOException {
assert (addr instanceof InetSocketAddress);
InetSocketAddress iaddr = (InetSocketAddress) addr;
SocketAddress newAddr = null;
if (iaddr.isUnresolved())
- newAddr =
- new InetSocketAddress(iaddr.getHostName(),
- iaddr.getPort() - 10);
+ newAddr = new InetSocketAddress(iaddr.getHostName(),
+ iaddr.getPort() - 10);
else
- newAddr =
- new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10);
- System.out.printf("Test socket: rerouting %s to %s\n", iaddr,
- newAddr);
+ newAddr = new InetSocketAddress(iaddr.getAddress(),
+ iaddr.getPort() - 10);
+ System.out.printf("Test socket: rerouting %s to %s\n", iaddr, newAddr);
super.connect(newAddr, timeout);
}
};
}
- /* @inheritDoc */
@Override
public boolean equals(Object obj) {
if (this == obj)
@@ -191,11 +211,4 @@ class DummySocketFactory extends Standar
return false;
return true;
}
-
- /* @inheritDoc */
- @Override
- public int hashCode() {
- // Dummy hash code (to make find bugs happy)
- return 53;
- }
}