You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/03/08 20:35:29 UTC

svn commit: r1298532 - in /incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp: TestBSPTaskFaults.java TestCheckpoint.java

Author: tjungblut
Date: Thu Mar  8 19:35:29 2012
New Revision: 1298532

URL: http://svn.apache.org/viewvc?rev=1298532&view=rev
Log:
Testcase fix (contributed by Suraj Menon)

Modified:
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1298532&r1=1298531&r2=1298532&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Thu Mar  8 19:35:29 2012
@@ -59,18 +59,24 @@ public class TestBSPTaskFaults extends T
 
   public static final Log LOG = LogFactory.getLog(HamaTestCase.class);
 
-  private static final int PORT = 54321;
   public static final String TEST_POINT = "bsp.ft.test.point";
-
+  public static final String TEST_GROOM_PORT = "bsp.ft.test.groomport";
+  private static int TEST_NUMBER = 0;
+  
+  
   private volatile MinimalGroomServer groom;
   private volatile BSPPeerProtocol umbilical;
   private Server workerServer;
   private TaskAttemptID taskid = new TaskAttemptID(new TaskID(new BSPJobID(
       "job_201110302255", 1), 1), 1);
 
-  public volatile static HamaConfiguration conf;
+  public volatile HamaConfiguration conf;
 
   private ScheduledExecutorService testBSPTaskService;
+  
+  private static synchronized int incrementTestNumber(){
+    return ++TEST_NUMBER;
+  }
 
   @SuppressWarnings("unused")
   public static class MinimalGroomServer implements BSPPeerProtocol {
@@ -195,11 +201,15 @@ public class TestBSPTaskFaults extends T
     private Process bspTaskProcess;
     private Thread errorLog;
     private Thread infoLog;
+    private int testPoint;
+    private int testPort;
 
-    TestBSPProcessRunner() {
+    TestBSPProcessRunner(int point, int port) {
       sched = Executors.newScheduledThreadPool(1);
       future = new AtomicReference<ScheduledFuture<Integer>>();
       bspTaskProcess = null;
+      testPoint = point;
+      testPort = port;
     }
 
     private void readStream(InputStream input) throws IOException {
@@ -253,8 +263,9 @@ public class TestBSPTaskFaults extends T
       commands.add(TestBSPProcessRunner.class.getName());
 
       LOG.info("starting process for failure case - "
-          + conf.getInt(TEST_POINT, 0));
-      commands.add("" + conf.getInt(TEST_POINT, 0));
+          + testPoint);
+      commands.add("" + testPoint);
+      commands.add("" + testPort);
 
       LOG.info(commands.toString());
 
@@ -308,16 +319,14 @@ public class TestBSPTaskFaults extends T
       TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
           "job_201110102255", 1), 1), 1);
 
-      if (args.length > 0) {
-        hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0]));
-
-      }
+      hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0]));
+      int port = Integer.parseInt(args[1]);
 
       try {
         BSPJob job = new BSPJob(hamaConf);
         final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
             BSPPeerProtocol.class, BSPPeerProtocol.versionID,
-            new InetSocketAddress("127.0.0.1", 54321), hamaConf);
+            new InetSocketAddress("127.0.0.1", port), hamaConf);
 
         BSPTask task = new BSPTask();
         task.setConf(job);
@@ -406,7 +415,8 @@ public class TestBSPTaskFaults extends T
     conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
         LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
-    InetSocketAddress inetAddress = new InetSocketAddress(PORT);
+    int testNumber = incrementTestNumber();
+    InetSocketAddress inetAddress = new InetSocketAddress(54321 + testNumber);
     groom = new MinimalGroomServer(conf);
     workerServer = RPC.getServer(groom, inetAddress.getHostName(),
         inetAddress.getPort(), conf);
@@ -444,7 +454,7 @@ public class TestBSPTaskFaults extends T
     CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
         this.testBSPTaskService);
     Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner());
+        .submit(new TestBSPProcessRunner(0, workerServer.getListenerAddress().getPort()));
 
     try {
       future.get(20000, TimeUnit.MILLISECONDS);
@@ -471,10 +481,11 @@ public class TestBSPTaskFaults extends T
     LOG.info("Testing ping failure case - 1");
     conf.setInt(TEST_POINT, 1);
 
-    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
-        this.testBSPTaskService);
+    CompletionService<Integer> completionService = 
+        new ExecutorCompletionService<Integer>(this.testBSPTaskService);
     Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner());
+        .submit(new TestBSPProcessRunner(1, 
+            workerServer.getListenerAddress().getPort()));
 
     try {
       future.get(20000, TimeUnit.MILLISECONDS);
@@ -496,10 +507,11 @@ public class TestBSPTaskFaults extends T
 
     LOG.info("Testing ping failure case - 2");
     conf.setInt(TEST_POINT, 2);
-    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
-        this.testBSPTaskService);
+    CompletionService<Integer> completionService = 
+        new ExecutorCompletionService<Integer>(this.testBSPTaskService);
     Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner());
+        .submit(new TestBSPProcessRunner(2, 
+            workerServer.getListenerAddress().getPort()));
 
     try {
       future.get(20000, TimeUnit.MILLISECONDS);
@@ -522,10 +534,12 @@ public class TestBSPTaskFaults extends T
     LOG.info("Testing ping failure case - 3");
 
     conf.setInt(TEST_POINT, 3);
-    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
-        this.testBSPTaskService);
+    CompletionService<Integer> completionService = 
+        new ExecutorCompletionService<Integer>(this.testBSPTaskService);
+    
     Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner());
+        .submit(new TestBSPProcessRunner(3, 
+            workerServer.getListenerAddress().getPort()));
 
     try {
       future.get(20000, TimeUnit.MILLISECONDS);
@@ -546,10 +560,11 @@ public class TestBSPTaskFaults extends T
   public void testBSPTaskSelfDestroy() {
     LOG.info("Testing self kill on lost contact.");
 
-    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
-        this.testBSPTaskService);
+    CompletionService<Integer> completionService = 
+        new ExecutorCompletionService<Integer>(this.testBSPTaskService);
     Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner());
+        .submit(new TestBSPProcessRunner(0, 
+            workerServer.getListenerAddress().getPort()));
 
     try {
       while (groom.pingCount == 0) {

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1298532&r1=1298531&r2=1298532&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Thu Mar  8 19:35:29 2012
@@ -36,6 +36,7 @@ import org.apache.hama.bsp.messages.Byte
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.util.BSPNetUtils;
 
 public class TestCheckpoint extends TestCase {
 
@@ -89,7 +90,7 @@ public class TestCheckpoint extends Test
 
     conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
 
-    int port = 54321;
+    int port = BSPNetUtils.getFreePort(5000);
     InetSocketAddress inetAddress = new InetSocketAddress(port);
     MinimalGroomServer groom = new MinimalGroomServer(conf);
     Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),