You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2015/05/21 19:35:08 UTC
incubator-twill git commit: (TWILL-83) Expose testing utility to help
writing twill tests
Repository: incubator-twill
Updated Branches:
refs/heads/feature/TWILL-83 [created] 8d2ff2125
(TWILL-83) Expose testing utility to help writing twill tests
- Replaced YarnTestUtils with TwillTester to provide better support for writing test
- Use junit ExternalResource
- Refactor existing twill tests to use TwillTester
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/8d2ff212
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/8d2ff212
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/8d2ff212
Branch: refs/heads/feature/TWILL-83
Commit: 8d2ff212517bec8263c5c837dc0986ca36ee6ec5
Parents: 8601742
Author: Terence Yim <ch...@apache.org>
Authored: Wed May 20 15:34:44 2015 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu May 21 10:34:59 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/twill/test/Java8Test.java | 3 +-
.../org/apache/twill/yarn/BaseYarnTest.java | 66 ++++++-
.../apache/twill/yarn/ContainerSizeTestRun.java | 4 +-
.../org/apache/twill/yarn/DebugTestRun.java | 4 +-
.../twill/yarn/DistributeShellTestRun.java | 2 +-
.../apache/twill/yarn/EchoServerTestRun.java | 19 +-
.../twill/yarn/FailureRestartTestRun.java | 8 +-
.../twill/yarn/InitializeFailTestRun.java | 2 +-
.../org/apache/twill/yarn/LocalFileTestRun.java | 6 +-
.../apache/twill/yarn/LogHandlerTestRun.java | 2 +-
.../twill/yarn/PlacementPolicyTestRun.java | 36 ++--
.../twill/yarn/ProvisionTimeoutTestRun.java | 2 +-
.../twill/yarn/ResourceReportTestRun.java | 17 +-
.../twill/yarn/ServiceDiscoveryTestRun.java | 4 +-
.../apache/twill/yarn/SessionExpireTestRun.java | 2 +-
.../apache/twill/yarn/TaskCompletedTestRun.java | 2 +-
.../java/org/apache/twill/yarn/TwillTester.java | 170 ++++++++++++++++++
.../org/apache/twill/yarn/YarnTestSuite.java | 2 +-
.../org/apache/twill/yarn/YarnTestUtils.java | 173 -------------------
19 files changed, 285 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-java8-test/src/test/java/org/apache/twill/test/Java8Test.java
----------------------------------------------------------------------
diff --git a/twill-java8-test/src/test/java/org/apache/twill/test/Java8Test.java b/twill-java8-test/src/test/java/org/apache/twill/test/Java8Test.java
index 11ee88e..6592d8f 100644
--- a/twill-java8-test/src/test/java/org/apache/twill/test/Java8Test.java
+++ b/twill-java8-test/src/test/java/org/apache/twill/test/Java8Test.java
@@ -24,7 +24,6 @@ import org.apache.twill.api.TwillRunnable;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.yarn.BaseYarnTest;
-import org.apache.twill.yarn.YarnTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -43,7 +42,7 @@ public class Java8Test extends BaseYarnTest {
@Test
public void test() throws ExecutionException, InterruptedException, TimeoutException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
// Start the TestRunnable and make sure it is executed with the log message emitted.
CountDownLatch logLatch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index 55ef9dc..bc461e6 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -17,16 +17,19 @@
*/
package org.apache.twill.yarn;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.junit.After;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Base class for all YARN tests.
@@ -38,15 +41,33 @@ public abstract class BaseYarnTest {
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
- @BeforeClass
- public static void init() throws IOException {
- YarnTestUtils.initOnce();
- }
+ /**
+ * A singleton wrapper so that yarn cluster only bring up once across all tests in the YarnTestSuite.
+ */
+ @ClassRule
+ public static final TwillTester TWILL_TESTER = new TwillTester() {
+ private final AtomicInteger instances = new AtomicInteger();
+
+ @Override
+ protected void before() throws Throwable {
+ if (instances.getAndIncrement() == 0) {
+ super.before();
+ }
+ }
+
+ @Override
+ protected void after() {
+ if (instances.decrementAndGet() == 0) {
+ super.after();
+ }
+ }
+ };
+
@After
public final void cleanupTest() {
// Make sure all applications are stopped after a test case is executed, even it failed.
- TwillRunner twillRunner = YarnTestUtils.getTwillRunner();
+ TwillRunner twillRunner = TWILL_TESTER.getTwillRunner();
for (TwillRunner.LiveInfo liveInfo : twillRunner.lookupLive()) {
for (TwillController controller : liveInfo.getControllers()) {
try {
@@ -57,4 +78,35 @@ public abstract class BaseYarnTest {
}
}
}
+
+ /**
+ * Poll the given {@link Iterable} until its size is greater than or equal to the given count,
+ * with a limited amount of polls. There is one second sleep between each poll.
+ *
+ * @param iterable the Iterable to poll
+ * @param count the expected size
+ * @param limit number of times to poll.
+ * @param <T> type of the element inside the Iterable
+ * @return true if the size is greater than or equal to count
+ */
+ public <T> boolean waitForSize(Iterable<T> iterable, int count, int limit) throws InterruptedException {
+ int trial = 0;
+ int size = Iterables.size(iterable);
+ while (size != count && trial < limit) {
+ LOG.info("Waiting for {} size {} == {}", iterable, size, count);
+ TimeUnit.SECONDS.sleep(1);
+ trial++;
+ size = Iterables.size(iterable);
+ }
+ return trial < limit;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends TwillRunner> T getTwillRunner() {
+ return (T) TWILL_TESTER.getTwillRunner();
+ }
+
+ public List<NodeReport> getNodeReports() throws Exception {
+ return TWILL_TESTER.getNodeReports();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index f73ff82..6e27b69 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -43,14 +43,14 @@ public class ContainerSizeTestRun extends BaseYarnTest {
@Test
public void testContainerSize() throws InterruptedException, TimeoutException, ExecutionException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new SleepApp())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.start();
try {
ServiceDiscovered discovered = controller.discoverService("sleep");
- Assert.assertTrue(YarnTestUtils.waitForSize(discovered, 2, 120));
+ Assert.assertTrue(waitForSize(discovered, 2, 120));
} finally {
controller.terminate().get(120, TimeUnit.SECONDS);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
index a4f9029..cd6faf9 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
@@ -102,7 +102,7 @@ public class DebugTestRun extends BaseYarnTest {
@Test
public void testDebugPortOneRunnable() throws Exception {
- YarnTwillRunnerService runner = (YarnTwillRunnerService) YarnTestUtils.getTwillRunner();
+ YarnTwillRunnerService runner = getTwillRunner();
runner.start();
TwillController controller = runner.prepare(new DummyApplication())
@@ -126,7 +126,7 @@ public class DebugTestRun extends BaseYarnTest {
@Test
public void testDebugPortAllRunnables() throws Exception {
- YarnTwillRunnerService runner = (YarnTwillRunnerService) YarnTestUtils.getTwillRunner();
+ YarnTwillRunnerService runner = getTwillRunner();
runner.start();
TwillController controller = runner.prepare(new DummyApplication())
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
index 68b3b7f..5ae9dcd 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/DistributeShellTestRun.java
@@ -37,7 +37,7 @@ public final class DistributeShellTestRun extends BaseYarnTest {
@Ignore
@Test
public void testDistributedShell() throws InterruptedException {
- TwillRunner twillRunner = YarnTestUtils.getTwillRunner();
+ TwillRunner twillRunner = getTwillRunner();
TwillController controller = twillRunner.prepare(new DistributedShell("pwd", "ls -al"))
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)))
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index bb20c63..bb525a8 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -44,7 +44,6 @@ import java.util.concurrent.TimeoutException;
/**
* Using echo server to test various behavior of YarnTwillService.
- * This test is executed by {@link YarnTestUtils}.
*/
public final class EchoServerTestRun extends BaseYarnTest {
@@ -53,7 +52,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
@Test
public void testEchoServer() throws InterruptedException, ExecutionException, IOException,
URISyntaxException, TimeoutException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new EchoServer(),
ResourceSpecification.Builder.with()
@@ -77,7 +76,7 @@ public final class EchoServerTestRun extends BaseYarnTest {
Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
Iterable<Discoverable> echoServices = controller.discoverService("echo");
- Assert.assertTrue(YarnTestUtils.waitForSize(echoServices, 2, 120));
+ Assert.assertTrue(waitForSize(echoServices, 2, 120));
for (Discoverable discoverable : echoServices) {
String msg = "Hello: " + discoverable.getSocketAddress();
@@ -97,36 +96,36 @@ public final class EchoServerTestRun extends BaseYarnTest {
// Increase number of instances
controller.changeInstances("EchoServer", 3);
- Assert.assertTrue(YarnTestUtils.waitForSize(echoServices, 3, 120));
+ Assert.assertTrue(waitForSize(echoServices, 3, 120));
echoServices = controller.discoverService("echo2");
// Decrease number of instances
controller.changeInstances("EchoServer", 1);
- Assert.assertTrue(YarnTestUtils.waitForSize(echoServices, 1, 120));
+ Assert.assertTrue(waitForSize(echoServices, 1, 120));
// Increase number of instances again
controller.changeInstances("EchoServer", 2);
- Assert.assertTrue(YarnTestUtils.waitForSize(echoServices, 2, 120));
+ Assert.assertTrue(waitForSize(echoServices, 2, 120));
// Make sure still only one app is running
Iterable<TwillRunner.LiveInfo> apps = runner.lookupLive();
- Assert.assertTrue(YarnTestUtils.waitForSize(apps, 1, 120));
+ Assert.assertTrue(waitForSize(apps, 1, 120));
// Creates a new runner service to check it can regain control over running app.
- TwillRunnerService runnerService = YarnTestUtils.createTwillRunnerService();
+ TwillRunnerService runnerService = TWILL_TESTER.createTwillRunnerService();
runnerService.start();
try {
Iterable <TwillController> controllers = runnerService.lookup("EchoServer");
- Assert.assertTrue(YarnTestUtils.waitForSize(controllers, 1, 120));
+ Assert.assertTrue(waitForSize(controllers, 1, 120));
for (TwillController c : controllers) {
LOG.info("Stopping application: " + c.getRunId());
c.terminate().get(30, TimeUnit.SECONDS);
}
- Assert.assertTrue(YarnTestUtils.waitForSize(apps, 0, 120));
+ Assert.assertTrue(waitForSize(apps, 0, 120));
} finally {
runnerService.stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
index e1b8df0..a0c195d 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
@@ -46,7 +46,7 @@ public final class FailureRestartTestRun extends BaseYarnTest {
@Test
public void testFailureRestart() throws Exception {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
ResourceSpecification resource = ResourceSpecification.Builder.with()
.setVirtualCores(1)
@@ -60,7 +60,7 @@ public final class FailureRestartTestRun extends BaseYarnTest {
.start();
Iterable<Discoverable> discoverables = controller.discoverService("failure");
- Assert.assertTrue(YarnTestUtils.waitForSize(discoverables, 2, 120));
+ Assert.assertTrue(waitForSize(discoverables, 2, 120));
// Make sure we see the right instance IDs
Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
@@ -69,10 +69,10 @@ public final class FailureRestartTestRun extends BaseYarnTest {
controller.sendCommand(FailureRunnable.class.getSimpleName(), Command.Builder.of("kill0").build());
// Make sure the runnable is killed.
- Assert.assertTrue(YarnTestUtils.waitForSize(discoverables, 1, 120));
+ Assert.assertTrue(waitForSize(discoverables, 1, 120));
// Wait for the restart
- Assert.assertTrue(YarnTestUtils.waitForSize(discoverables, 2, 120));
+ Assert.assertTrue(waitForSize(discoverables, 2, 120));
// Make sure we see the right instance IDs
Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
index 658d208..64f5a69 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/InitializeFailTestRun.java
@@ -41,7 +41,7 @@ public class InitializeFailTestRun extends BaseYarnTest {
@Test
public void testInitFail() throws InterruptedException, ExecutionException, TimeoutException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
final CountDownLatch logLatch = new CountDownLatch(1);
// Verify that it receives the exception log entry that thrown when runnable initialize
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
index 97ac781..fd146f2 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
@@ -56,7 +56,7 @@ public final class LocalFileTestRun extends BaseYarnTest {
String header = Files.readFirstLine(new File(getClass().getClassLoader().getResource("header.txt").toURI()),
Charsets.UTF_8);
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new LocalFileApplication())
.addJVMOptions(" -verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails")
@@ -66,7 +66,7 @@ public final class LocalFileTestRun extends BaseYarnTest {
.start();
Iterable<Discoverable> discoverables = controller.discoverService("local");
- Assert.assertTrue(YarnTestUtils.waitForSize(discoverables, 1, 60));
+ Assert.assertTrue(waitForSize(discoverables, 1, 60));
InetSocketAddress socketAddress = discoverables.iterator().next().getSocketAddress();
Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
@@ -84,7 +84,7 @@ public final class LocalFileTestRun extends BaseYarnTest {
controller.terminate().get(120, TimeUnit.SECONDS);
- Assert.assertTrue(YarnTestUtils.waitForSize(discoverables, 0, 60));
+ Assert.assertTrue(waitForSize(discoverables, 0, 60));
TimeUnit.SECONDS.sleep(2);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
index 060ef49..ad0a837 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -71,7 +71,7 @@ public class LogHandlerTestRun extends BaseYarnTest {
}
};
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new LogRunnable())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.addLogHandler(logHandler)
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
index b3dc0dc..ec4c7bf 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
@@ -78,7 +78,7 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
int trials = 0;
while (trials++ < 20) {
try {
- nodeReports = YarnTestUtils.getNodeReports();
+ nodeReports = TWILL_TESTER.getNodeReports();
if (nodeReports != null && nodeReports.size() == 3) {
break;
}
@@ -112,7 +112,7 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
waitNodeManagerCount(0, 10, TimeUnit.SECONDS);
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new PlacementPolicyApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.withApplicationArguments("PlacementPolicyTest")
@@ -124,7 +124,7 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
try {
// All runnables should get started.
ServiceDiscovered serviceDiscovered = controller.discoverService("PlacementPolicyTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 80));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 4, 80));
// DISTRIBUTED runnables should be provisioned on different nodes.
Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
@@ -169,7 +169,7 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
waitNodeManagerCount(0, 10, TimeUnit.SECONDS);
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new DistributedApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.withApplicationArguments("DistributedTest")
@@ -181,26 +181,26 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
try {
// All runnables should get started with DISTRIBUTED ones being on different nodes.
ServiceDiscovered serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 3, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 3, 60));
Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
// Spawning a new instance for DISTRIBUTED runnable Alice, which should get a different node.
controller.changeInstances("Alice", 2);
serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 4, 60));
Assert.assertTrue(getProvisionedNodeManagerCount() >= 3);
// Spawning a new instance for DEFAULT runnable Eve,
// which should not be affected by placement policies of previous runnables.
controller.changeInstances("Eve", 2);
serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 5, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 5, 60));
// Spawning a new instance for DISTRIBUTED runnable Bob,
// which will be forced to give up it's placement policy restrictions, since there are only three nodes.
controller.changeInstances("Bob", 2);
serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 6, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 6, 60));
Assert.assertTrue(getProvisionedNodeManagerCount() >= 3);
} finally {
controller.terminate().get(120, TimeUnit.SECONDS);
@@ -257,7 +257,7 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
ServiceDiscovered serviceDiscovered;
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new ChangeInstanceApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.withApplicationArguments("DistributedTest")
@@ -269,27 +269,27 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
try {
// All runnables should get started.
serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 4, 60));
// Increasing the instance count for runnable Alice by 2.
controller.changeInstances("Alice", 4);
serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 6, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 6, 60));
// Decreasing instance count for runnable Alice by 3.
controller.changeInstances("Alice", 1);
serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 3, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 3, 60));
// Increasing instance count for runnable Bob by 2.
controller.changeInstances("Bob", 3);
serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 5, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 5, 60));
// Increasing instance count for runnable Eve by 2.
controller.changeInstances("Eve", 3);
serviceDiscovered = controller.discoverService("DistributedTest");
- Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 7, 60));
+ Assert.assertTrue(waitForSize(serviceDiscovered, 7, 60));
} finally {
controller.terminate().get(120, TimeUnit.SECONDS);
}
@@ -323,7 +323,7 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
*/
@Test(expected = IllegalArgumentException.class)
public void testNonExistentRunnable() throws InterruptedException, ExecutionException, TimeoutException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new FaultyApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.start();
@@ -356,7 +356,7 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
*/
@Test(expected = IllegalArgumentException.class)
public void testPlacementPolicySpecification() throws InterruptedException, ExecutionException, TimeoutException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new BadApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.start();
@@ -389,9 +389,9 @@ public class PlacementPolicyTestRun extends BaseYarnTest {
* Returns the number of NodeManagers on which runnables got provisioned.
* @return number of NodeManagers on which runnables got provisioned.
*/
- private static int getProvisionedNodeManagerCount() throws Exception {
+ private int getProvisionedNodeManagerCount() throws Exception {
int provisionedNodeManagerCount = 0;
- for (NodeReport nodeReport : YarnTestUtils.getNodeReports()) {
+ for (NodeReport nodeReport : getNodeReports()) {
Resource used = nodeReport.getUsed();
if (used != null && used.getMemory() > 0) {
provisionedNodeManagerCount++;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
index e8c2de3..7eb475e 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
@@ -44,7 +44,7 @@ public final class ProvisionTimeoutTestRun extends BaseYarnTest {
@Test
public void testProvisionTimeout() throws InterruptedException, ExecutionException, TimeoutException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new TimeoutApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
index a6511a7..50f1e08 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
@@ -53,7 +53,6 @@ import java.util.concurrent.TimeoutException;
/**
* Using echo server to test resource reports.
- * This test is executed by {@link org.apache.twill.yarn.YarnTestUtils}.
*/
public final class ResourceReportTestRun extends BaseYarnTest {
@@ -81,7 +80,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
@Test
public void testRunnablesGetAllowedResourcesInEnv() throws InterruptedException, IOException,
TimeoutException, ExecutionException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
.setVirtualCores(1)
@@ -105,7 +104,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
Iterable<Discoverable> envEchoServices = controller.discoverService("envecho");
- Assert.assertTrue(YarnTestUtils.waitForSize(envEchoServices, 1, 120));
+ Assert.assertTrue(waitForSize(envEchoServices, 1, 120));
// TODO: check virtual cores once yarn adds the ability
Map<String, String> expectedValues = Maps.newHashMap();
@@ -135,7 +134,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
@Test
public void testResourceReportWithFailingContainers() throws InterruptedException, IOException,
TimeoutException, ExecutionException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
.setVirtualCores(1)
@@ -159,7 +158,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
Iterable<Discoverable> echoServices = controller.discoverService("echo");
- Assert.assertTrue(YarnTestUtils.waitForSize(echoServices, 2, 120));
+ Assert.assertTrue(waitForSize(echoServices, 2, 120));
// check that we have 2 runnables.
ResourceReport report = controller.getResourceReport();
Assert.assertEquals(2, report.getRunnableResources("BuggyServer").size());
@@ -197,7 +196,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
@Test
public void testResourceReport() throws InterruptedException, ExecutionException, IOException,
URISyntaxException, TimeoutException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
final TwillController controller = runner.prepare(new ResourceApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
@@ -218,7 +217,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
// wait for 3 echo servers to come up
Iterable<Discoverable> echoServices = controller.discoverService("echo");
- Assert.assertTrue(YarnTestUtils.waitForSize(echoServices, 3, 120));
+ Assert.assertTrue(waitForSize(echoServices, 3, 120));
ResourceReport report = controller.getResourceReport();
// make sure resources for echo1 and echo2 are there
Map<String, Collection<TwillRunResources>> usedResources = report.getResources();
@@ -226,7 +225,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
Assert.assertTrue(usedResources.containsKey("echo1"));
Assert.assertTrue(usedResources.containsKey("echo2"));
- YarnTestUtils.waitForSize(new Iterable<String>() {
+ waitForSize(new Iterable<String>() {
@Override
public Iterator<String> iterator() {
return controller.getResourceReport().getServices().iterator();
@@ -254,7 +253,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
// Decrease number of instances of echo1 from 2 to 1
controller.changeInstances("echo1", 1);
echoServices = controller.discoverService("echo1");
- Assert.assertTrue(YarnTestUtils.waitForSize(echoServices, 1, 60));
+ Assert.assertTrue(waitForSize(echoServices, 1, 60));
report = controller.getResourceReport();
// make sure resources for echo1 and echo2 are there
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
index d34308a..59fe835 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
@@ -47,7 +47,7 @@ public final class ServiceDiscoveryTestRun extends BaseYarnTest {
@Test
public void testServiceDiscovery() throws InterruptedException, ExecutionException, TimeoutException {
- TwillRunner twillRunner = YarnTestUtils.getTwillRunner();
+ TwillRunner twillRunner = getTwillRunner();
TwillController controller = twillRunner
.prepare(new ServiceApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
@@ -56,7 +56,7 @@ public final class ServiceDiscoveryTestRun extends BaseYarnTest {
.start();
ServiceDiscovered completed = controller.discoverService("completed");
- Assert.assertTrue(YarnTestUtils.waitForSize(completed, 2, 120));
+ Assert.assertTrue(waitForSize(completed, 2, 120));
controller.sendCommand(Command.Builder.of("done").build());
controller.awaitTerminated(120, TimeUnit.SECONDS);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
index 357de59..74cf80e 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java
@@ -54,7 +54,7 @@ public class SessionExpireTestRun extends BaseYarnTest {
@Test
public void testAppSessionExpire() throws InterruptedException, ExecutionException, TimeoutException {
- TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillRunner runner = getTwillRunner();
TwillController controller = runner.prepare(new SleepRunnable(600))
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.start();
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
index 4357cd9..51031d4 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
@@ -63,7 +63,7 @@ public final class TaskCompletedTestRun extends BaseYarnTest {
@Test
public void testTaskCompleted() throws InterruptedException, TimeoutException, ExecutionException {
- TwillRunner twillRunner = YarnTestUtils.getTwillRunner();
+ TwillRunner twillRunner = getTwillRunner();
TwillController controller = twillRunner.prepare(new SleepTask(),
ResourceSpecification.Builder.with()
.setVirtualCores(1)
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
new file mode 100644
index 0000000..f669b83
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.util.concurrent.Service;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
+import org.apache.twill.internal.yarn.YarnAppClient;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A TwillTester rule allows creation of mini Yarn cluster and {@link TwillRunner} used for testing that is
+ * guaranteed to be teared down when tests are done.
+ *
+ * <pre>
+ * public class TwillTest {
+ * @ClassRule
+ * public static final TwillTester TWILL_TESTER = new TwillTester();
+ *
+ * @Test
+ * public void test() {
+ * TwillRunner twillRunner = TWILL_TESTER.getTwillRunner();
+ * twillRunner.prepare(...).start();
+ * }
+ * }
+ * </pre>
+ */
+public class TwillTester extends ExternalResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwillTester.class);
+
+ private final TemporaryFolder tmpFolder = new TemporaryFolder();
+ private InMemoryZKServer zkServer;
+ private MiniDFSCluster dfsCluster;
+ private MiniYARNCluster cluster;
+ private YarnConfiguration config;
+ private TwillRunnerService twillRunner;
+ private YarnAppClient yarnAppClient;
+
+ @Override
+ protected void before() throws Throwable {
+ tmpFolder.create();
+
+ // Starts Zookeeper
+ zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
+ zkServer.startAndWait();
+
+ // Start YARN mini cluster
+ File miniDFSDir = tmpFolder.newFolder();
+ LOG.info("Starting Mini DFS on path {}", miniDFSDir);
+ Configuration fsConf = new HdfsConfiguration(new Configuration());
+ fsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, miniDFSDir.getAbsolutePath());
+ dfsCluster = new MiniDFSCluster.Builder(fsConf).numDataNodes(1).build();
+
+ Configuration conf = new YarnConfiguration(dfsCluster.getFileSystem().getConf());
+
+ if (YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_20)) {
+ conf.set("yarn.resourcemanager.scheduler.class",
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
+ } else {
+ conf.set("yarn.resourcemanager.scheduler.class",
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
+ conf.set("yarn.scheduler.capacity.resource-calculator",
+ "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+ conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
+ }
+ conf.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
+ conf.set("yarn.nodemanager.vmem-check-enabled", "false");
+ conf.set("yarn.scheduler.minimum-allocation-mb", "128");
+ conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
+
+ cluster = new MiniYARNCluster("test-cluster", 3, 1, 1);
+ cluster.init(conf);
+ cluster.start();
+
+ this.config = new YarnConfiguration(cluster.getConfig());
+
+ twillRunner = createTwillRunnerService();
+ twillRunner.start();
+
+ yarnAppClient = new VersionDetectYarnAppClientFactory().create(conf);
+ yarnAppClient.startAndWait();
+ }
+
+ @Override
+ protected void after() {
+ stopQuietly(yarnAppClient);
+ try {
+ twillRunner.stop();
+ } catch (Exception e) {
+ LOG.warn("Failed to stop TwillRunner", e);
+ }
+ try {
+ cluster.stop();
+ } catch (Exception e) {
+ LOG.warn("Failed to stop mini Yarn cluster", e);
+ }
+ try {
+ dfsCluster.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Failed to stop mini dfs cluster", e);
+ }
+ stopQuietly(zkServer);
+
+ tmpFolder.delete();
+ }
+
+ /**
+ * Creates an unstarted instance of {@link org.apache.twill.api.TwillRunnerService}.
+ */
+ public TwillRunnerService createTwillRunnerService() throws IOException {
+ YarnTwillRunnerService runner = new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill");
+ // disable tests stealing focus
+ runner.setJVMOptions("-Djava.awt.headless=true");
+ return runner;
+ }
+
+ /**
+ * Returns a {@link TwillRunner} that interact with the mini Yarn cluster.
+ */
+ public TwillRunner getTwillRunner() {
+ return twillRunner;
+ }
+
+ /**
+ * Returns a list of {@link NodeReport} about the mini yarn cluster.
+ */
+ public List<NodeReport> getNodeReports() throws Exception {
+ return yarnAppClient.getNodeReports();
+ }
+
+ private void stopQuietly(Service service) {
+ try {
+ service.stopAndWait();
+ } catch (Exception e) {
+ LOG.warn("Failed to stop service {}.", service, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index 8427041..d9efe3c 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -40,6 +40,6 @@ import org.junit.runners.Suite;
ContainerSizeTestRun.class,
InitializeFailTestRun.class
})
-public final class YarnTestSuite {
+public final class YarnTestSuite extends BaseYarnTest {
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8d2ff212/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
deleted file mode 100644
index 856224a..0000000
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.yarn;
-
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.twill.api.TwillRunner;
-import org.apache.twill.api.TwillRunnerService;
-import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
-import org.apache.twill.internal.yarn.YarnAppClient;
-import org.apache.twill.internal.yarn.YarnUtils;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Utilities for testing YARN.
- */
-public final class YarnTestUtils {
- private static final Logger LOG = LoggerFactory.getLogger(YarnTestUtils.class);
-
- private static InMemoryZKServer zkServer;
- private static MiniDFSCluster dfsCluster;
- private static MiniYARNCluster cluster;
- private static TwillRunnerService runnerService;
- private static YarnConfiguration config;
- private static YarnAppClient yarnAppClient;
-
- private static final AtomicBoolean once = new AtomicBoolean(false);
-
- public static boolean initOnce() throws IOException {
- if (once.compareAndSet(false, true)) {
- final TemporaryFolder tmpFolder = new TemporaryFolder();
- tmpFolder.create();
- init(tmpFolder.newFolder());
-
- // add shutdown hook because we want to initialized/cleanup once
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- finish();
- } finally {
- tmpFolder.delete();
- }
- }
- }));
- return true;
- }
- return false;
- }
-
- private static void init(File folder) throws IOException {
- // Starts Zookeeper
- zkServer = InMemoryZKServer.builder().build();
- zkServer.startAndWait();
-
- // Start YARN mini cluster
- LOG.info("Starting Mini DFS on path {}", folder);
- Configuration fsConf = new HdfsConfiguration(new Configuration());
- fsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, folder.getAbsolutePath());
- dfsCluster = new MiniDFSCluster.Builder(fsConf).numDataNodes(1).build();
-
- Configuration conf = new YarnConfiguration(dfsCluster.getFileSystem().getConf());
-
- if (YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_20)) {
- conf.set("yarn.resourcemanager.scheduler.class",
- "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
- } else {
- conf.set("yarn.resourcemanager.scheduler.class",
- "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
- conf.set("yarn.scheduler.capacity.resource-calculator",
- "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
- conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
- }
- conf.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
- conf.set("yarn.nodemanager.vmem-check-enabled", "false");
- conf.set("yarn.scheduler.minimum-allocation-mb", "128");
- conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
-
- cluster = new MiniYARNCluster("test-cluster", 3, 1, 1);
- cluster.init(conf);
- cluster.start();
-
- config = new YarnConfiguration(cluster.getConfig());
-
- runnerService = createTwillRunnerService();
- runnerService.start();
-
- yarnAppClient = new VersionDetectYarnAppClientFactory().create(conf);
- yarnAppClient.start();
- }
-
- public static boolean finish() {
- if (once.compareAndSet(true, false)) {
- runnerService.stop();
- cluster.stop();
- dfsCluster.shutdown();
- zkServer.stopAndWait();
- yarnAppClient.stop();
-
- return true;
- }
-
- return false;
- }
-
- public static TwillRunner getTwillRunner() {
- return runnerService;
- }
-
- public static InMemoryZKServer getZkServer() {
- return zkServer;
- }
-
- /**
- * Creates an unstarted instance of {@link org.apache.twill.api.TwillRunnerService}.
- */
- public static TwillRunnerService createTwillRunnerService() throws IOException {
- YarnTwillRunnerService runner = new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill");
- // disable tests stealing focus
- runner.setJVMOptions("-Djava.awt.headless=true");
- return runner;
- }
-
- /**
- * Returns {@link org.apache.hadoop.yarn.api.records.NodeReport} for the nodes in the MiniYarnCluster.
- * @return a list of {@link org.apache.hadoop.yarn.api.records.NodeReport} for the nodes in the cluster.
- * @throws Exception Propagates exceptions thrown by {@link org.apache.hadoop.yarn.client.api.YarnClient}.
- */
- public static List<NodeReport> getNodeReports() throws Exception {
- return yarnAppClient.getNodeReports();
- }
-
- public static <T> boolean waitForSize(Iterable<T> iterable, int count, int limit) throws InterruptedException {
- int trial = 0;
- int size = Iterables.size(iterable);
- while (size != count && trial < limit) {
- LOG.info("Waiting for {} size {} == {}", iterable, size, count);
- TimeUnit.SECONDS.sleep(1);
- trial++;
- size = Iterables.size(iterable);
- }
- return trial < limit;
- }
-}