You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:26 UTC
[10/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
new file mode 100644
index 0000000..6b77e66
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Test server that reads one line per request and echoes that line back to the sender.
+ */
+public final class EchoServer extends SocketServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ String line = reader.readLine(); // null when the stream ends before any line was read
+ LOG.info("Received: " + line);
+ if (line != null) { // nothing is written back when no line was read
+ writer.println(line);
+ }
+ }
+
+ @Override
+ public void handleCommand(Command command) throws Exception {
+ LOG.info("Command received: " + command + " " + getContext().getInstanceCount()); // commands are only logged
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
new file mode 100644
index 0000000..d868eef
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Uses the echo server to test various behaviors of YarnTwillService.
+ * This test is executed by {@link YarnTestSuite}.
+ */
+public class EchoServerTestRun {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class);
+
+ @Test
+ public void testEchoServer() throws InterruptedException, ExecutionException, IOException,
+ URISyntaxException, TimeoutException {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ TwillController controller = runner.prepare(new EchoServer(),
+ ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(1, ResourceSpecification.SizeUnit.GIGA)
+ .setInstances(2)
+ .build())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("EchoServer", "echo2")
+ .start();
+
+ final CountDownLatch running = new CountDownLatch(1);
+ controller.addListener(new ServiceListenerAdapter() {
+ @Override
+ public void running() {
+ running.countDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ Assert.assertTrue(running.await(30, TimeUnit.SECONDS)); // fails if running() was not signalled within 30 seconds
+
+ Iterable<Discoverable> echoServices = controller.discoverService("echo"); // presumably the name SocketServer announces from the application argument — TODO confirm
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60)); // both requested instances must become discoverable
+
+ for (Discoverable discoverable : echoServices) { // every discovered instance must echo its greeting back
+ String msg = "Hello: " + discoverable.getSocketAddress();
+
+ Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
+ discoverable.getSocketAddress().getPort());
+ try {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+ writer.println(msg);
+ Assert.assertEquals(msg, reader.readLine());
+ } finally {
+ socket.close();
+ }
+ }
+
+ // Increase the number of instances
+ controller.changeInstances("EchoServer", 3);
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 3, 60));
+
+ echoServices = controller.discoverService("echo2"); // from here on, instance changes are observed through the "echo2" name
+
+ // Decrease the number of instances
+ controller.changeInstances("EchoServer", 1);
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 1, 60));
+
+ // Increase the number of instances again
+ controller.changeInstances("EchoServer", 2);
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
+
+ // Make sure there is still only one app running
+ Iterable<TwillRunner.LiveInfo> apps = runner.lookupLive();
+ Assert.assertTrue(YarnTestSuite.waitForSize(apps, 1, 60));
+
+ // Create a new runner service to check that it can regain control over the running app.
+ TwillRunnerService runnerService = YarnTestSuite.createTwillRunnerService();
+ runnerService.startAndWait();
+
+ try {
+ Iterable <TwillController> controllers = runnerService.lookup("EchoServer");
+ Assert.assertTrue(YarnTestSuite.waitForSize(controllers, 1, 60));
+
+ for (TwillController c : controllers) {
+ LOG.info("Stopping application: " + c.getRunId());
+ c.stop().get(30, TimeUnit.SECONDS);
+ }
+
+ Assert.assertTrue(YarnTestSuite.waitForSize(apps, 0, 60)); // the live list obtained from the original runner must become empty
+ } finally {
+ runnerService.stopAndWait();
+ }
+
+ // Sleep a bit before exiting.
+ TimeUnit.SECONDS.sleep(2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
new file mode 100644
index 0000000..4be2472
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Test server that sends back the value of the environment variable whose name was sent in.
+ * Used to check that the environment of runnables is set correctly.
+ */
+public class EnvironmentEchoServer extends SocketServer {
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ String envKey = reader.readLine(); // the request line is the name of the environment variable
+ writer.println(System.getenv(envKey)); // NOTE(review): getenv(null) throws NullPointerException if no line was read
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
new file mode 100644
index 0000000..b3d3933
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests that instance IDs 0 and 1 are both discoverable again after the runnable with instance ID 0 was killed.
+ */
+public class FailureRestartTestRun {
+
+ @Test
+ public void testFailureRestart() throws Exception {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ ResourceSpecification resource = ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(2)
+ .build();
+ TwillController controller = runner.prepare(new FailureRunnable(), resource)
+ .withApplicationArguments("failure")
+ .withArguments(FailureRunnable.class.getSimpleName(), "failure2")
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ Iterable<Discoverable> discoverables = controller.discoverService("failure");
+ Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 2, 60));
+
+ // Make sure we see the right instance IDs
+ Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
+
+ // Kill server with instanceId = 0
+ controller.sendCommand(FailureRunnable.class.getSimpleName(), Command.Builder.of("kill0").build());
+
+ // Do a short sleep to make sure the runnable is killed.
+ TimeUnit.SECONDS.sleep(5);
+
+ Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 2, 60));
+ // Make sure we see the right instance IDs
+ Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
+
+ controller.stopAndWait();
+ }
+
+ private Set<Integer> getInstances(Iterable<Discoverable> discoverables) throws IOException { // asks each server for its instance ID
+ Set<Integer> instances = Sets.newHashSet();
+ for (Discoverable discoverable : discoverables) {
+ InetSocketAddress socketAddress = discoverable.getSocketAddress();
+ Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
+ try {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+ String msg = "Failure";
+ writer.println(msg);
+
+ String line = reader.readLine(); // NOTE(review): may be null if the server closes early; the next line would then throw NPE
+ Assert.assertTrue(line.endsWith(msg));
+ instances.add(Integer.parseInt(line.substring(0, line.length() - msg.length()))); // reply is "<instanceId><msg>"; strip msg to get the ID
+ } finally {
+ socket.close();
+ }
+ }
+ return instances;
+ }
+
+
+ public static final class FailureRunnable extends SocketServer {
+
+ private volatile boolean killed; // set by handleCommand(), read by run()
+
+ @Override
+ public void run() {
+ killed = false;
+ super.run();
+ if (killed) { // after a kill command, end the run with an exception instead of returning normally
+ throw new RuntimeException("Exception");
+ }
+ }
+
+ @Override
+ public void handleCommand(Command command) throws Exception {
+ if (command.getCommand().equals("kill" + getContext().getInstanceId())) { // only the instance whose ID matches "kill<id>" reacts
+ killed = true;
+ running = false; // not declared in this class — presumably inherited from SocketServer
+ serverSocket.close(); // not declared in this class — presumably inherited from SocketServer
+ }
+ }
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ String line = reader.readLine();
+ writer.println(getContext().getInstanceId() + line); // reply is the instance ID immediately followed by the request line
+ writer.flush();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
new file mode 100644
index 0000000..de2c74c
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.TimeUnit;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+/**
+ * Test for local file transfer: a jar containing header.txt is shipped with the runnable, which reads it back.
+ */
+public class LocalFileTestRun {
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder(); // static so the nested LocalFileApplication can create its jar in it
+
+ @Test
+ public void testLocalFile() throws Exception {
+ String header = Files.readFirstLine(new File(getClass().getClassLoader().getResource("header.txt").toURI()),
+ Charsets.UTF_8);
+
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+ if (runner instanceof YarnTwillRunnerService) {
+ ((YarnTwillRunnerService) runner).setJVMOptions("-verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails"); // LocalFileSocketServer checks that gc.log exists
+ }
+
+ TwillController controller = runner.prepare(new LocalFileApplication())
+ .withApplicationArguments("local")
+ .withArguments("LocalFileSocketServer", "local2")
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ if (runner instanceof YarnTwillRunnerService) {
+ ((YarnTwillRunnerService) runner).setJVMOptions(""); // NOTE(review): options are not cleared if prepare()/start() above throws
+ }
+
+ Iterable<Discoverable> discoverables = controller.discoverService("local");
+ Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 1, 60));
+
+ InetSocketAddress socketAddress = discoverables.iterator().next().getSocketAddress();
+ Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
+ try {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+ String msg = "Local file test";
+ writer.println(msg);
+ Assert.assertEquals(header, reader.readLine()); // the server writes the header file content first (assumes header.txt ends with a line break)
+ Assert.assertEquals(msg, reader.readLine()); // then it echoes the request line
+ } finally {
+ socket.close();
+ }
+
+ controller.stopAndWait();
+
+ Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 0, 60));
+
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ public static final class LocalFileApplication implements TwillApplication {
+
+ private final File headerFile;
+
+ public LocalFileApplication() throws Exception {
+ // Create a jar file that contains the header.txt file inside.
+ headerFile = tmpFolder.newFile("header.jar");
+ JarOutputStream os = new JarOutputStream(new FileOutputStream(headerFile));
+ try {
+ os.putNextEntry(new JarEntry("header.txt"));
+ ByteStreams.copy(getClass().getClassLoader().getResourceAsStream("header.txt"), os); // NOTE(review): the resource input stream is not closed in this block
+ } finally {
+ os.close();
+ }
+ }
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("LocalFileApp")
+ .withRunnable()
+ .add(new LocalFileSocketServer())
+ .withLocalFiles()
+ .add("header", headerFile, true).apply() // presumably 'true' marks the jar as an archive to expand; the server reads header/header.txt — TODO confirm
+ .anyOrder()
+ .build();
+ }
+ }
+
+ public static final class LocalFileSocketServer extends SocketServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LocalFileSocketServer.class);
+
+ @Override
+ public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+ // Verify that there is a gc.log file in the working directory
+ Preconditions.checkState(new File("gc.log").exists());
+
+ LOG.info("handleRequest");
+ String header = Files.toString(new File("header/header.txt"), Charsets.UTF_8);
+ writer.write(header); // whole file content, written without adding a line break
+ writer.println(reader.readLine());
+ LOG.info("Flushed response");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
new file mode 100644
index 0000000..0598ef1
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.EventHandlerContext;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.Services;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests an application whose event handler aborts it when launching its runnable times out.
+ */
+public class ProvisionTimeoutTestRun {
+
+ @Test
+ public void testProvisionTimeout() throws InterruptedException, ExecutionException, TimeoutException {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ TwillController controller = runner.prepare(new TimeoutApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ // The provisioning should fail within 30 seconds after the AM has started, and the AM could take a while to start.
+ // Hence we allow 90 seconds at most here.
+ try {
+ Services.getCompletionFuture(controller).get(90, TimeUnit.SECONDS);
+ } finally {
+ // Always kill the app as cleanup; this matters if the wait above timed out.
+ controller.kill();
+ }
+ }
+
+ public static final class Handler extends EventHandler {
+
+ private boolean abort; // set in initialize() from the "abort" config entry
+
+ @Override
+ protected Map<String, String> getConfigs() {
+ return ImmutableMap.of("abort", "true");
+ }
+
+ @Override
+ public void initialize(EventHandlerContext context) {
+ this.abort = Boolean.parseBoolean(context.getSpecification().getConfigs().get("abort"));
+ }
+
+ @Override
+ public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
+ if (abort) {
+ return TimeoutAction.abort();
+ } else {
+ return TimeoutAction.recheck(10, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ public static final class TimeoutApplication implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("TimeoutApplication")
+ .withRunnable()
+ .add(new TimeoutRunnable(),
+ ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(8, ResourceSpecification.SizeUnit.GIGA).build()) // presumably more than the test cluster can provide — TODO confirm
+ .noLocalFiles()
+ .anyOrder()
+ .withEventHandler(new Handler())
+ .build();
+ }
+ }
+
+ /**
+ * A runnable that does nothing, as it is not expected to get provisioned.
+ */
+ public static final class TimeoutRunnable extends AbstractTwillRunnable {
+
+ private final CountDownLatch latch = new CountDownLatch(1); // released by stop() to let run() return
+
+ @Override
+ public void stop() {
+ latch.countDown();
+ }
+
+ @Override
+ public void run() {
+ // Simply block here
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e); // NOTE(review): the interrupt status is not restored before rethrowing
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
new file mode 100644
index 0000000..131f90a
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.internal.EnvKeys;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Using echo server to test resource reports.
+ * This test is executed by {@link org.apache.twill.yarn.YarnTestSuite}.
+ */
+public class ResourceReportTestRun {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ResourceReportTestRun.class);
+
+ private class ResourceApplication implements TwillApplication { // NOTE(review): non-static inner class; no enclosing-instance state is used in this block
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with() // two "echo1" instances (1 core, 128 MB) and one "echo2" instance (2 cores, 256 MB)
+ .setName("ResourceApplication")
+ .withRunnable()
+ .add("echo1", new EchoServer(), ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(128, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(2).build()).noLocalFiles()
+ .add("echo2", new EchoServer(), ResourceSpecification.Builder.with()
+ .setVirtualCores(2)
+ .setMemory(256, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(1).build()).noLocalFiles()
+ .anyOrder()
+ .build();
+ }
+ }
+
+ @Test // checks that the runnable's environment reports the requested memory and instance count
+ public void testRunnablesGetAllowedResourcesInEnv() throws InterruptedException, IOException,
+ TimeoutException, ExecutionException {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(2048, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(1)
+ .build();
+ TwillController controller = runner.prepare(new EnvironmentEchoServer(), resourceSpec)
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("envecho")
+ .withArguments("EnvironmentEchoServer", "echo2")
+ .start();
+
+ final CountDownLatch running = new CountDownLatch(1);
+ controller.addListener(new ServiceListenerAdapter() {
+ @Override
+ public void running() {
+ running.countDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ Assert.assertTrue(running.await(30, TimeUnit.SECONDS)); // fails if running() was not signalled within 30 seconds
+
+ Iterable<Discoverable> envEchoServices = controller.discoverService("envecho");
+ Assert.assertTrue(YarnTestSuite.waitForSize(envEchoServices, 1, 30));
+
+ // TODO: check virtual cores once yarn adds the ability
+ Map<String, String> expectedValues = Maps.newHashMap(); // environment key -> value expected inside the container
+ expectedValues.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, "2048");
+ expectedValues.put(EnvKeys.TWILL_INSTANCE_COUNT, "1");
+
+ // Check the environment of the runnable, using one connection per expected key.
+ Discoverable discoverable = envEchoServices.iterator().next();
+ for (Map.Entry<String, String> expected : expectedValues.entrySet()) {
+ Socket socket = new Socket(discoverable.getSocketAddress().getHostName(), // NOTE(review): getHostName() here, while the next test in this class uses getAddress()
+ discoverable.getSocketAddress().getPort());
+ try {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+ writer.println(expected.getKey()); // send the environment key
+ Assert.assertEquals(expected.getValue(), reader.readLine()); // the reply line must equal the expected value
+ } finally {
+ socket.close();
+ }
+ }
+
+ controller.stop().get(30, TimeUnit.SECONDS);
+ // Sleep a bit before exiting.
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ @Test // checks that the resource report drops from two runnables to one after a request is sent to make one server fail
+ public void testResourceReportWithFailingContainers() throws InterruptedException, IOException,
+ TimeoutException, ExecutionException {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(128, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(2)
+ .build();
+ TwillController controller = runner.prepare(new BuggyServer(), resourceSpec)
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("BuggyServer", "echo2")
+ .start();
+
+ final CountDownLatch running = new CountDownLatch(1);
+ controller.addListener(new ServiceListenerAdapter() {
+ @Override
+ public void running() {
+ running.countDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ Assert.assertTrue(running.await(30, TimeUnit.SECONDS)); // fails if running() was not signalled within 30 seconds
+
+ Iterable<Discoverable> echoServices = controller.discoverService("echo");
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
+ // Check that we have 2 runnables.
+ ResourceReport report = controller.getResourceReport();
+ Assert.assertEquals(2, report.getRunnableResources("BuggyServer").size());
+
+ // Cause a divide by 0 in one server (BuggyServer is defined outside this section).
+ Discoverable discoverable = echoServices.iterator().next();
+ Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
+ discoverable.getSocketAddress().getPort());
+ try {
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+ writer.println("0"); // no reply is read; the socket is closed right after sending
+ } finally {
+ socket.close();
+ }
+
+ // It takes some time for the app master to find out that the container completed...
+ TimeUnit.SECONDS.sleep(5); // NOTE(review): fixed wait; the assertion below may be timing-sensitive
+ // Check that we have 1 runnable, not 2.
+ report = controller.getResourceReport();
+ Assert.assertEquals(1, report.getRunnableResources("BuggyServer").size());
+
+ controller.stop().get(30, TimeUnit.SECONDS);
+ // Sleep a bit before exiting.
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ /**
+ * Checks the resource report of a running {@code ResourceApplication}: first that it lists
+ * two instances of "echo1" at 128 MB and one instance of "echo2" at 256 MB, then that it lists
+ * a single "echo1" instance (and an unchanged "echo2") after "echo1" is scaled down to one.
+ */
+ @Test
+ public void testResourceReport() throws InterruptedException, ExecutionException, IOException,
+ URISyntaxException, TimeoutException {
+ TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+ TwillController controller = runner.prepare(new ResourceApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("echo")
+ .withArguments("echo1", "echo1")
+ .withArguments("echo2", "echo2")
+ .start();
+
+ // Released by the listener once the controller reports the running state.
+ final CountDownLatch running = new CountDownLatch(1);
+ controller.addListener(new ServiceListenerAdapter() {
+ @Override
+ public void running() {
+ running.countDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+
+ // wait for 3 echo servers to come up
+ Iterable<Discoverable> echoServices = controller.discoverService("echo");
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 3, 60));
+ ResourceReport report = controller.getResourceReport();
+ // make sure resources for echo1 and echo2 are there
+ Map<String, Collection<TwillRunResources>> usedResources = report.getResources();
+ Assert.assertEquals(2, usedResources.keySet().size());
+ Assert.assertTrue(usedResources.containsKey("echo1"));
+ Assert.assertTrue(usedResources.containsKey("echo2"));
+
+ Collection<TwillRunResources> echo1Resources = usedResources.get("echo1");
+ // 2 instances of echo1
+ Assert.assertEquals(2, echo1Resources.size());
+ // TODO: check cores after hadoop-2.1.0
+ for (TwillRunResources resources : echo1Resources) {
+ Assert.assertEquals(128, resources.getMemoryMB());
+ }
+
+ Collection<TwillRunResources> echo2Resources = usedResources.get("echo2");
+ // 1 instance of echo2
+ Assert.assertEquals(1, echo2Resources.size());
+ // TODO: check cores after hadoop-2.1.0
+ for (TwillRunResources resources : echo2Resources) {
+ Assert.assertEquals(256, resources.getMemoryMB());
+ }
+
+ // Decrease number of instances of echo1 from 2 to 1
+ controller.changeInstances("echo1", 1);
+ echoServices = controller.discoverService("echo1");
+ Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 1, 60));
+ report = controller.getResourceReport();
+
+ // make sure resources for echo1 and echo2 are there
+ usedResources = report.getResources();
+ Assert.assertEquals(2, usedResources.keySet().size());
+ Assert.assertTrue(usedResources.containsKey("echo1"));
+ Assert.assertTrue(usedResources.containsKey("echo2"));
+
+ echo1Resources = usedResources.get("echo1");
+ // 1 instance of echo1 now
+ Assert.assertEquals(1, echo1Resources.size());
+ // TODO: check cores after hadoop-2.1.0
+ for (TwillRunResources resources : echo1Resources) {
+ Assert.assertEquals(128, resources.getMemoryMB());
+ }
+
+ echo2Resources = usedResources.get("echo2");
+ // still 1 instance of echo2
+ Assert.assertEquals(1, echo2Resources.size());
+ // TODO: check cores after hadoop-2.1.0
+ for (TwillRunResources resources : echo2Resources) {
+ Assert.assertEquals(256, resources.getMemoryMB());
+ }
+
+ controller.stop().get(30, TimeUnit.SECONDS);
+ // Sleep a bit before exiting.
+ TimeUnit.SECONDS.sleep(2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
new file mode 100644
index 0000000..5148ed2
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.List;
+
+/**
+ * Boilerplate for a server that announces itself and talks to clients through a socket.
+ */
+public abstract class SocketServer extends AbstractTwillRunnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SocketServer.class);
+
+ protected volatile boolean running;
+ protected volatile Thread runThread;
+ protected ServerSocket serverSocket;
+ protected Cancellable canceller;
+
+ @Override
+ public void initialize(TwillContext context) {
+ super.initialize(context);
+ running = true;
+ try {
+ serverSocket = new ServerSocket(0);
+ LOG.info("Server started: " + serverSocket.getLocalSocketAddress() +
+ ", id: " + context.getInstanceId() +
+ ", count: " + context.getInstanceCount());
+
+ final List<Cancellable> cancellables = ImmutableList.of(
+ context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()),
+ context.announce(context.getArguments()[0], serverSocket.getLocalPort())
+ );
+ canceller = new Cancellable() {
+ @Override
+ public void cancel() {
+ for (Cancellable c : cancellables) {
+ c.cancel();
+ }
+ }
+ };
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ runThread = Thread.currentThread();
+ while (running) {
+ try {
+ Socket socket = serverSocket.accept();
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+ PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
+ handleRequest(reader, writer);
+ } finally {
+ socket.close();
+ }
+ } catch (SocketException e) {
+ LOG.info("Socket exception: " + e);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stopping server");
+ canceller.cancel();
+ running = false;
+ Thread t = runThread;
+ if (t != null) {
+ t.interrupt();
+ }
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ LOG.error("Exception while closing socket.", e);
+ throw Throwables.propagate(e);
+ }
+ serverSocket = null;
+ }
+
+ @Override
+ public void destroy() {
+ try {
+ if (serverSocket != null) {
+ serverSocket.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Exception while closing socket.", e);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ abstract public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
new file mode 100644
index 0000000..5a93271
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Service;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Testing application master will shutdown itself when all tasks are completed.
+ * This test is executed by {@link YarnTestSuite}.
+ */
+public class TaskCompletedTestRun {
+
+ public static final class SleepTask extends AbstractTwillRunnable {
+
+ @Override
+ public void run() {
+ // Randomly sleep for 3-5 seconds.
+ try {
+ TimeUnit.SECONDS.sleep(new Random().nextInt(3) + 3);
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ // No-op
+ }
+ }
+
+ @Test
+ public void testTaskCompleted() throws InterruptedException {
+ TwillRunner twillRunner = YarnTestSuite.getTwillRunner();
+ TwillController controller = twillRunner.prepare(new SleepTask(),
+ ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(3).build())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ final CountDownLatch runLatch = new CountDownLatch(1);
+ final CountDownLatch stopLatch = new CountDownLatch(1);
+ controller.addListener(new ServiceListenerAdapter() {
+
+ @Override
+ public void running() {
+ runLatch.countDown();
+ }
+
+ @Override
+ public void terminated(Service.State from) {
+ stopLatch.countDown();
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+
+ Assert.assertTrue(runLatch.await(1, TimeUnit.MINUTES));
+
+ Assert.assertTrue(stopLatch.await(1, TimeUnit.MINUTES));
+
+ TimeUnit.SECONDS.sleep(2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
new file mode 100644
index 0000000..8be907b
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillSpecification;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class TwillSpecificationTest {
+
+ /**
+ * Dummy for test.
+ */
+ public static final class DummyRunnable extends AbstractTwillRunnable {
+
+ @Override
+ public void stop() {
+ // no-op
+ }
+
+ @Override
+ public void run() {
+ // no-op
+ }
+ }
+
+ @Test
+ public void testAnyOrder() {
+ TwillSpecification spec =
+ TwillSpecification.Builder.with()
+ .setName("Testing")
+ .withRunnable()
+ .add("r1", new DummyRunnable()).noLocalFiles()
+ .add("r2", new DummyRunnable()).noLocalFiles()
+ .add("r3", new DummyRunnable()).noLocalFiles()
+ .anyOrder()
+ .build();
+
+ Assert.assertEquals(3, spec.getRunnables().size());
+ List<TwillSpecification.Order> orders = spec.getOrders();
+ Assert.assertEquals(1, orders.size());
+ Assert.assertEquals(ImmutableSet.of("r1", "r2", "r3"), orders.get(0).getNames());
+ }
+
+ @Test
+ public void testOrder() {
+ TwillSpecification spec =
+ TwillSpecification.Builder.with()
+ .setName("Testing")
+ .withRunnable()
+ .add("r1", new DummyRunnable()).noLocalFiles()
+ .add("r2", new DummyRunnable()).noLocalFiles()
+ .add("r3", new DummyRunnable()).noLocalFiles()
+ .add("r4", new DummyRunnable()).noLocalFiles()
+ .withOrder().begin("r1", "r2").nextWhenStarted("r3")
+ .build();
+
+ Assert.assertEquals(4, spec.getRunnables().size());
+ List<TwillSpecification.Order> orders = spec.getOrders();
+ Assert.assertEquals(3, orders.size());
+ Assert.assertEquals(ImmutableSet.of("r1", "r2"), orders.get(0).getNames());
+ Assert.assertEquals(ImmutableSet.of("r3"), orders.get(1).getNames());
+ Assert.assertEquals(ImmutableSet.of("r4"), orders.get(2).getNames());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
new file mode 100644
index 0000000..b55d620
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.internal.yarn.YarnUtils;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test suite for all tests with mini yarn cluster.
+ *
+ * <p>Before the suite classes run, an in-memory ZooKeeper server, a {@link MiniYARNCluster} and a
+ * shared {@link TwillRunnerService} are started; they are stopped in reverse order afterwards.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+                      EchoServerTestRun.class,
+                      ResourceReportTestRun.class,
+                      TaskCompletedTestRun.class,
+                      DistributeShellTestRun.class,
+                      LocalFileTestRun.class,
+                      FailureRestartTestRun.class,
+                      ProvisionTimeoutTestRun.class
+                    })
+public class YarnTestSuite {
+  private static final Logger LOG = LoggerFactory.getLogger(YarnTestSuite.class);
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  private static InMemoryZKServer zkServer;
+  private static MiniYARNCluster cluster;
+  private static TwillRunnerService runnerService;
+  private static YarnConfiguration config;
+
+  /**
+   * Starts ZooKeeper, the mini YARN cluster and the shared Twill runner service.
+   *
+   * @throws IOException if the runner service cannot create its working folder
+   */
+  @BeforeClass
+  public static final void init() throws IOException {
+    // Starts Zookeeper
+    zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    // Start YARN mini cluster
+    config = new YarnConfiguration(new Configuration());
+
+    // The scheduler is chosen according to the Hadoop version reported by YarnUtils.
+    if (YarnUtils.isHadoop20()) {
+      config.set("yarn.resourcemanager.scheduler.class",
+                 "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
+    } else {
+      config.set("yarn.resourcemanager.scheduler.class",
+                 "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
+      config.set("yarn.scheduler.capacity.resource-calculator",
+                 "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+    }
+    config.set("yarn.minicluster.fixed.ports", "true");
+    config.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
+    config.set("yarn.nodemanager.vmem-check-enabled", "false");
+    config.set("yarn.scheduler.minimum-allocation-mb", "128");
+    config.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
+
+    cluster = new MiniYARNCluster("test-cluster", 1, 1, 1);
+    cluster.init(config);
+    cluster.start();
+
+    runnerService = createTwillRunnerService();
+    runnerService.startAndWait();
+  }
+
+  /**
+   * Stops the runner service, the mini YARN cluster and ZooKeeper, in that order.
+   */
+  @AfterClass
+  public static final void finish() {
+    runnerService.stopAndWait();
+    cluster.stop();
+    zkServer.stopAndWait();
+  }
+
+  /**
+   * Returns the runner service started by {@link #init()}, shared by all tests in the suite.
+   */
+  public static final TwillRunner getTwillRunner() {
+    return runnerService;
+  }
+
+  /**
+   * Creates an unstarted instance of {@link org.apache.twill.api.TwillRunnerService}.
+   */
+  public static final TwillRunnerService createTwillRunnerService() throws IOException {
+    return new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill",
+                                      new LocalLocationFactory(tmpFolder.newFolder()));
+  }
+
+  /**
+   * Polls the size of an iterable once per second until it equals the expected count or the
+   * retry limit is reached.
+   *
+   * @param iterable the iterable whose size is re-evaluated on every poll
+   * @param count the size to wait for
+   * @param limit the maximum number of one-second waits
+   * @return {@code true} if the last observed size equals {@code count}, {@code false} otherwise
+   * @throws InterruptedException if interrupted while sleeping between polls
+   */
+  public static final <T> boolean waitForSize(Iterable<T> iterable, int count, int limit) throws InterruptedException {
+    int trial = 0;
+    int size = Iterables.size(iterable);
+    while (size != count && trial < limit) {
+      LOG.info("Waiting for {} size {} == {}", iterable, size, count);
+      TimeUnit.SECONDS.sleep(1);
+      trial++;
+      size = Iterables.size(iterable);
+    }
+    // Report the observed outcome rather than the trial counter: the size may match on the
+    // final poll (trial == limit), or already match when limit is 0.
+    return size == count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/resources/header.txt
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/resources/header.txt b/twill-yarn/src/test/resources/header.txt
new file mode 100644
index 0000000..b6e25e6
--- /dev/null
+++ b/twill-yarn/src/test/resources/header.txt
@@ -0,0 +1 @@
+Local file header
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/resources/logback-test.xml b/twill-yarn/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..2615cb4
--- /dev/null
+++ b/twill-yarn/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.twill" level="DEBUG" />
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
new file mode 100644
index 0000000..e76ee50
--- /dev/null
+++ b/twill-zookeeper/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>twill-parent</artifactId>
+ <groupId>org.apache.twill</groupId>
+ <version>0.1.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>twill-zookeeper</artifactId>
+ <name>Twill ZooKeeper client library</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>twill-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
new file mode 100644
index 0000000..9e4f55f
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.zookeeper.NodeChildren;
+import com.google.common.base.Objects;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * A simple {@link NodeChildren} implementation that holds the child names and {@link Stat}
+ * given to its constructor.
+ */
+final class BasicNodeChildren implements NodeChildren {
+
+  private final Stat stat;
+  private final List<String> children;
+
+  /**
+   * @param children names of the child nodes; the list is stored as given, not copied
+   * @param stat the stat to return from {@link #getStat()}
+   */
+  BasicNodeChildren(List<String> children, Stat stat) {
+    this.children = children;
+    this.stat = stat;
+  }
+
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+
+  @Override
+  public List<String> getChildren() {
+    return children;
+  }
+
+  /**
+   * Compares against any {@link NodeChildren}: equal when both the stat and the children match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is required.
+    if (!(o instanceof NodeChildren)) {
+      return false;
+    }
+
+    NodeChildren other = (NodeChildren) o;
+    if (!stat.equals(other.getStat())) {
+      return false;
+    }
+    return children.equals(other.getChildren());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(children, stat);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
new file mode 100644
index 0000000..98a3a66
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.zookeeper.NodeData;
+import com.google.common.base.Objects;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.Arrays;
+
+/**
+ * A straightforward implementation for {@link NodeData}.
+ *
+ * <p>Equality is based on the stat and on the content of the data array, and the hash code is
+ * computed from the same two values.
+ */
+final class BasicNodeData implements NodeData {
+
+  private final byte[] data;
+  private final Stat stat;
+
+  /**
+   * @param data the node content; the array is stored as given, not copied
+   * @param stat the stat to return from {@link #getStat()}
+   */
+  BasicNodeData(byte[] data, Stat stat) {
+    this.data = data;
+    this.stat = stat;
+  }
+
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+
+  @Override
+  public byte[] getData() {
+    return data;
+  }
+
+  /**
+   * Compares against any {@link NodeData}: equal when the stats are equal and the data arrays
+   * have the same content.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof NodeData)) {
+      return false;
+    }
+
+    // Cast to the interface that was checked above; casting to BasicNodeData would throw
+    // ClassCastException for any other NodeData implementation.
+    NodeData that = (NodeData) o;
+
+    return stat.equals(that.getStat()) && Arrays.equals(data, that.getData());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash the array by content so that instances equal under equals() share a hash code;
+    // hashing the array reference itself would use its identity hash.
+    return Objects.hashCode(Arrays.hashCode(data), stat);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
new file mode 100644
index 0000000..c52fb08
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClientService;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The base implementation of {@link ZKClientService}.
+ */
+public final class DefaultZKClientService implements ZKClientService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultZKClientService.class);
+
+ // ZooKeeper connect string; passed to every ZooKeeper client this service creates and
+ // returned by getConnectString().
+ private final String zkStr;
+ // Session timeout, passed as-is to the ZooKeeper constructor.
+ private final int sessionTimeout;
+ // Wrapped watchers that ServiceDelegate.process() notifies for events of type None.
+ private final List<Watcher> connectionWatchers;
+ // Current ZooKeeper client; holds null before start, after stop, and after a failed reconnect.
+ private final AtomicReference<ZooKeeper> zooKeeper;
+ // Maps a node path to the ACL list used when creating it (currently always OPEN_ACL_UNSAFE).
+ private final Function<String, List<ACL>> aclMapper;
+ // Service implementation that all lifecycle methods of this class forward to.
+ private final Service serviceDelegate;
+ // Executor that runs wrapped watchers and is handed to every operation future; assigned in
+ // ServiceDelegate.doStart(), so it is null until the service has been started.
+ // NOTE(review): the field is neither final nor volatile although it is read from other threads
+ // than the one that starts the service -- confirm that visibility is guaranteed elsewhere.
+ private ExecutorService eventExecutor;
+
+ /**
+  * Creates the client service. No ZooKeeper connection is made here; the client is created when
+  * the service is started.
+  *
+  * @param zkStr ZooKeeper connect string
+  * @param sessionTimeout Session timeout handed to the ZooKeeper client
+  * @param connectionWatcher Watcher to register for connection events; ignored if {@code null}
+  */
+ public DefaultZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher) {
+   this.zkStr = zkStr;
+   this.sessionTimeout = sessionTimeout;
+   this.zooKeeper = new AtomicReference<ZooKeeper>();
+   this.serviceDelegate = new ServiceDelegate();
+
+   // TODO (terence): Add ACL
+   this.aclMapper = new Function<String, List<ACL>>() {
+     @Override
+     public List<ACL> apply(String nodePath) {
+       return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+     }
+   };
+
+   // The watcher list must exist before the initial watcher is registered.
+   this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+   addConnectionWatcher(connectionWatcher);
+ }
+
+ /**
+  * @return Session id of the current ZooKeeper client, or {@code null} if there is no client.
+  */
+ @Override
+ public Long getSessionId() {
+   ZooKeeper client = zooKeeper.get();
+   if (client == null) {
+     return null;
+   }
+   return client.getSessionId();
+ }
+
+ /**
+ * @return The ZooKeeper connect string that was given to the constructor.
+ */
+ @Override
+ public String getConnectString() {
+ return zkStr;
+ }
+
+ @Override
+ public void addConnectionWatcher(Watcher watcher) {
+ if (watcher != null) {
+ connectionWatchers.add(wrapWatcher(watcher));
+ }
+ }
+
+ /**
+ * Creates a node with the given data and mode. Delegates to the four-argument overload with
+ * {@code createParent} set to {@code true}.
+ */
+ @Override
+ public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
+ return create(path, data, createMode, true);
+ }
+
+ /**
+ * Creates a node with the given data and mode, optionally creating parent nodes.
+ * Delegates to {@code doCreate} with {@code ignoreNodeExists} set to {@code false}, so an already
+ * existing node at {@code path} is not treated as success.
+ */
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent) {
+ return doCreate(path, data, createMode, createParent, false);
+ }
+
+ /**
+  * Asynchronously creates a node and, if requested, any missing parent nodes.
+  *
+  * <p>The create call is always issued first. When {@code createParent} is {@code true} and the
+  * call fails with {@code NONODE}, the parent is created recursively (as a persistent node,
+  * tolerating that it already exists) and the original create is retried once.
+  *
+  * @param path Path of the node to create
+  * @param data Data of the node, may be {@code null}
+  * @param createMode Mode of the node to create
+  * @param createParent Whether missing parent nodes should be created
+  * @param ignoreNodeExists Whether a {@code NODEEXISTS} failure counts as success
+  *                         (used for parent node creation)
+  * @return A future that completes with the created path, or fails with the cause of the failure
+  */
+ private OperationFuture<String> doCreate(final String path,
+                                          @Nullable final byte[] data,
+                                          final CreateMode createMode,
+                                          final boolean createParent,
+                                          final boolean ignoreNodeExists) {
+   final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
+   getZooKeeper().create(path, data, aclMapper.apply(path), createMode, Callbacks.STRING, createFuture);
+   if (!createParent) {
+     return createFuture;
+   }
+
+   // If create parent is requested, return a different future
+   final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
+   // Watch for changes in the original future
+   Futures.addCallback(createFuture, new FutureCallback<String>() {
+     @Override
+     public void onSuccess(String createdPath) {
+       // Propagate if creation was successful
+       result.set(createdPath);
+     }
+
+     @Override
+     public void onFailure(Throwable t) {
+       // Stop here if the failure already decided the outcome of the result future
+       if (updateFailureResult(t, result, path, ignoreNodeExists)) {
+         return;
+       }
+       // The failure was NONODE: create the parent node
+       String parentPath = getParent(path);
+       if (parentPath.isEmpty()) {
+         // Already at the root, there is no parent to create
+         result.setException(t);
+         return;
+       }
+       // Watch for parent creation complete
+       Futures.addCallback(
+         doCreate(parentPath, null, CreateMode.PERSISTENT, createParent, true), new FutureCallback<String>() {
+           @Override
+           public void onSuccess(String createdParent) {
+             // Create the requested path again
+             Futures.addCallback(
+               doCreate(path, data, createMode, false, ignoreNodeExists), new FutureCallback<String>() {
+                 @Override
+                 public void onSuccess(String pathResult) {
+                   result.set(pathResult);
+                 }
+
+                 @Override
+                 public void onFailure(Throwable t) {
+                   // No further retry: if the failure is still NONODE (not handled by
+                   // updateFailureResult), fail the result so that it never stays incomplete.
+                   if (!updateFailureResult(t, result, path, ignoreNodeExists)) {
+                     result.setException(t);
+                   }
+                 }
+               });
+           }
+
+           @Override
+           public void onFailure(Throwable t) {
+             result.setException(t);
+           }
+         });
+     }
+
+     /**
+      * Updates the result future based on the given {@link Throwable}.
+      * @param t Cause of the failure
+      * @param result Future to be updated
+      * @param path Request path for the operation
+      * @param ignoreNodeExists Whether a {@code NODEEXISTS} failure should complete the result successfully
+      * @return {@code true} if the result future has been completed (with a value or an exception),
+      *         {@code false} if the failure is {@code NONODE} and the result was left untouched.
+      */
+     private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
+                                         String path, boolean ignoreNodeExists) {
+       // Propagate if there is error
+       if (!(t instanceof KeeperException)) {
+         result.setException(t);
+         return true;
+       }
+       KeeperException.Code code = ((KeeperException) t).code();
+       // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
+       if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
+         // The requested path could be used because it only applies to non-sequential node
+         result.set(path);
+         // The result is final; returning true keeps the caller from creating parents needlessly
+         return true;
+       }
+       if (code != KeeperException.Code.NONODE) {
+         result.setException(t);
+         return true;
+       }
+       return false;
+     }
+
+     /**
+      * Gets the parent of the given path.
+      * @param path Path for computing its parent
+      * @return Parent of the given path, or empty string if the given path is the root path already.
+      */
+     private String getParent(String path) {
+       String parentPath = path.substring(0, path.lastIndexOf('/'));
+       return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
+     }
+   });
+
+   return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> exists(String path) {
+ return exists(path, null);
+ }
+
+ @Override
+ public OperationFuture<Stat> exists(String path, Watcher watcher) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<NodeChildren> getChildren(String path) {
+ return getChildren(path, null);
+ }
+
+ @Override
+ public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
+ SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<NodeData> getData(String path) {
+ return getData(path, null);
+ }
+
+ @Override
+ public OperationFuture<NodeData> getData(String path, Watcher watcher) {
+ SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
+
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> setData(String path, byte[] data) {
+ return setData(path, data, -1);
+ }
+
+ @Override
+ public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
+ getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<String> delete(String path) {
+ return delete(path, -1);
+ }
+
+ @Override
+ public OperationFuture<String> delete(String deletePath, int version) {
+ SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
+ getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
+ return result;
+ }
+
+ /**
+  * @return A supplier that looks up the current ZooKeeper client on every {@code get()} call,
+  *         rather than capturing the client present at the time this method is invoked.
+  */
+ @Override
+ public Supplier<ZooKeeper> getZooKeeperSupplier() {
+   Supplier<ZooKeeper> currentClient = new Supplier<ZooKeeper>() {
+     @Override
+     public ZooKeeper get() {
+       return getZooKeeper();
+     }
+   };
+   return currentClient;
+ }
+
+ // Service lifecycle methods. Each one forwards directly to serviceDelegate, the inner
+ // ServiceDelegate instance created in the constructor.
+
+ @Override
+ public ListenableFuture<State> start() {
+ return serviceDelegate.start();
+ }
+
+ @Override
+ public State startAndWait() {
+ return serviceDelegate.startAndWait();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return serviceDelegate.isRunning();
+ }
+
+ @Override
+ public State state() {
+ return serviceDelegate.state();
+ }
+
+ @Override
+ public ListenableFuture<State> stop() {
+ return serviceDelegate.stop();
+ }
+
+ @Override
+ public State stopAndWait() {
+ return serviceDelegate.stopAndWait();
+ }
+
+ @Override
+ public void addListener(Listener listener, Executor executor) {
+ serviceDelegate.addListener(listener, executor);
+ }
+
+ /**
+ * Returns the ZooKeeper client currently held by this service.
+ *
+ * @return Current {@link ZooKeeper} client.
+ * @throws IllegalArgumentException if no client is currently set, as raised by
+ * {@link Preconditions#checkArgument(boolean, Object)}
+ */
+ private ZooKeeper getZooKeeper() {
+ ZooKeeper zk = zooKeeper.get();
+ Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
+ return zk;
+ }
+
+ /**
+ * Wraps the given watcher to be called from the event executor.
+ * @param watcher Watcher to be wrapped
+ * @return The wrapped Watcher
+ */
+ private Watcher wrapWatcher(final Watcher watcher) {
+ if (watcher == null) {
+ return null;
+ }
+ return new Watcher() {
+ @Override
+ public void process(final WatchedEvent event) {
+ eventExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ watcher.process(event);
+ } catch (Throwable t) {
+ LOG.error("Watcher throws exception.", t);
+ }
+ }
+ });
+ }
+ };
+ }
+
+ /**
+  * Service implementation behind the lifecycle methods of the enclosing class; it is also the
+  * default watcher of every ZooKeeper client created here.
+  *
+  * <ul>
+  *   <li>{@code doStart} creates the event executor and the ZooKeeper client; the service is
+  *       reported as started on the first {@code SyncConnected} event seen while starting.</li>
+  *   <li>{@code doStop} closes the client and shuts down the event executor; the executor's
+  *       {@code terminated()} hook reports the service as stopped.</li>
+  *   <li>On session expiry a new client is created from a separate daemon thread.</li>
+  * </ul>
+  */
+ private final class ServiceDelegate extends AbstractService implements Watcher {
+
+   @Override
+   protected void doStart() {
+     // A single thread executor
+     eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+                                            Threads.createDaemonThreadFactory("zk-client-EventThread")) {
+       @Override
+       protected void terminated() {
+         super.terminated();
+         // The service is only considered stopped once the executor has fully terminated
+         notifyStopped();
+       }
+     };
+
+     try {
+       zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, this));
+     } catch (IOException e) {
+       notifyFailed(e);
+     }
+   }
+
+   @Override
+   protected void doStop() {
+     ZooKeeper zk = zooKeeper.getAndSet(null);
+     try {
+       if (zk != null) {
+         zk.close();
+       }
+     } catch (InterruptedException e) {
+       // Keep the interrupt visible to the calling thread
+       Thread.currentThread().interrupt();
+       notifyFailed(e);
+     } finally {
+       // Always shut the executor down, even when there is no client to close; its terminated()
+       // hook is the only place that calls notifyStopped(), so skipping it would leave the
+       // service stuck in the stopping state.
+       eventExecutor.shutdown();
+     }
+   }
+
+   @Override
+   public void process(WatchedEvent event) {
+     try {
+       if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
+         LOG.info("Connected to ZooKeeper: {}", zkStr);
+         notifyStarted();
+         return;
+       }
+       if (event.getState() == Event.KeeperState.Expired) {
+         LOG.info("ZooKeeper session expired: {}", zkStr);
+
+         // When connection expired, simply reconnect again
+         Thread t = new Thread(new Runnable() {
+           @Override
+           public void run() {
+             try {
+               zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, ServiceDelegate.this));
+             } catch (IOException e) {
+               zooKeeper.set(null);
+               notifyFailed(e);
+             }
+           }
+         }, "zk-reconnect");
+         t.setDaemon(true);
+         t.start();
+       }
+     } finally {
+       // Connection state changes (event type None) are always forwarded to the registered
+       // connection watchers, including on the early return above.
+       if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) {
+         for (Watcher connectionWatcher : connectionWatchers) {
+           connectionWatcher.process(event);
+         }
+       }
+     }
+   }
+ }
+
+ /**
+ * Collection of generic callbacks that simply reflect results into OperationFuture.
+ */
+ private static final class Callbacks {
+ static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, String name) {
+ SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set((name == null || name.isEmpty()) ? path : name);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(stat);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ /**
+ * A stat callback that treats NONODE as success.
+ */
+ static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
+ result.set(stat);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+ SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicNodeChildren(children, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicNodeData(data, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx) {
+ SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(result.getRequestPath());
+ return;
+ }
+ // Otherwise, it is an error
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+ }
+}