Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/10/02 17:24:27 UTC

svn commit: r1392967 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: CHANGES.txt hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java

Author: jlowe
Date: Tue Oct  2 15:24:26 2012
New Revision: 1392967

URL: http://svn.apache.org/viewvc?rev=1392967&view=rev
Log:
svn merge -c 1325010 FIXES: MAPREDUCE-4107. Fix tests in org.apache.hadoop.ipc.TestSocketFactory (Devaraj K via tgraves)
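
The patch below retires the JobTracker-era MiniMRCluster in favour of MiniMRYarnCluster, converts the test from junit.framework.TestCase to JUnit 4, and drops the @Ignore so the test runs again. The mechanism the test exercises is unchanged: clients are pointed at an address whose port is off by +10, and the custom socket factory subtracts 10 when connecting, so a successful RPC proves the factory was actually used. A minimal, self-contained sketch of that rerouting trick (class and variable names here are illustrative, not part of the patch):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;

// Standalone demo of the port-rerouting trick (illustrative, not committed code).
// The caller dials port + 10; the overridden connect() subtracts 10, the same
// offset DummySocketFactory uses below.  Only resolved addresses are handled
// here; the committed factory also covers the unresolved case.
public class RerouteDemo {
  static final int OFFSET = 10;

  public static void main(String[] args) throws IOException {
    try (ServerSocket server = new ServerSocket(0)) {   // the "real" service
      int realPort = server.getLocalPort();             // assumes realPort + 10 <= 65535

      Socket rerouting = new Socket() {
        @Override
        public void connect(SocketAddress addr, int timeout) throws IOException {
          InetSocketAddress iaddr = (InetSocketAddress) addr;
          InetSocketAddress target = new InetSocketAddress(
              iaddr.getAddress(), iaddr.getPort() - OFFSET);
          System.out.printf("Test socket: rerouting %s to %s%n", iaddr, target);
          super.connect(target, timeout);
        }
      };

      // Dial the deliberately wrong port; the override lands on the real one.
      rerouting.connect(new InetSocketAddress("127.0.0.1", realPort + OFFSET), 1000);
      try (Socket accepted = server.accept()) {
        System.out.println("connected via reroute: " + accepted.getRemoteSocketAddress());
      }
      rerouting.close();
    }
  }
}

Running the sketch prints the reroute and then reports the accepted connection, mirroring the printf in DummySocketFactory.createSocket() in the patch. A second sketch, showing the client-side configuration wiring, follows the full diff.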

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1392967&r1=1392966&r2=1392967&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Oct  2 15:24:26 2012
@@ -15,6 +15,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4674. Hadoop examples secondarysort has a typo
     "secondarysrot" in the usage. (Robert Justice via eli)
 
+    MAPREDUCE-4107. Fix tests in org.apache.hadoop.ipc.TestSocketFactory
+    (Devaraj K via tgraves)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java?rev=1392967&r1=1392966&r2=1392967&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/ipc/TestSocketFactory.java Tue Oct  2 15:24:26 2012
@@ -22,124 +22,149 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.net.StandardSocketFactory;
-import org.junit.Ignore;
+import org.junit.Assert;
+import org.junit.Test;
 
 /**
  * This class checks that RPCs can use specialized socket factories.
  */
-@Ignore
-public class TestSocketFactory extends TestCase {
+public class TestSocketFactory {
 
   /**
-   * Check that we can reach a NameNode or a JobTracker using a specific
+   * Check that we can reach a NameNode or Resource Manager using a specific
    * socket factory
    */
+  @Test
   public void testSocketFactory() throws IOException {
     // Create a standard mini-cluster
     Configuration sconf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(sconf).numDataNodes(1)
+        .build();
     final int nameNodePort = cluster.getNameNodePort();
 
     // Get a reference to its DFS directly
     FileSystem fs = cluster.getFileSystem();
-    assertTrue(fs instanceof DistributedFileSystem);
+    Assert.assertTrue(fs instanceof DistributedFileSystem);
     DistributedFileSystem directDfs = (DistributedFileSystem) fs;
 
-    // Get another reference via network using a specific socket factory
-    Configuration cconf = new Configuration();
-    FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/",
-        nameNodePort + 10));
-    cconf.set("hadoop.rpc.socket.factory.class.default",
-        "org.apache.hadoop.ipc.DummySocketFactory");
-    cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
-        "org.apache.hadoop.ipc.DummySocketFactory");
-    cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
-        "org.apache.hadoop.ipc.DummySocketFactory");
+    Configuration cconf = getCustomSocketConfigs(nameNodePort);
 
     fs = FileSystem.get(cconf);
-    assertTrue(fs instanceof DistributedFileSystem);
+    Assert.assertTrue(fs instanceof DistributedFileSystem);
     DistributedFileSystem dfs = (DistributedFileSystem) fs;
 
     JobClient client = null;
-    MiniMRCluster mr = null;
+    MiniMRYarnCluster miniMRYarnCluster = null;
     try {
       // This will test RPC to the NameNode only.
       // could we test Client-DataNode connections?
       Path filePath = new Path("/dir");
 
-      assertFalse(directDfs.exists(filePath));
-      assertFalse(dfs.exists(filePath));
+      Assert.assertFalse(directDfs.exists(filePath));
+      Assert.assertFalse(dfs.exists(filePath));
 
       directDfs.mkdirs(filePath);
-      assertTrue(directDfs.exists(filePath));
-      assertTrue(dfs.exists(filePath));
+      Assert.assertTrue(directDfs.exists(filePath));
+      Assert.assertTrue(dfs.exists(filePath));
 
-      // This will test TPC to a JobTracker
+      // This will test RPC to a Resource Manager
       fs = FileSystem.get(sconf);
-      mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
-      final int jobTrackerPort = mr.getJobTrackerPort();
-
+      JobConf jobConf = new JobConf();
+      FileSystem.setDefaultUri(jobConf, fs.getUri().toString());
+      miniMRYarnCluster = initAndStartMiniMRYarnCluster(jobConf);
       JobConf jconf = new JobConf(cconf);
-      jconf.set("mapred.job.tracker", String.format("localhost:%d",
-          jobTrackerPort + 10));
-      jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
+      jconf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+      String rmAddress = jconf.get("yarn.resourcemanager.address");
+      String[] split = rmAddress.split(":");
+      jconf.set("yarn.resourcemanager.address", split[0] + ':'
+          + (Integer.parseInt(split[1]) + 10));
       client = new JobClient(jconf);
 
       JobStatus[] jobs = client.jobsToComplete();
-      assertTrue(jobs.length == 0);
+      Assert.assertTrue(jobs.length == 0);
 
     } finally {
-      try {
-        if (client != null)
-          client.close();
-      } catch (Exception ignored) {
-        // nothing we can do
-        ignored.printStackTrace();
-      }
-      try {
-        if (dfs != null)
-          dfs.close();
-
-      } catch (Exception ignored) {
-        // nothing we can do
-        ignored.printStackTrace();
-      }
-      try {
-        if (directDfs != null)
-          directDfs.close();
-
-      } catch (Exception ignored) {
-        // nothing we can do
-        ignored.printStackTrace();
-      }
-      try {
-        if (cluster != null)
-          cluster.shutdown();
-
-      } catch (Exception ignored) {
-        // nothing we can do
-        ignored.printStackTrace();
-      }
-      if (mr != null) {
-        try {
-          mr.shutdown();
-        } catch (Exception ignored) {
-          ignored.printStackTrace();
-        }
-      }
+      closeClient(client);
+      closeDfs(dfs);
+      closeDfs(directDfs);
+      stopMiniMRYarnCluster(miniMRYarnCluster);
+      shutdownDFSCluster(cluster);
+    }
+  }
+
+  private MiniMRYarnCluster initAndStartMiniMRYarnCluster(JobConf jobConf) {
+    MiniMRYarnCluster miniMRYarnCluster;
+    miniMRYarnCluster = new MiniMRYarnCluster(this.getClass().getName(), 1);
+    miniMRYarnCluster.init(jobConf);
+    miniMRYarnCluster.start();
+    return miniMRYarnCluster;
+  }
+
+  private Configuration getCustomSocketConfigs(final int nameNodePort) {
+    // Get another reference via network using a specific socket factory
+    Configuration cconf = new Configuration();
+    FileSystem.setDefaultUri(cconf, String.format("hdfs://localhost:%s/",
+        nameNodePort + 10));
+    cconf.set("hadoop.rpc.socket.factory.class.default",
+        "org.apache.hadoop.ipc.DummySocketFactory");
+    cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
+        "org.apache.hadoop.ipc.DummySocketFactory");
+    cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
+        "org.apache.hadoop.ipc.DummySocketFactory");
+    return cconf;
+  }
+
+  private void shutdownDFSCluster(MiniDFSCluster cluster) {
+    try {
+      if (cluster != null)
+        cluster.shutdown();
+
+    } catch (Exception ignored) {
+      // nothing we can do
+      ignored.printStackTrace();
+    }
+  }
+
+  private void stopMiniMRYarnCluster(MiniMRYarnCluster miniMRYarnCluster) {
+    try {
+      if (miniMRYarnCluster != null)
+        miniMRYarnCluster.stop();
+
+    } catch (Exception ignored) {
+      // nothing we can do
+      ignored.printStackTrace();
+    }
+  }
+
+  private void closeDfs(DistributedFileSystem dfs) {
+    try {
+      if (dfs != null)
+        dfs.close();
+
+    } catch (Exception ignored) {
+      // nothing we can do
+      ignored.printStackTrace();
+    }
+  }
+
+  private void closeClient(JobClient client) {
+    try {
+      if (client != null)
+        client.close();
+    } catch (Exception ignored) {
+      // nothing we can do
+      ignored.printStackTrace();
     }
   }
 }
@@ -155,32 +180,27 @@ class DummySocketFactory extends Standar
   public DummySocketFactory() {
   }
 
-  /* @inheritDoc */
   @Override
   public Socket createSocket() throws IOException {
     return new Socket() {
       @Override
-      public void connect(SocketAddress addr, int timeout)
-          throws IOException {
+      public void connect(SocketAddress addr, int timeout) throws IOException {
 
         assert (addr instanceof InetSocketAddress);
         InetSocketAddress iaddr = (InetSocketAddress) addr;
         SocketAddress newAddr = null;
         if (iaddr.isUnresolved())
-          newAddr =
-              new InetSocketAddress(iaddr.getHostName(),
-                  iaddr.getPort() - 10);
+          newAddr = new InetSocketAddress(iaddr.getHostName(),
+              iaddr.getPort() - 10);
         else
-          newAddr =
-              new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10);
-        System.out.printf("Test socket: rerouting %s to %s\n", iaddr,
-            newAddr);
+          newAddr = new InetSocketAddress(iaddr.getAddress(),
+              iaddr.getPort() - 10);
+        System.out.printf("Test socket: rerouting %s to %s\n", iaddr, newAddr);
         super.connect(newAddr, timeout);
       }
     };
   }
 
-  /* @inheritDoc */
   @Override
   public boolean equals(Object obj) {
     if (this == obj)
@@ -191,11 +211,4 @@ class DummySocketFactory extends Standar
       return false;
     return true;
   }
-
-  /* @inheritDoc */
-  @Override
-  public int hashCode() {
-    // Dummy hash code (to make find bugs happy)
-    return 53;
-  }
 }
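
For reference, a rough sketch of the client-side wiring the re-enabled test performs, using the 0.23-era configuration keys that appear in the patch above: every address handed to the client is offset by +10, so the only way FileSystem.exists() or JobClient.jobsToComplete() can reach the real NameNode or Resource Manager is through DummySocketFactory. The class and method names below are illustrative, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Illustrative wiring only; the key names and the +10 offset are taken from the
// patch, the class and method names are not.
public class CustomSocketFactoryWiring {

  static Configuration clientConf(int nameNodePort, String rmHost, int rmPort) {
    Configuration conf = new Configuration();

    // NameNode: the default filesystem points at a deliberately wrong port.
    FileSystem.setDefaultUri(conf,
        String.format("hdfs://localhost:%d/", nameNodePort + 10));

    // Route client RPC sockets through the rerouting factory.
    conf.set("hadoop.rpc.socket.factory.class.default",
        "org.apache.hadoop.ipc.DummySocketFactory");
    conf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
        "org.apache.hadoop.ipc.DummySocketFactory");

    // Resource Manager: same trick, port off by +10 on purpose.
    conf.set("yarn.resourcemanager.address", rmHost + ":" + (rmPort + 10));
    return conf;
  }
}

If the HDFS exists() calls and jobsToComplete() succeed under this configuration, the sockets must have been rerouted, which is exactly what the test asserts.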