You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/11/01 05:04:18 UTC
svn commit: r1404460 -
/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
Author: edwardyoon
Date: Thu Nov 1 04:04:17 2012
New Revision: 1404460
URL: http://svn.apache.org/viewvc?rev=1404460&view=rev
Log:
HAMA-662: Fix bugs of unit tests
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1404460&r1=1404459&r2=1404460&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Thu Nov 1 04:04:17 2012
@@ -64,8 +64,7 @@ public class TestBSPTaskFaults extends T
public static final String TEST_POINT = "bsp.ft.test.point";
public static final String TEST_GROOM_PORT = "bsp.ft.test.groomport";
private static int TEST_NUMBER = 0;
-
-
+
private volatile MinimalGroomServer groom;
private volatile BSPPeerProtocol umbilical;
private Server workerServer;
@@ -75,8 +74,8 @@ public class TestBSPTaskFaults extends T
public volatile HamaConfiguration conf;
private ScheduledExecutorService testBSPTaskService;
-
- private static synchronized int incrementTestNumber(){
+
+ private static synchronized int incrementTestNumber() {
return ++TEST_NUMBER;
}
@@ -104,7 +103,6 @@ public class TestBSPTaskFaults extends T
@Override
public void close() throws IOException {
isShutDown = true;
-
}
@Override
@@ -172,7 +170,7 @@ public class TestBSPTaskFaults extends T
private class TestBSPTaskThreadRunner extends Thread {
BSPJob job;
-
+
TestBSPTaskThreadRunner(BSPJob jobConf) {
job = jobConf;
}
@@ -240,7 +238,8 @@ public class TestBSPTaskFaults extends T
}
public void destroyProcess() {
-
+ bspTaskProcess.destroy();
+ sched.shutdown();
}
@Override
@@ -265,8 +264,7 @@ public class TestBSPTaskFaults extends T
commands.add(TestBSPProcessRunner.class.getName());
- LOG.info("starting process for failure case - "
- + testPoint);
+ LOG.info("starting process for failure case - " + testPoint);
commands.add("" + testPoint);
commands.add("" + testPort);
@@ -331,7 +329,7 @@ public class TestBSPTaskFaults extends T
BSPJob job = new BSPJob(hamaConf);
job.setInputFormat(NullInputFormat.class);
job.setOutputFormat(NullOutputFormat.class);
-
+
final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
new InetSocketAddress("127.0.0.1", port), hamaConf);
@@ -425,7 +423,8 @@ public class TestBSPTaskFaults extends T
LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
int testNumber = incrementTestNumber();
- InetSocketAddress inetAddress = new InetSocketAddress(BSPNetUtils.getFreePort(34321) + testNumber);
+ InetSocketAddress inetAddress = new InetSocketAddress(
+ BSPNetUtils.getFreePort(34321) + testNumber);
groom = new MinimalGroomServer(conf);
workerServer = RPC.getServer(groom, inetAddress.getHostName(),
inetAddress.getPort(), conf);
@@ -462,8 +461,10 @@ public class TestBSPTaskFaults extends T
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
this.testBSPTaskService);
- Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner(0, workerServer.getListenerAddress().getPort()));
+ TestBSPProcessRunner runner = new TestBSPProcessRunner(0, workerServer
+ .getListenerAddress().getPort());
+
+ Future<Integer> future = completionService.submit(runner);
try {
future.get(20000, TimeUnit.MILLISECONDS);
@@ -478,7 +479,7 @@ public class TestBSPTaskFaults extends T
checkIfPingTestPassed();
groom.setPingCount(0);
this.testBSPTaskService.shutdownNow();
-
+ runner.destroyProcess();
}
/*
@@ -490,11 +491,11 @@ public class TestBSPTaskFaults extends T
LOG.info("Testing ping failure case - 1");
conf.setInt(TEST_POINT, 1);
- CompletionService<Integer> completionService =
- new ExecutorCompletionService<Integer>(this.testBSPTaskService);
- Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner(1,
- workerServer.getListenerAddress().getPort()));
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ TestBSPProcessRunner runner = new TestBSPProcessRunner(1, workerServer
+ .getListenerAddress().getPort());
+ Future<Integer> future = completionService.submit(runner);
try {
future.get(20000, TimeUnit.MILLISECONDS);
@@ -509,18 +510,18 @@ public class TestBSPTaskFaults extends T
checkIfPingTestPassed();
groom.setPingCount(0);
this.testBSPTaskService.shutdownNow();
-
+ runner.destroyProcess();
}
public void testPingOnTaskExecFailure() {
LOG.info("Testing ping failure case - 2");
conf.setInt(TEST_POINT, 2);
- CompletionService<Integer> completionService =
- new ExecutorCompletionService<Integer>(this.testBSPTaskService);
- Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner(2,
- workerServer.getListenerAddress().getPort()));
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ TestBSPProcessRunner runner = new TestBSPProcessRunner(2, workerServer
+ .getListenerAddress().getPort());
+ Future<Integer> future = completionService.submit(runner);
try {
future.get(20000, TimeUnit.MILLISECONDS);
@@ -535,7 +536,7 @@ public class TestBSPTaskFaults extends T
checkIfPingTestPassed();
groom.setPingCount(0);
this.testBSPTaskService.shutdownNow();
-
+ runner.destroyProcess();
}
public void testPingOnTaskCleanupFailure() {
@@ -543,12 +544,12 @@ public class TestBSPTaskFaults extends T
LOG.info("Testing ping failure case - 3");
conf.setInt(TEST_POINT, 3);
- CompletionService<Integer> completionService =
- new ExecutorCompletionService<Integer>(this.testBSPTaskService);
-
- Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner(3,
- workerServer.getListenerAddress().getPort()));
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ TestBSPProcessRunner runner = new TestBSPProcessRunner(3, workerServer
+ .getListenerAddress().getPort());
+
+ Future<Integer> future = completionService.submit(runner);
try {
future.get(20000, TimeUnit.MILLISECONDS);
@@ -563,17 +564,18 @@ public class TestBSPTaskFaults extends T
checkIfPingTestPassed();
groom.setPingCount(0);
this.testBSPTaskService.shutdownNow();
-
+ runner.destroyProcess();
}
public void testBSPTaskSelfDestroy() {
LOG.info("Testing self kill on lost contact.");
- CompletionService<Integer> completionService =
- new ExecutorCompletionService<Integer>(this.testBSPTaskService);
- Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner(0,
- workerServer.getListenerAddress().getPort()));
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
+ this.testBSPTaskService);
+ TestBSPProcessRunner runner = new TestBSPProcessRunner(0, workerServer
+ .getListenerAddress().getPort());
+
+ Future<Integer> future = completionService.submit(runner);
try {
while (groom.pingCount == 0) {
@@ -598,12 +600,13 @@ public class TestBSPTaskFaults extends T
}
assertEquals(69, exitValue.intValue());
+ runner.destroyProcess();
}
@Override
protected void tearDown() throws Exception {
-
super.tearDown();
+
if (groom != null)
groom.setPingCount(0);
if (umbilical != null) {