You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/03/08 20:35:29 UTC
svn commit: r1298532 - in
/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp:
TestBSPTaskFaults.java TestCheckpoint.java
Author: tjungblut
Date: Thu Mar 8 19:35:29 2012
New Revision: 1298532
URL: http://svn.apache.org/viewvc?rev=1298532&view=rev
Log:
Testcase fix (contributed by Suraj Menon)
Modified:
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1298532&r1=1298531&r2=1298532&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Thu Mar 8 19:35:29 2012
@@ -59,18 +59,24 @@ public class TestBSPTaskFaults extends T
public static final Log LOG = LogFactory.getLog(HamaTestCase.class);
- private static final int PORT = 54321;
public static final String TEST_POINT = "bsp.ft.test.point";
-
+ public static final String TEST_GROOM_PORT = "bsp.ft.test.groomport";
+ private static int TEST_NUMBER = 0;
+
+
private volatile MinimalGroomServer groom;
private volatile BSPPeerProtocol umbilical;
private Server workerServer;
private TaskAttemptID taskid = new TaskAttemptID(new TaskID(new BSPJobID(
"job_201110302255", 1), 1), 1);
- public volatile static HamaConfiguration conf;
+ public volatile HamaConfiguration conf;
private ScheduledExecutorService testBSPTaskService;
+
+ private static synchronized int incrementTestNumber(){
+ return ++TEST_NUMBER;
+ }
@SuppressWarnings("unused")
public static class MinimalGroomServer implements BSPPeerProtocol {
@@ -195,11 +201,15 @@ public class TestBSPTaskFaults extends T
private Process bspTaskProcess;
private Thread errorLog;
private Thread infoLog;
+ private int testPoint;
+ private int testPort;
- TestBSPProcessRunner() {
+ TestBSPProcessRunner(int point, int port) {
sched = Executors.newScheduledThreadPool(1);
future = new AtomicReference<ScheduledFuture<Integer>>();
bspTaskProcess = null;
+ testPoint = point;
+ testPort = port;
}
private void readStream(InputStream input) throws IOException {
@@ -253,8 +263,9 @@ public class TestBSPTaskFaults extends T
commands.add(TestBSPProcessRunner.class.getName());
LOG.info("starting process for failure case - "
- + conf.getInt(TEST_POINT, 0));
- commands.add("" + conf.getInt(TEST_POINT, 0));
+ + testPoint);
+ commands.add("" + testPoint);
+ commands.add("" + testPort);
LOG.info(commands.toString());
@@ -308,16 +319,14 @@ public class TestBSPTaskFaults extends T
TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
"job_201110102255", 1), 1), 1);
- if (args.length > 0) {
- hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0]));
-
- }
+ hamaConf.setInt(TEST_POINT, Integer.parseInt(args[0]));
+ int port = Integer.parseInt(args[1]);
try {
BSPJob job = new BSPJob(hamaConf);
final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
BSPPeerProtocol.class, BSPPeerProtocol.versionID,
- new InetSocketAddress("127.0.0.1", 54321), hamaConf);
+ new InetSocketAddress("127.0.0.1", port), hamaConf);
BSPTask task = new BSPTask();
task.setConf(job);
@@ -406,7 +415,8 @@ public class TestBSPTaskFaults extends T
conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
- InetSocketAddress inetAddress = new InetSocketAddress(PORT);
+ int testNumber = incrementTestNumber();
+ InetSocketAddress inetAddress = new InetSocketAddress(54321 + testNumber);
groom = new MinimalGroomServer(conf);
workerServer = RPC.getServer(groom, inetAddress.getHostName(),
inetAddress.getPort(), conf);
@@ -444,7 +454,7 @@ public class TestBSPTaskFaults extends T
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
this.testBSPTaskService);
Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner());
+ .submit(new TestBSPProcessRunner(0, workerServer.getListenerAddress().getPort()));
try {
future.get(20000, TimeUnit.MILLISECONDS);
@@ -471,10 +481,11 @@ public class TestBSPTaskFaults extends T
LOG.info("Testing ping failure case - 1");
conf.setInt(TEST_POINT, 1);
- CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
- this.testBSPTaskService);
+ CompletionService<Integer> completionService =
+ new ExecutorCompletionService<Integer>(this.testBSPTaskService);
Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner());
+ .submit(new TestBSPProcessRunner(1,
+ workerServer.getListenerAddress().getPort()));
try {
future.get(20000, TimeUnit.MILLISECONDS);
@@ -496,10 +507,11 @@ public class TestBSPTaskFaults extends T
LOG.info("Testing ping failure case - 2");
conf.setInt(TEST_POINT, 2);
- CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
- this.testBSPTaskService);
+ CompletionService<Integer> completionService =
+ new ExecutorCompletionService<Integer>(this.testBSPTaskService);
Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner());
+ .submit(new TestBSPProcessRunner(2,
+ workerServer.getListenerAddress().getPort()));
try {
future.get(20000, TimeUnit.MILLISECONDS);
@@ -522,10 +534,12 @@ public class TestBSPTaskFaults extends T
LOG.info("Testing ping failure case - 3");
conf.setInt(TEST_POINT, 3);
- CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
- this.testBSPTaskService);
+ CompletionService<Integer> completionService =
+ new ExecutorCompletionService<Integer>(this.testBSPTaskService);
+
Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner());
+ .submit(new TestBSPProcessRunner(3,
+ workerServer.getListenerAddress().getPort()));
try {
future.get(20000, TimeUnit.MILLISECONDS);
@@ -546,10 +560,11 @@ public class TestBSPTaskFaults extends T
public void testBSPTaskSelfDestroy() {
LOG.info("Testing self kill on lost contact.");
- CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
- this.testBSPTaskService);
+ CompletionService<Integer> completionService =
+ new ExecutorCompletionService<Integer>(this.testBSPTaskService);
Future<Integer> future = completionService
- .submit(new TestBSPProcessRunner());
+ .submit(new TestBSPProcessRunner(0,
+ workerServer.getListenerAddress().getPort()));
try {
while (groom.pingCount == 0) {
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1298532&r1=1298531&r2=1298532&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Thu Mar 8 19:35:29 2012
@@ -36,6 +36,7 @@ import org.apache.hama.bsp.messages.Byte
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.util.BSPNetUtils;
public class TestCheckpoint extends TestCase {
@@ -89,7 +90,7 @@ public class TestCheckpoint extends Test
conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
- int port = 54321;
+ int port = BSPNetUtils.getFreePort(5000);
InetSocketAddress inetAddress = new InetSocketAddress(port);
MinimalGroomServer groom = new MinimalGroomServer(conf);
Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),