You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/11/01 05:04:18 UTC

svn commit: r1404460 - /hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java

Author: edwardyoon
Date: Thu Nov  1 04:04:17 2012
New Revision: 1404460

URL: http://svn.apache.org/viewvc?rev=1404460&view=rev
Log:
HAMA-662: Fix bugs in unit tests

Modified:
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1404460&r1=1404459&r2=1404460&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Thu Nov  1 04:04:17 2012
@@ -64,8 +64,7 @@ public class TestBSPTaskFaults extends T
   public static final String TEST_POINT = "bsp.ft.test.point";
   public static final String TEST_GROOM_PORT = "bsp.ft.test.groomport";
   private static int TEST_NUMBER = 0;
-  
-  
+
   private volatile MinimalGroomServer groom;
   private volatile BSPPeerProtocol umbilical;
   private Server workerServer;
@@ -75,8 +74,8 @@ public class TestBSPTaskFaults extends T
   public volatile HamaConfiguration conf;
 
   private ScheduledExecutorService testBSPTaskService;
-  
-  private static synchronized int incrementTestNumber(){
+
+  private static synchronized int incrementTestNumber() {
     return ++TEST_NUMBER;
   }
 
@@ -104,7 +103,6 @@ public class TestBSPTaskFaults extends T
     @Override
     public void close() throws IOException {
       isShutDown = true;
-
     }
 
     @Override
@@ -172,7 +170,7 @@ public class TestBSPTaskFaults extends T
   private class TestBSPTaskThreadRunner extends Thread {
 
     BSPJob job;
-    
+
     TestBSPTaskThreadRunner(BSPJob jobConf) {
       job = jobConf;
     }
@@ -240,7 +238,8 @@ public class TestBSPTaskFaults extends T
     }
 
     public void destroyProcess() {
-
+      bspTaskProcess.destroy();
+      sched.shutdown();
     }
 
     @Override
@@ -265,8 +264,7 @@ public class TestBSPTaskFaults extends T
 
       commands.add(TestBSPProcessRunner.class.getName());
 
-      LOG.info("starting process for failure case - "
-          + testPoint);
+      LOG.info("starting process for failure case - " + testPoint);
       commands.add("" + testPoint);
       commands.add("" + testPort);
 
@@ -331,7 +329,7 @@ public class TestBSPTaskFaults extends T
         BSPJob job = new BSPJob(hamaConf);
         job.setInputFormat(NullInputFormat.class);
         job.setOutputFormat(NullOutputFormat.class);
-        
+
         final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
             BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
             new InetSocketAddress("127.0.0.1", port), hamaConf);
@@ -425,7 +423,8 @@ public class TestBSPTaskFaults extends T
         LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
     int testNumber = incrementTestNumber();
-    InetSocketAddress inetAddress = new InetSocketAddress(BSPNetUtils.getFreePort(34321) + testNumber);
+    InetSocketAddress inetAddress = new InetSocketAddress(
+        BSPNetUtils.getFreePort(34321) + testNumber);
     groom = new MinimalGroomServer(conf);
     workerServer = RPC.getServer(groom, inetAddress.getHostName(),
         inetAddress.getPort(), conf);
@@ -462,8 +461,10 @@ public class TestBSPTaskFaults extends T
 
     CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
         this.testBSPTaskService);
-    Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner(0, workerServer.getListenerAddress().getPort()));
+    TestBSPProcessRunner runner = new TestBSPProcessRunner(0, workerServer
+        .getListenerAddress().getPort());
+
+    Future<Integer> future = completionService.submit(runner);
 
     try {
       future.get(20000, TimeUnit.MILLISECONDS);
@@ -478,7 +479,7 @@ public class TestBSPTaskFaults extends T
     checkIfPingTestPassed();
     groom.setPingCount(0);
     this.testBSPTaskService.shutdownNow();
-
+    runner.destroyProcess();
   }
 
   /*
@@ -490,11 +491,11 @@ public class TestBSPTaskFaults extends T
     LOG.info("Testing ping failure case - 1");
     conf.setInt(TEST_POINT, 1);
 
-    CompletionService<Integer> completionService = 
-        new ExecutorCompletionService<Integer>(this.testBSPTaskService);
-    Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner(1, 
-            workerServer.getListenerAddress().getPort()));
+    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+        this.testBSPTaskService);
+    TestBSPProcessRunner runner = new TestBSPProcessRunner(1, workerServer
+        .getListenerAddress().getPort());
+    Future<Integer> future = completionService.submit(runner);
 
     try {
       future.get(20000, TimeUnit.MILLISECONDS);
@@ -509,18 +510,18 @@ public class TestBSPTaskFaults extends T
     checkIfPingTestPassed();
     groom.setPingCount(0);
     this.testBSPTaskService.shutdownNow();
-
+    runner.destroyProcess();
   }
 
   public void testPingOnTaskExecFailure() {
 
     LOG.info("Testing ping failure case - 2");
     conf.setInt(TEST_POINT, 2);
-    CompletionService<Integer> completionService = 
-        new ExecutorCompletionService<Integer>(this.testBSPTaskService);
-    Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner(2, 
-            workerServer.getListenerAddress().getPort()));
+    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+        this.testBSPTaskService);
+    TestBSPProcessRunner runner = new TestBSPProcessRunner(2, workerServer
+        .getListenerAddress().getPort());
+    Future<Integer> future = completionService.submit(runner);
 
     try {
       future.get(20000, TimeUnit.MILLISECONDS);
@@ -535,7 +536,7 @@ public class TestBSPTaskFaults extends T
     checkIfPingTestPassed();
     groom.setPingCount(0);
     this.testBSPTaskService.shutdownNow();
-
+    runner.destroyProcess();
   }
 
   public void testPingOnTaskCleanupFailure() {
@@ -543,12 +544,12 @@ public class TestBSPTaskFaults extends T
     LOG.info("Testing ping failure case - 3");
 
     conf.setInt(TEST_POINT, 3);
-    CompletionService<Integer> completionService = 
-        new ExecutorCompletionService<Integer>(this.testBSPTaskService);
-    
-    Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner(3, 
-            workerServer.getListenerAddress().getPort()));
+    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+        this.testBSPTaskService);
+    TestBSPProcessRunner runner = new TestBSPProcessRunner(3, workerServer
+        .getListenerAddress().getPort());
+
+    Future<Integer> future = completionService.submit(runner);
 
     try {
       future.get(20000, TimeUnit.MILLISECONDS);
@@ -563,17 +564,18 @@ public class TestBSPTaskFaults extends T
     checkIfPingTestPassed();
     groom.setPingCount(0);
     this.testBSPTaskService.shutdownNow();
-
+    runner.destroyProcess();
   }
 
   public void testBSPTaskSelfDestroy() {
     LOG.info("Testing self kill on lost contact.");
 
-    CompletionService<Integer> completionService = 
-        new ExecutorCompletionService<Integer>(this.testBSPTaskService);
-    Future<Integer> future = completionService
-        .submit(new TestBSPProcessRunner(0, 
-            workerServer.getListenerAddress().getPort()));
+    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+        this.testBSPTaskService);
+    TestBSPProcessRunner runner = new TestBSPProcessRunner(0, workerServer
+        .getListenerAddress().getPort());
+
+    Future<Integer> future = completionService.submit(runner);
 
     try {
       while (groom.pingCount == 0) {
@@ -598,12 +600,13 @@ public class TestBSPTaskFaults extends T
     }
 
     assertEquals(69, exitValue.intValue());
+    runner.destroyProcess();
   }
 
   @Override
   protected void tearDown() throws Exception {
-
     super.tearDown();
+
     if (groom != null)
       groom.setPingCount(0);
     if (umbilical != null) {