You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:26 UTC

[10/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
new file mode 100644
index 0000000..6b77e66
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Test server that echoes back what it receives.
+ */
+public final class EchoServer extends SocketServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
+
+  @Override
+  public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+    String line = reader.readLine();
+    LOG.info("Received: " + line);
+    if (line != null) {
+      writer.println(line);
+    }
+  }
+
+  @Override
+  public void handleCommand(Command command) throws Exception {
+    LOG.info("Command received: " + command + " " + getContext().getInstanceCount());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
new file mode 100644
index 0000000..d868eef
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Using echo server to test various behavior of YarnTwillService.
+ * This test is executed by {@link YarnTestSuite}.
+ */
+public class EchoServerTestRun {
+
+  private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class);
+
+  @Test
+  public void testEchoServer() throws InterruptedException, ExecutionException, IOException,
+    URISyntaxException, TimeoutException {
+    TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+    TwillController controller = runner.prepare(new EchoServer(),
+                                                ResourceSpecification.Builder.with()
+                                                         .setVirtualCores(1)
+                                                         .setMemory(1, ResourceSpecification.SizeUnit.GIGA)
+                                                         .setInstances(2)
+                                                         .build())
+                                        .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+                                        .withApplicationArguments("echo")
+                                        .withArguments("EchoServer", "echo2")
+                                        .start();
+
+    final CountDownLatch running = new CountDownLatch(1);
+    controller.addListener(new ServiceListenerAdapter() {
+      @Override
+      public void running() {
+        running.countDown();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+
+    Iterable<Discoverable> echoServices = controller.discoverService("echo");
+    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
+
+    for (Discoverable discoverable : echoServices) {
+      String msg = "Hello: " + discoverable.getSocketAddress();
+
+      Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
+                                 discoverable.getSocketAddress().getPort());
+      try {
+        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+        LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+        writer.println(msg);
+        Assert.assertEquals(msg, reader.readLine());
+      } finally {
+        socket.close();
+      }
+    }
+
+    // Increase number of instances
+    controller.changeInstances("EchoServer", 3);
+    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 3, 60));
+
+    echoServices = controller.discoverService("echo2");
+
+    // Decrease number of instances
+    controller.changeInstances("EchoServer", 1);
+    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 1, 60));
+
+    // Increase number of instances again
+    controller.changeInstances("EchoServer", 2);
+    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
+
+    // Make sure still only one app is running
+    Iterable<TwillRunner.LiveInfo> apps = runner.lookupLive();
+    Assert.assertTrue(YarnTestSuite.waitForSize(apps, 1, 60));
+
+    // Creates a new runner service to check it can regain control over running app.
+    TwillRunnerService runnerService = YarnTestSuite.createTwillRunnerService();
+    runnerService.startAndWait();
+
+    try {
+      Iterable <TwillController> controllers = runnerService.lookup("EchoServer");
+      Assert.assertTrue(YarnTestSuite.waitForSize(controllers, 1, 60));
+
+      for (TwillController c : controllers) {
+        LOG.info("Stopping application: " + c.getRunId());
+        c.stop().get(30, TimeUnit.SECONDS);
+      }
+
+      Assert.assertTrue(YarnTestSuite.waitForSize(apps, 0, 60));
+    } finally {
+      runnerService.stopAndWait();
+    }
+
+    // Sleep a bit before exiting.
+    TimeUnit.SECONDS.sleep(2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
new file mode 100644
index 0000000..4be2472
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentEchoServer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * Test server that returns back the value of the env key sent in.  Used to check env for
+ * runnables is correctly set.
+ */
+public class EnvironmentEchoServer extends SocketServer {
+
+  @Override
+  public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+    String envKey = reader.readLine();
+    writer.println(System.getenv(envKey));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
new file mode 100644
index 0000000..b3d3933
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/FailureRestartTestRun.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class FailureRestartTestRun {
+
+  @Test
+  public void testFailureRestart() throws Exception {
+    TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+    ResourceSpecification resource = ResourceSpecification.Builder.with()
+      .setVirtualCores(1)
+      .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+      .setInstances(2)
+      .build();
+    TwillController controller = runner.prepare(new FailureRunnable(), resource)
+      .withApplicationArguments("failure")
+      .withArguments(FailureRunnable.class.getSimpleName(), "failure2")
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+
+    Iterable<Discoverable> discoverables = controller.discoverService("failure");
+    Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 2, 60));
+
+    // Make sure we see the right instance IDs
+    Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
+
+    // Kill server with instanceId = 0
+    controller.sendCommand(FailureRunnable.class.getSimpleName(), Command.Builder.of("kill0").build());
+
+    // Do a shot sleep, make sure the runnable is killed.
+    TimeUnit.SECONDS.sleep(5);
+
+    Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 2, 60));
+    // Make sure we see the right instance IDs
+    Assert.assertEquals(Sets.newHashSet(0, 1), getInstances(discoverables));
+
+    controller.stopAndWait();
+  }
+
+  private Set<Integer> getInstances(Iterable<Discoverable> discoverables) throws IOException {
+    Set<Integer> instances = Sets.newHashSet();
+    for (Discoverable discoverable : discoverables) {
+      InetSocketAddress socketAddress = discoverable.getSocketAddress();
+      Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
+      try {
+        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+        LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+        String msg = "Failure";
+        writer.println(msg);
+
+        String line = reader.readLine();
+        Assert.assertTrue(line.endsWith(msg));
+        instances.add(Integer.parseInt(line.substring(0, line.length() - msg.length())));
+      } finally {
+        socket.close();
+      }
+    }
+    return instances;
+  }
+
+
+  public static final class FailureRunnable extends SocketServer {
+
+    private volatile boolean killed;
+
+    @Override
+    public void run() {
+      killed = false;
+      super.run();
+      if (killed) {
+        throw new RuntimeException("Exception");
+      }
+    }
+
+    @Override
+    public void handleCommand(Command command) throws Exception {
+      if (command.getCommand().equals("kill" + getContext().getInstanceId())) {
+        killed = true;
+        running = false;
+        serverSocket.close();
+      }
+    }
+
+    @Override
+    public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+      String line = reader.readLine();
+      writer.println(getContext().getInstanceId() + line);
+      writer.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
new file mode 100644
index 0000000..de2c74c
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocalFileTestRun.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.Discoverable;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.TimeUnit;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+/**
+ * Test for local file transfer.
+ */
+public class LocalFileTestRun {
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testLocalFile() throws Exception {
+    String header = Files.readFirstLine(new File(getClass().getClassLoader().getResource("header.txt").toURI()),
+                                        Charsets.UTF_8);
+
+    TwillRunner runner = YarnTestSuite.getTwillRunner();
+    if (runner instanceof YarnTwillRunnerService) {
+      ((YarnTwillRunnerService) runner).setJVMOptions("-verbose:gc -Xloggc:gc.log -XX:+PrintGCDetails");
+    }
+
+    TwillController controller = runner.prepare(new LocalFileApplication())
+      .withApplicationArguments("local")
+      .withArguments("LocalFileSocketServer", "local2")
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+
+    if (runner instanceof YarnTwillRunnerService) {
+      ((YarnTwillRunnerService) runner).setJVMOptions("");
+    }
+
+    Iterable<Discoverable> discoverables = controller.discoverService("local");
+    Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 1, 60));
+
+    InetSocketAddress socketAddress = discoverables.iterator().next().getSocketAddress();
+    Socket socket = new Socket(socketAddress.getAddress(), socketAddress.getPort());
+    try {
+      PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+      LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+
+      String msg = "Local file test";
+      writer.println(msg);
+      Assert.assertEquals(header, reader.readLine());
+      Assert.assertEquals(msg, reader.readLine());
+    } finally {
+      socket.close();
+    }
+
+    controller.stopAndWait();
+
+    Assert.assertTrue(YarnTestSuite.waitForSize(discoverables, 0, 60));
+
+    TimeUnit.SECONDS.sleep(2);
+  }
+
+  public static final class LocalFileApplication implements TwillApplication {
+
+    private final File headerFile;
+
+    public LocalFileApplication() throws Exception {
+      // Create a jar file that contains the header.txt file inside.
+      headerFile = tmpFolder.newFile("header.jar");
+      JarOutputStream os = new JarOutputStream(new FileOutputStream(headerFile));
+      try {
+        os.putNextEntry(new JarEntry("header.txt"));
+        ByteStreams.copy(getClass().getClassLoader().getResourceAsStream("header.txt"), os);
+      } finally {
+        os.close();
+      }
+    }
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("LocalFileApp")
+        .withRunnable()
+          .add(new LocalFileSocketServer())
+            .withLocalFiles()
+              .add("header", headerFile, true).apply()
+        .anyOrder()
+        .build();
+    }
+  }
+
+  public static final class LocalFileSocketServer extends SocketServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSocketServer.class);
+
+    @Override
+    public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException {
+      // Verify there is a gc.log file locally
+      Preconditions.checkState(new File("gc.log").exists());
+
+      LOG.info("handleRequest");
+      String header = Files.toString(new File("header/header.txt"), Charsets.UTF_8);
+      writer.write(header);
+      writer.println(reader.readLine());
+      LOG.info("Flushed response");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
new file mode 100644
index 0000000..0598ef1
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.EventHandlerContext;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.Services;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ *
+ */
+public class ProvisionTimeoutTestRun {
+
+  @Test
+  public void testProvisionTimeout() throws InterruptedException, ExecutionException, TimeoutException {
+    TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+    TwillController controller = runner.prepare(new TimeoutApplication())
+                                       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+                                       .start();
+
+    // The provision should failed in 30 seconds after AM started, which AM could took a while to start.
+    // Hence we give 90 seconds max time here.
+    try {
+      Services.getCompletionFuture(controller).get(90, TimeUnit.SECONDS);
+    } finally {
+      // If it timeout, kill the app as cleanup.
+      controller.kill();
+    }
+  }
+
+  public static final class Handler extends EventHandler {
+
+    private boolean abort;
+
+    @Override
+    protected Map<String, String> getConfigs() {
+      return ImmutableMap.of("abort", "true");
+    }
+
+    @Override
+    public void initialize(EventHandlerContext context) {
+      this.abort = Boolean.parseBoolean(context.getSpecification().getConfigs().get("abort"));
+    }
+
+    @Override
+    public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
+      if (abort) {
+        return TimeoutAction.abort();
+      } else {
+        return TimeoutAction.recheck(10, TimeUnit.SECONDS);
+      }
+    }
+  }
+
+  public static final class TimeoutApplication implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("TimeoutApplication")
+        .withRunnable()
+        .add(new TimeoutRunnable(),
+             ResourceSpecification.Builder.with()
+               .setVirtualCores(1)
+               .setMemory(8, ResourceSpecification.SizeUnit.GIGA).build())
+        .noLocalFiles()
+        .anyOrder()
+        .withEventHandler(new Handler())
+        .build();
+    }
+  }
+
+  /**
+   * A runnable that do nothing, as it's not expected to get provisioned.
+   */
+  public static final class TimeoutRunnable extends AbstractTwillRunnable {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public void stop() {
+      latch.countDown();
+    }
+
+    @Override
+    public void run() {
+      // Simply block here
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
new file mode 100644
index 0000000..131f90a
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.internal.EnvKeys;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.LineReader;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Using echo server to test resource reports.
+ * This test is executed by {@link org.apache.twill.yarn.YarnTestSuite}.
+ */
+public class ResourceReportTestRun {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ResourceReportTestRun.class);
+
+  private class ResourceApplication implements TwillApplication {
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("ResourceApplication")
+        .withRunnable()
+          .add("echo1", new EchoServer(), ResourceSpecification.Builder.with()
+            .setVirtualCores(1)
+            .setMemory(128, ResourceSpecification.SizeUnit.MEGA)
+            .setInstances(2).build()).noLocalFiles()
+          .add("echo2", new EchoServer(), ResourceSpecification.Builder.with()
+            .setVirtualCores(2)
+            .setMemory(256, ResourceSpecification.SizeUnit.MEGA)
+            .setInstances(1).build()).noLocalFiles()
+        .anyOrder()
+        .build();
+    }
+  }
+
+  @Test
+  public void testRunnablesGetAllowedResourcesInEnv() throws InterruptedException, IOException,
+    TimeoutException, ExecutionException {
+    TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+    ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
+      .setVirtualCores(1)
+      .setMemory(2048, ResourceSpecification.SizeUnit.MEGA)
+      .setInstances(1)
+      .build();
+    TwillController controller = runner.prepare(new EnvironmentEchoServer(), resourceSpec)
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .withApplicationArguments("envecho")
+      .withArguments("EnvironmentEchoServer", "echo2")
+      .start();
+
+    final CountDownLatch running = new CountDownLatch(1);
+    controller.addListener(new ServiceListenerAdapter() {
+      @Override
+      public void running() {
+        running.countDown();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+
+    Iterable<Discoverable> envEchoServices = controller.discoverService("envecho");
+    Assert.assertTrue(YarnTestSuite.waitForSize(envEchoServices, 1, 30));
+
+    // TODO: check virtual cores once yarn adds the ability
+    Map<String, String> expectedValues = Maps.newHashMap();
+    expectedValues.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, "2048");
+    expectedValues.put(EnvKeys.TWILL_INSTANCE_COUNT, "1");
+
+    // check environment of the runnable.
+    Discoverable discoverable = envEchoServices.iterator().next();
+    for (Map.Entry<String, String> expected : expectedValues.entrySet()) {
+      Socket socket = new Socket(discoverable.getSocketAddress().getHostName(),
+                                 discoverable.getSocketAddress().getPort());
+      try {
+        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+        LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+        writer.println(expected.getKey());
+        Assert.assertEquals(expected.getValue(), reader.readLine());
+      } finally {
+        socket.close();
+      }
+    }
+
+    controller.stop().get(30, TimeUnit.SECONDS);
+    // Sleep a bit before exiting.
+    TimeUnit.SECONDS.sleep(2);
+  }
+
+  @Test
+  public void testResourceReportWithFailingContainers() throws InterruptedException, IOException,
+    TimeoutException, ExecutionException {
+    TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+    ResourceSpecification resourceSpec = ResourceSpecification.Builder.with()
+      .setVirtualCores(1)
+      .setMemory(128, ResourceSpecification.SizeUnit.MEGA)
+      .setInstances(2)
+      .build();
+    TwillController controller = runner.prepare(new BuggyServer(), resourceSpec)
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .withApplicationArguments("echo")
+      .withArguments("BuggyServer", "echo2")
+      .start();
+
+    final CountDownLatch running = new CountDownLatch(1);
+    controller.addListener(new ServiceListenerAdapter() {
+      @Override
+      public void running() {
+        running.countDown();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+
+    Iterable<Discoverable> echoServices = controller.discoverService("echo");
+    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 2, 60));
+    // check that we have 2 runnables.
+    ResourceReport report = controller.getResourceReport();
+    Assert.assertEquals(2, report.getRunnableResources("BuggyServer").size());
+
+    // cause a divide by 0 in one server
+    Discoverable discoverable = echoServices.iterator().next();
+    Socket socket = new Socket(discoverable.getSocketAddress().getAddress(),
+                               discoverable.getSocketAddress().getPort());
+    try {
+      PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true);
+      writer.println("0");
+    } finally {
+      socket.close();
+    }
+
+    // takes some time for app master to find out the container completed...
+    TimeUnit.SECONDS.sleep(5);
+    // check that we have 1 runnable, not 2.
+    report = controller.getResourceReport();
+    Assert.assertEquals(1, report.getRunnableResources("BuggyServer").size());
+
+    controller.stop().get(30, TimeUnit.SECONDS);
+    // Sleep a bit before exiting.
+    TimeUnit.SECONDS.sleep(2);
+  }
+
+  @Test
+  public void testResourceReport() throws InterruptedException, ExecutionException, IOException,
+    URISyntaxException, TimeoutException {
+    TwillRunner runner = YarnTestSuite.getTwillRunner();
+
+    TwillController controller = runner.prepare(new ResourceApplication())
+                                        .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+                                        .withApplicationArguments("echo")
+                                        .withArguments("echo1", "echo1")
+                                        .withArguments("echo2", "echo2")
+                                        .start();
+
+    final CountDownLatch running = new CountDownLatch(1);
+    controller.addListener(new ServiceListenerAdapter() {
+      @Override
+      public void running() {
+        running.countDown();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+
+    // wait for 3 echo servers to come up
+    Iterable<Discoverable> echoServices = controller.discoverService("echo");
+    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 3, 60));
+    ResourceReport report = controller.getResourceReport();
+    // make sure resources for echo1 and echo2 are there
+    Map<String, Collection<TwillRunResources>> usedResources = report.getResources();
+    Assert.assertEquals(2, usedResources.keySet().size());
+    Assert.assertTrue(usedResources.containsKey("echo1"));
+    Assert.assertTrue(usedResources.containsKey("echo2"));
+
+    Collection<TwillRunResources> echo1Resources = usedResources.get("echo1");
+    // 2 instances of echo1
+    Assert.assertEquals(2, echo1Resources.size());
+    // TODO: check cores after hadoop-2.1.0
+    for (TwillRunResources resources : echo1Resources) {
+      Assert.assertEquals(128, resources.getMemoryMB());
+    }
+
+    Collection<TwillRunResources> echo2Resources = usedResources.get("echo2");
+    // 2 instances of echo1
+    Assert.assertEquals(1, echo2Resources.size());
+    // TODO: check cores after hadoop-2.1.0
+    for (TwillRunResources resources : echo2Resources) {
+      Assert.assertEquals(256, resources.getMemoryMB());
+    }
+
+    // Decrease number of instances of echo1 from 2 to 1
+    controller.changeInstances("echo1", 1);
+    echoServices = controller.discoverService("echo1");
+    Assert.assertTrue(YarnTestSuite.waitForSize(echoServices, 1, 60));
+    report = controller.getResourceReport();
+
+    // make sure resources for echo1 and echo2 are there
+    usedResources = report.getResources();
+    Assert.assertEquals(2, usedResources.keySet().size());
+    Assert.assertTrue(usedResources.containsKey("echo1"));
+    Assert.assertTrue(usedResources.containsKey("echo2"));
+
+    echo1Resources = usedResources.get("echo1");
+    // 1 instance of echo1 now
+    Assert.assertEquals(1, echo1Resources.size());
+    // TODO: check cores after hadoop-2.1.0
+    for (TwillRunResources resources : echo1Resources) {
+      Assert.assertEquals(128, resources.getMemoryMB());
+    }
+
+    echo2Resources = usedResources.get("echo2");
+    // 2 instances of echo1
+    Assert.assertEquals(1, echo2Resources.size());
+    // TODO: check cores after hadoop-2.1.0
+    for (TwillRunResources resources : echo2Resources) {
+      Assert.assertEquals(256, resources.getMemoryMB());
+    }
+
+    controller.stop().get(30, TimeUnit.SECONDS);
+    // Sleep a bit before exiting.
+    TimeUnit.SECONDS.sleep(2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
new file mode 100644
index 0000000..5148ed2
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.common.Cancellable;
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.List;
+
+/**
+ * Boilerplate for a server that announces itself and talks to clients through a socket.
+ */
+public abstract class SocketServer extends AbstractTwillRunnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SocketServer.class);
+
+  protected volatile boolean running;
+  protected volatile Thread runThread;
+  protected ServerSocket serverSocket;
+  protected Cancellable canceller;
+
+  @Override
+  public void initialize(TwillContext context) {
+    super.initialize(context);
+    running = true;
+    try {
+      serverSocket = new ServerSocket(0);
+      LOG.info("Server started: " + serverSocket.getLocalSocketAddress() +
+               ", id: " + context.getInstanceId() +
+               ", count: " + context.getInstanceCount());
+
+      final List<Cancellable> cancellables = ImmutableList.of(
+        context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()),
+        context.announce(context.getArguments()[0], serverSocket.getLocalPort())
+      );
+      canceller = new Cancellable() {
+        @Override
+        public void cancel() {
+          for (Cancellable c : cancellables) {
+            c.cancel();
+          }
+        }
+      };
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      runThread = Thread.currentThread();
+      while (running) {
+        try {
+          Socket socket = serverSocket.accept();
+          try {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8));
+            PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
+            handleRequest(reader, writer);
+          } finally {
+            socket.close();
+          }
+        } catch (SocketException e) {
+          LOG.info("Socket exception: " + e);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping server");
+    canceller.cancel();
+    running = false;
+    Thread t = runThread;
+    if (t != null) {
+      t.interrupt();
+    }
+    try {
+      serverSocket.close();
+    } catch (IOException e) {
+      LOG.error("Exception while closing socket.", e);
+      throw Throwables.propagate(e);
+    }
+    serverSocket = null;
+  }
+
+  @Override
+  public void destroy() {
+    try {
+      if (serverSocket != null) {
+        serverSocket.close();
+      }
+    } catch (IOException e) {
+      LOG.error("Exception while closing socket.", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  abstract public void handleRequest(BufferedReader reader, PrintWriter writer) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
new file mode 100644
index 0000000..5a93271
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.common.ServiceListenerAdapter;
+import org.apache.twill.common.Threads;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Service;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Testing application master will shutdown itself when all tasks are completed.
+ * This test is executed by {@link YarnTestSuite}.
+ */
+public class TaskCompletedTestRun {
+
+  public static final class SleepTask extends AbstractTwillRunnable {
+
+    @Override
+    public void run() {
+      // Randomly sleep for 3-5 seconds.
+      try {
+        TimeUnit.SECONDS.sleep(new Random().nextInt(3) + 3);
+      } catch (InterruptedException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    public void stop() {
+      // No-op
+    }
+  }
+
+  @Test
+  public void testTaskCompleted() throws InterruptedException {
+    TwillRunner twillRunner = YarnTestSuite.getTwillRunner();
+    TwillController controller = twillRunner.prepare(new SleepTask(),
+                                                ResourceSpecification.Builder.with()
+                                                  .setVirtualCores(1)
+                                                  .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+                                                  .setInstances(3).build())
+                                            .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+                                            .start();
+
+    final CountDownLatch runLatch = new CountDownLatch(1);
+    final CountDownLatch stopLatch = new CountDownLatch(1);
+    controller.addListener(new ServiceListenerAdapter() {
+
+      @Override
+      public void running() {
+        runLatch.countDown();
+      }
+
+      @Override
+      public void terminated(Service.State from) {
+        stopLatch.countDown();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    Assert.assertTrue(runLatch.await(1, TimeUnit.MINUTES));
+
+    Assert.assertTrue(stopLatch.await(1, TimeUnit.MINUTES));
+
+    TimeUnit.SECONDS.sleep(2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
new file mode 100644
index 0000000..8be907b
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillSpecificationTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillSpecification;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class TwillSpecificationTest {
+
+  /**
+   * Dummy for test.
+   */
+  public static final class DummyRunnable extends AbstractTwillRunnable {
+
+    @Override
+    public void stop() {
+      // no-op
+    }
+
+    @Override
+    public void run() {
+      // no-op
+    }
+  }
+
+  @Test
+  public void testAnyOrder() {
+    TwillSpecification spec =
+      TwillSpecification.Builder.with()
+        .setName("Testing")
+        .withRunnable()
+        .add("r1", new DummyRunnable()).noLocalFiles()
+        .add("r2", new DummyRunnable()).noLocalFiles()
+        .add("r3", new DummyRunnable()).noLocalFiles()
+        .anyOrder()
+        .build();
+
+    Assert.assertEquals(3, spec.getRunnables().size());
+    List<TwillSpecification.Order> orders = spec.getOrders();
+    Assert.assertEquals(1, orders.size());
+    Assert.assertEquals(ImmutableSet.of("r1", "r2", "r3"), orders.get(0).getNames());
+  }
+
+  @Test
+  public void testOrder() {
+    TwillSpecification spec =
+      TwillSpecification.Builder.with()
+        .setName("Testing")
+        .withRunnable()
+        .add("r1", new DummyRunnable()).noLocalFiles()
+        .add("r2", new DummyRunnable()).noLocalFiles()
+        .add("r3", new DummyRunnable()).noLocalFiles()
+        .add("r4", new DummyRunnable()).noLocalFiles()
+        .withOrder().begin("r1", "r2").nextWhenStarted("r3")
+        .build();
+
+    Assert.assertEquals(4, spec.getRunnables().size());
+    List<TwillSpecification.Order> orders = spec.getOrders();
+    Assert.assertEquals(3, orders.size());
+    Assert.assertEquals(ImmutableSet.of("r1", "r2"), orders.get(0).getNames());
+    Assert.assertEquals(ImmutableSet.of("r3"), orders.get(1).getNames());
+    Assert.assertEquals(ImmutableSet.of("r4"), orders.get(2).getNames());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
new file mode 100644
index 0000000..b55d620
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.filesystem.LocalLocationFactory;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.internal.yarn.YarnUtils;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test suite for all tests with mini yarn cluster.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+                      EchoServerTestRun.class,
+                      ResourceReportTestRun.class,
+                      TaskCompletedTestRun.class,
+                      DistributeShellTestRun.class,
+                      LocalFileTestRun.class,
+                      FailureRestartTestRun.class,
+                      ProvisionTimeoutTestRun.class
+                    })
+public class YarnTestSuite {
+  private static final Logger LOG = LoggerFactory.getLogger(YarnTestSuite.class);
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  private static InMemoryZKServer zkServer;
+  private static MiniYARNCluster cluster;
+  private static TwillRunnerService runnerService;
+  private static YarnConfiguration config;
+
+  @BeforeClass
+  public static final void init() throws IOException {
+    // Starts Zookeeper
+    zkServer = InMemoryZKServer.builder().build();
+    zkServer.startAndWait();
+
+    // Start YARN mini cluster
+    config = new YarnConfiguration(new Configuration());
+
+    if (YarnUtils.isHadoop20()) {
+      config.set("yarn.resourcemanager.scheduler.class",
+                 "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
+    } else {
+      config.set("yarn.resourcemanager.scheduler.class",
+                 "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
+      config.set("yarn.scheduler.capacity.resource-calculator",
+                 "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+    }
+    config.set("yarn.minicluster.fixed.ports", "true");
+    config.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
+    config.set("yarn.nodemanager.vmem-check-enabled", "false");
+    config.set("yarn.scheduler.minimum-allocation-mb", "128");
+    config.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
+
+    cluster = new MiniYARNCluster("test-cluster", 1, 1, 1);
+    cluster.init(config);
+    cluster.start();
+
+    runnerService = createTwillRunnerService();
+    runnerService.startAndWait();
+  }
+
+  @AfterClass
+  public static final void finish() {
+    runnerService.stopAndWait();
+    cluster.stop();
+    zkServer.stopAndWait();
+  }
+
+  public static final TwillRunner getTwillRunner() {
+    return runnerService;
+  }
+
+  /**
+   * Creates an unstarted instance of {@link org.apache.twill.api.TwillRunnerService}.
+   */
+  public static final TwillRunnerService createTwillRunnerService() throws IOException {
+    return new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill",
+                                      new LocalLocationFactory(tmpFolder.newFolder()));
+  }
+
+  public static final <T> boolean waitForSize(Iterable<T> iterable, int count, int limit) throws InterruptedException {
+    int trial = 0;
+    int size = Iterables.size(iterable);
+    while (size != count && trial < limit) {
+      LOG.info("Waiting for {} size {} == {}", iterable, size, count);
+      TimeUnit.SECONDS.sleep(1);
+      trial++;
+      size = Iterables.size(iterable);
+    }
+    return trial < limit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/resources/header.txt
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/resources/header.txt b/twill-yarn/src/test/resources/header.txt
new file mode 100644
index 0000000..b6e25e6
--- /dev/null
+++ b/twill-yarn/src/test/resources/header.txt
@@ -0,0 +1 @@
+Local file header

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-yarn/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/resources/logback-test.xml b/twill-yarn/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..2615cb4
--- /dev/null
+++ b/twill-yarn/src/test/resources/logback-test.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Default logback configuration for twill library -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.apache.twill" level="DEBUG" />
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/twill-zookeeper/pom.xml b/twill-zookeeper/pom.xml
new file mode 100644
index 0000000..e76ee50
--- /dev/null
+++ b/twill-zookeeper/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-zookeeper</artifactId>
+    <name>Twill ZooKeeper client library</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
new file mode 100644
index 0000000..9e4f55f
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeChildren.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.zookeeper.NodeChildren;
+import com.google.common.base.Objects;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ *
+ */
+final class BasicNodeChildren implements NodeChildren {
+
+  private final Stat stat;
+  private final List<String> children;
+
+  BasicNodeChildren(List<String> children, Stat stat) {
+    this.stat = stat;
+    this.children = children;
+  }
+
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+
+  @Override
+  public List<String> getChildren() {
+    return children;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || !(o instanceof NodeChildren)) {
+      return false;
+    }
+
+    NodeChildren that = (NodeChildren) o;
+    return stat.equals(that.getStat()) && children.equals(that.getChildren());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(children, stat);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
new file mode 100644
index 0000000..98a3a66
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/BasicNodeData.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.zookeeper.NodeData;
+import com.google.common.base.Objects;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.Arrays;
+
+/**
+ * A straightforward implementation for {@link NodeData}.
+ */
+final class BasicNodeData implements NodeData {
+
+  private final byte[] data;
+  private final Stat stat;
+
+  BasicNodeData(byte[] data, Stat stat) {
+    this.data = data;
+    this.stat = stat;
+  }
+
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+
+  @Override
+  public byte[] getData() {
+    return data;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || !(o instanceof NodeData)) {
+      return false;
+    }
+
+    BasicNodeData that = (BasicNodeData) o;
+
+    return stat.equals(that.getStat()) && Arrays.equals(data, that.getData());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(data, stat);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
new file mode 100644
index 0000000..c52fb08
--- /dev/null
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.zookeeper;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClientService;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The base implementation of {@link ZKClientService}.
+ */
+public final class DefaultZKClientService implements ZKClientService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultZKClientService.class);
+
+  private final String zkStr;
+  private final int sessionTimeout;
+  private final List<Watcher> connectionWatchers;
+  private final AtomicReference<ZooKeeper> zooKeeper;
+  private final Function<String, List<ACL>> aclMapper;
+  private final Service serviceDelegate;
+  private ExecutorService eventExecutor;
+
+  public DefaultZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher) {
+    this.zkStr = zkStr;
+    this.sessionTimeout = sessionTimeout;
+    this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
+    addConnectionWatcher(connectionWatcher);
+
+    this.zooKeeper = new AtomicReference<ZooKeeper>();
+
+    // TODO (terence): Add ACL
+    aclMapper = new Function<String, List<ACL>>() {
+      @Override
+      public List<ACL> apply(String input) {
+        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
+      }
+    };
+    serviceDelegate = new ServiceDelegate();
+  }
+
+  @Override
+  public Long getSessionId() {
+    ZooKeeper zk = zooKeeper.get();
+    return zk == null ? null : zk.getSessionId();
+  }
+
+  @Override
+  public String getConnectString() {
+    return zkStr;
+  }
+
+  @Override
+  public void addConnectionWatcher(Watcher watcher) {
+    if (watcher != null) {
+      connectionWatchers.add(wrapWatcher(watcher));
+    }
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, byte[] data, CreateMode createMode) {
+    return create(path, data, createMode, true);
+  }
+
+  @Override
+  public OperationFuture<String> create(String path, @Nullable byte[] data,
+                                        CreateMode createMode, boolean createParent) {
+    return doCreate(path, data, createMode, createParent, false);
+  }
+
+  private OperationFuture<String> doCreate(final String path,
+                                        @Nullable final byte[] data,
+                                        final CreateMode createMode,
+                                        final boolean createParent,
+                                        final boolean ignoreNodeExists) {
+    final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().create(path, data, aclMapper.apply(path), createMode, Callbacks.STRING, createFuture);
+    if (!createParent) {
+      return createFuture;
+    }
+
+    // If create parent is request, return a different future
+    final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
+    // Watch for changes in the original future
+    Futures.addCallback(createFuture, new FutureCallback<String>() {
+      @Override
+      public void onSuccess(String path) {
+        // Propagate if creation was successful
+        result.set(path);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // See if the failure can be handled
+        if (updateFailureResult(t, result, path, ignoreNodeExists)) {
+          return;
+        }
+        // Create the parent node
+        String parentPath = getParent(path);
+        if (parentPath.isEmpty()) {
+          result.setException(t);
+          return;
+        }
+        // Watch for parent creation complete
+        Futures.addCallback(
+          doCreate(parentPath, null, CreateMode.PERSISTENT, createParent, true), new FutureCallback<String>() {
+          @Override
+          public void onSuccess(String parentPath) {
+            // Create the requested path again
+            Futures.addCallback(
+              doCreate(path, data, createMode, false, ignoreNodeExists), new FutureCallback<String>() {
+              @Override
+              public void onSuccess(String pathResult) {
+                result.set(pathResult);
+              }
+
+              @Override
+              public void onFailure(Throwable t) {
+                // handle the failure
+                updateFailureResult(t, result, path, ignoreNodeExists);
+              }
+            });
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            result.setException(t);
+          }
+        });
+      }
+
+      /**
+       * Updates the result future based on the given {@link Throwable}.
+       * @param t Cause of the failure
+       * @param result Future to be updated
+       * @param path Request path for the operation
+       * @return {@code true} if it is a failure, {@code false} otherwise.
+       */
+      private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
+                                          String path, boolean ignoreNodeExists) {
+        // Propagate if there is error
+        if (!(t instanceof KeeperException)) {
+          result.setException(t);
+          return true;
+        }
+        KeeperException.Code code = ((KeeperException) t).code();
+        // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
+        if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
+          // The requested path could be used because it only applies to non-sequential node
+          result.set(path);
+          return false;
+        }
+        if (code != KeeperException.Code.NONODE) {
+          result.setException(t);
+          return true;
+        }
+        return false;
+      }
+
+      /**
+       * Gets the parent of the given path.
+       * @param path Path for computing its parent
+       * @return Parent of the given path, or empty string if the given path is the root path already.
+       */
+      private String getParent(String path) {
+        String parentPath = path.substring(0, path.lastIndexOf('/'));
+        return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
+      }
+    });
+
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path) {
+    return exists(path, null);
+  }
+
+  @Override
+  public OperationFuture<Stat> exists(String path, Watcher watcher) {
+    SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path) {
+    return getChildren(path, null);
+  }
+
+  @Override
+  public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
+    SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path) {
+    return getData(path, null);
+  }
+
+  @Override
+  public OperationFuture<NodeData> getData(String path, Watcher watcher) {
+    SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
+    getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
+
+    return result;
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String path, byte[] data) {
+    return setData(path, data, -1);
+  }
+
+  @Override
+  public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+    SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
+    getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
+    return result;
+  }
+
+  @Override
+  public OperationFuture<String> delete(String path) {
+    return delete(path, -1);
+  }
+
+  @Override
+  public OperationFuture<String> delete(String deletePath, int version) {
+    SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
+    getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
+    return result;
+  }
+
+  @Override
+  public Supplier<ZooKeeper> getZooKeeperSupplier() {
+    return new Supplier<ZooKeeper>() {
+      @Override
+      public ZooKeeper get() {
+        return getZooKeeper();
+      }
+    };
+  }
+
+  @Override
+  public ListenableFuture<State> start() {
+    return serviceDelegate.start();
+  }
+
+  @Override
+  public State startAndWait() {
+    return serviceDelegate.startAndWait();
+  }
+
+  @Override
+  public boolean isRunning() {
+    return serviceDelegate.isRunning();
+  }
+
+  @Override
+  public State state() {
+    return serviceDelegate.state();
+  }
+
+  @Override
+  public ListenableFuture<State> stop() {
+    return serviceDelegate.stop();
+  }
+
+  @Override
+  public State stopAndWait() {
+    return serviceDelegate.stopAndWait();
+  }
+
+  @Override
+  public void addListener(Listener listener, Executor executor) {
+    serviceDelegate.addListener(listener, executor);
+  }
+
+  /**
+   * @return Current {@link ZooKeeper} client.
+   */
+  private ZooKeeper getZooKeeper() {
+    ZooKeeper zk = zooKeeper.get();
+    Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
+    return zk;
+  }
+
+  /**
+   * Wraps the given watcher to be called from the event executor.
+   * @param watcher Watcher to be wrapped
+   * @return The wrapped Watcher
+   */
+  private Watcher wrapWatcher(final Watcher watcher) {
+    if (watcher == null) {
+      return null;
+    }
+    return new Watcher() {
+      @Override
+      public void process(final WatchedEvent event) {
+        eventExecutor.execute(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              watcher.process(event);
+            } catch (Throwable t) {
+              LOG.error("Watcher throws exception.", t);
+            }
+          }
+        });
+      }
+    };
+  }
+
+  private final class ServiceDelegate extends AbstractService implements Watcher {
+
+    @Override
+    protected void doStart() {
+      // A single thread executor
+      eventExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+                                             Threads.createDaemonThreadFactory("zk-client-EventThread")) {
+        @Override
+        protected void terminated() {
+          super.terminated();
+          notifyStopped();
+        }
+      };
+
+      try {
+        zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, this));
+      } catch (IOException e) {
+        notifyFailed(e);
+      }
+    }
+
+    @Override
+    protected void doStop() {
+      ZooKeeper zk = zooKeeper.getAndSet(null);
+      if (zk != null) {
+        try {
+          zk.close();
+        } catch (InterruptedException e) {
+          notifyFailed(e);
+        } finally {
+          eventExecutor.shutdown();
+        }
+      }
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      try {
+        if (event.getState() == Event.KeeperState.SyncConnected && state() == State.STARTING) {
+          LOG.info("Connected to ZooKeeper: " + zkStr);
+          notifyStarted();
+          return;
+        }
+        if (event.getState() == Event.KeeperState.Expired) {
+          LOG.info("ZooKeeper session expired: " + zkStr);
+
+          // When connection expired, simply reconnect again
+          Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                zooKeeper.set(new ZooKeeper(zkStr, sessionTimeout, ServiceDelegate.this));
+              } catch (IOException e) {
+                zooKeeper.set(null);
+                notifyFailed(e);
+              }
+            }
+          }, "zk-reconnect");
+          t.setDaemon(true);
+          t.start();
+        }
+      } finally {
+        if (event.getType() == Event.EventType.None && !connectionWatchers.isEmpty()) {
+          for (Watcher connectionWatcher : connectionWatchers) {
+            connectionWatcher.process(event);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Collection of generic callbacks that simply reflect results into OperationFuture.
+   */
+  private static final class Callbacks {
+    static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, String name) {
+        SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set((name == null || name.isEmpty()) ? path : name);
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, Stat stat) {
+        SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(stat);
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    /**
+     * A stat callback that treats NONODE as success.
+     */
+    static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, Stat stat) {
+        SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
+          result.set(stat);
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+        SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(new BasicNodeChildren(children, stat));
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+        SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(new BasicNodeData(data, stat));
+          return;
+        }
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+
+    static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void processResult(int rc, String path, Object ctx) {
+        SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+        KeeperException.Code code = KeeperException.Code.get(rc);
+        if (code == KeeperException.Code.OK) {
+          result.set(result.getRequestPath());
+          return;
+        }
+        // Otherwise, it is an error
+        result.setException(KeeperException.create(code, result.getRequestPath()));
+      }
+    };
+  }
+}