You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:35 UTC

[19/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree happens:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-common/src/test/java/org/apache/twill/common/ServicesTest.java
----------------------------------------------------------------------
diff --git a/twill-common/src/test/java/org/apache/twill/common/ServicesTest.java b/twill-common/src/test/java/org/apache/twill/common/ServicesTest.java
new file mode 100644
index 0000000..c0aa7ee
--- /dev/null
+++ b/twill-common/src/test/java/org/apache/twill/common/ServicesTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Unit test for {@link Services} methods.
+ */
+public class ServicesTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ServicesTest.class);
+
+  /**
+   * Starts three services with {@code Services.chainStart} and stops them in reverse order with
+   * {@code Services.chainStop}. The services share one {@code transiting} flag, so if two transitions
+   * overlapped, a {@code checkState} call in {@link DummyService} would throw.
+   */
+  @Test
+  public void testChain() throws ExecutionException, InterruptedException {
+    AtomicBoolean transiting = new AtomicBoolean(false);
+    Service s1 = new DummyService("s1", transiting);
+    Service s2 = new DummyService("s2", transiting);
+    Service s3 = new DummyService("s3", transiting);
+
+    Futures.allAsList(Services.chainStart(s1, s2, s3).get()).get();
+    Futures.allAsList(Services.chainStop(s3, s2, s1).get()).get();
+  }
+
+  /**
+   * Checks the future returned by {@code Services.getCompletionFuture}: it completes normally after a clean
+   * start/stop cycle and fails with an {@link ExecutionException} when the service's shutdown throws.
+   */
+  @Test
+  public void testCompletion() throws ExecutionException, InterruptedException {
+    Service service = new DummyService("s1", new AtomicBoolean());
+    ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
+
+    service.start();
+    service.stop();
+
+    completion.get();
+
+    AtomicBoolean transiting = new AtomicBoolean();
+    service = new DummyService("s2", transiting);
+    completion = Services.getCompletionFuture(service);
+
+    service.startAndWait();
+    // Flip the flag behind the service's back so that the checkState in shutDown() fails.
+    transiting.set(true);
+    service.stop();
+
+    try {
+      completion.get();
+      Assert.fail("Completion future should fail when the service fails to shut down");
+    } catch (ExecutionException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * A service whose start-up and shut-down each take 500 ms and assert, through the shared
+   * {@code transiting} flag, that no other transition is in progress at the same time.
+   */
+  private static final class DummyService extends AbstractIdleService {
+
+    private final String name;
+    private final AtomicBoolean transiting;
+
+    private DummyService(String name, AtomicBoolean transiting) {
+      this.name = name;
+      this.transiting = transiting;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      // Claim the flag on entry and release it on exit; either step throws if the flag is in the wrong state.
+      Preconditions.checkState(transiting.compareAndSet(false, true));
+      LOG.info("Starting: " + name);
+      TimeUnit.MILLISECONDS.sleep(500);
+      LOG.info("Started: " + name);
+      Preconditions.checkState(transiting.compareAndSet(true, false));
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      // Same claim/release protocol as startUp().
+      Preconditions.checkState(transiting.compareAndSet(false, true));
+      LOG.info("Stopping: " + name);
+      TimeUnit.MILLISECONDS.sleep(500);
+      LOG.info("Stopped: " + name);
+      Preconditions.checkState(transiting.compareAndSet(true, false));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
----------------------------------------------------------------------
diff --git a/twill-common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java b/twill-common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
new file mode 100644
index 0000000..198f77f
--- /dev/null
+++ b/twill-common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.filesystem;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Unit test for locations produced by {@link LocalLocationFactory}, rooted at the JVM temporary directory.
+ */
+public class LocalLocationTest {
+
+  /**
+   * Builds a directory containing two files and a sub-directory with two more files, then checks that a
+   * recursive delete removes the whole tree.
+   */
+  @Test
+  public void testDelete() throws IOException {
+    Location root = tmpDirFactory().create("test").getTempFile(".tmp");
+    Assert.assertTrue(root.mkdirs());
+
+    Assert.assertTrue(createTempFile(root, "test1"));
+    Assert.assertTrue(createTempFile(root, "test2"));
+
+    Location nested = root.append("test3");
+    Assert.assertTrue(nested.mkdirs());
+
+    Assert.assertTrue(createTempFile(nested, "test4"));
+    Assert.assertTrue(createTempFile(nested, "test5"));
+
+    Assert.assertTrue(root.delete(true));
+    Assert.assertFalse(root.exists());
+  }
+
+  /**
+   * Checks that a namespaced factory places locations under the namespace, for both String and URI input.
+   */
+  @Test
+  public void testHelper() {
+    LocationFactory namespaced = LocationFactories.namespace(tmpDirFactory(), "testhelper");
+
+    Location fromString = namespaced.create("test");
+    Assert.assertTrue(fromString.toURI().getPath().endsWith("testhelper/test"));
+
+    Location fromUri = namespaced.create(URI.create("test2"));
+    Assert.assertTrue(fromUri.toURI().getPath().endsWith("testhelper/test2"));
+  }
+
+  /** Returns a factory rooted at the directory named by the {@code java.io.tmpdir} system property. */
+  private static LocationFactory tmpDirFactory() {
+    return new LocalLocationFactory(new File(System.getProperty("java.io.tmpdir")));
+  }
+
+  /** Creates a new ".tmp" temp file derived from {@code name} under {@code parent}; returns the createNew() result. */
+  private static boolean createTempFile(Location parent, String name) throws IOException {
+    return parent.append(name).getTempFile(".tmp").createNew();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
new file mode 100644
index 0000000..faff711
--- /dev/null
+++ b/twill-core/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Inherits build configuration from the twill-parent POM. -->
+    <parent>
+        <artifactId>twill-parent</artifactId>
+        <groupId>org.apache.twill</groupId>
+        <version>0.1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>twill-core</artifactId>
+    <name>Twill core library</name>
+
+    <dependencies>
+        <!-- Sibling Twill modules, pinned to this project's own groupId and version. -->
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-zookeeper</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>twill-discovery-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- Third-party libraries. No versions are declared here; presumably they are managed by the
+             parent POM's dependencyManagement section (TODO confirm). -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm-all</artifactId>
+        </dependency>
+        <!-- Logging API and the logback implementation. -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <!-- NOTE(review): no scope is declared for junit here; confirm the parent POM sets it to "test". -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
new file mode 100644
index 0000000..974639d
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.common.Threads;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ *
+ */
+public abstract class AbstractExecutionServiceController implements ServiceController {
+
+  private final RunId runId;
+  private final ListenerExecutors listenerExecutors;
+  private final Service serviceDelegate;
+
+  protected AbstractExecutionServiceController(RunId runId) {
+    this.runId = runId;
+    this.listenerExecutors = new ListenerExecutors();
+    this.serviceDelegate = new ServiceDelegate();
+  }
+
+  protected abstract void startUp();
+
+  protected abstract void shutDown();
+
+  @Override
+  public final RunId getRunId() {
+    return runId;
+  }
+
+  @Override
+  public final void addListener(Listener listener, Executor executor) {
+    listenerExecutors.addListener(new ListenerExecutor(listener, executor));
+  }
+
+  @Override
+  public final ListenableFuture<State> start() {
+    serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR);
+    return serviceDelegate.start();
+  }
+
+  @Override
+  public final State startAndWait() {
+    return Futures.getUnchecked(start());
+  }
+
+  @Override
+  public final boolean isRunning() {
+    return serviceDelegate.isRunning();
+  }
+
+  @Override
+  public final State state() {
+    return serviceDelegate.state();
+  }
+
+  @Override
+  public final State stopAndWait() {
+    return Futures.getUnchecked(stop());
+  }
+
+  @Override
+  public final ListenableFuture<State> stop() {
+    return serviceDelegate.stop();
+  }
+
+  protected Executor executor(final State state) {
+    return new Executor() {
+      @Override
+      public void execute(Runnable command) {
+        Thread t = new Thread(command, getClass().getSimpleName() + " " + state);
+        t.setDaemon(true);
+        t.start();
+      }
+    };
+  }
+
+
+  private final class ServiceDelegate extends AbstractIdleService {
+    @Override
+    protected void startUp() throws Exception {
+      AbstractExecutionServiceController.this.startUp();
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      AbstractExecutionServiceController.this.shutDown();
+    }
+
+    @Override
+    protected Executor executor(State state) {
+      return AbstractExecutionServiceController.this.executor(state);
+    }
+  }
+
+  /**
+   * Inner class for dispatching listener call back to a list of listeners
+   */
+  private static final class ListenerExecutors implements Listener {
+
+    private interface Callback {
+      void call(Listener listener);
+    }
+
+    private final Queue<ListenerExecutor> listeners = new ConcurrentLinkedQueue<ListenerExecutor>();
+    private final AtomicReference<Callback> lastState = new AtomicReference<Callback>();
+
+    private synchronized void addListener(final ListenerExecutor listener) {
+      listeners.add(listener);
+      Callback callback = lastState.get();
+      if (callback != null) {
+        callback.call(listener);
+      }
+    }
+
+    @Override
+    public synchronized void starting() {
+      lastState.set(new Callback() {
+        @Override
+        public void call(Listener listener) {
+          listener.starting();
+        }
+      });
+      for (ListenerExecutor listener : listeners) {
+        listener.starting();
+      }
+    }
+
+    @Override
+    public synchronized void running() {
+      lastState.set(new Callback() {
+        @Override
+        public void call(Listener listener) {
+          listener.running();
+        }
+      });
+      for (ListenerExecutor listener : listeners) {
+        listener.running();
+      }
+    }
+
+    @Override
+    public synchronized void stopping(final State from) {
+      lastState.set(new Callback() {
+        @Override
+        public void call(Listener listener) {
+          listener.stopping(from);
+        }
+      });
+      for (ListenerExecutor listener : listeners) {
+        listener.stopping(from);
+      }
+    }
+
+    @Override
+    public synchronized void terminated(final State from) {
+      lastState.set(new Callback() {
+        @Override
+        public void call(Listener listener) {
+          listener.terminated(from);
+        }
+      });
+      for (ListenerExecutor listener : listeners) {
+        listener.terminated(from);
+      }
+    }
+
+    @Override
+    public synchronized void failed(final State from, final Throwable failure) {
+      lastState.set(new Callback() {
+        @Override
+        public void call(Listener listener) {
+          listener.failed(from, failure);
+        }
+      });
+      for (ListenerExecutor listener : listeners) {
+        listener.failed(from, failure);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
new file mode 100644
index 0000000..5806f9d
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.json.StackTraceElementCodec;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.logging.LogEntryDecoder;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.kafka.client.FetchedMessage;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A abstract base class for {@link org.apache.twill.api.TwillController} implementation that uses Zookeeper to controller a
+ * running twill application.
+ */
+public abstract class AbstractTwillController extends AbstractZKServiceController implements TwillController {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
+  private static final int MAX_KAFKA_FETCH_SIZE = 1048576;
+  private static final long SHUTDOWN_TIMEOUT_MS = 2000;
+  private static final long LOG_FETCH_TIMEOUT_MS = 5000;
+
+  private final Queue<LogHandler> logHandlers;
+  private final KafkaClient kafkaClient;
+  private final DiscoveryServiceClient discoveryServiceClient;
+  private final LogPollerThread logPoller;
+
+  public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
+    super(runId, zkClient);
+    this.logHandlers = new ConcurrentLinkedQueue<LogHandler>();
+    this.kafkaClient = new SimpleKafkaClient(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
+    this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
+    Iterables.addAll(this.logHandlers, logHandlers);
+    this.logPoller = new LogPollerThread(runId, kafkaClient, logHandlers);
+  }
+
+  @Override
+  protected void doStartUp() {
+    if (!logHandlers.isEmpty()) {
+      logPoller.start();
+    }
+  }
+
+  @Override
+  protected void doShutDown() {
+    logPoller.terminate();
+    try {
+      // Wait for the poller thread to stop.
+      logPoller.join(SHUTDOWN_TIMEOUT_MS);
+    } catch (InterruptedException e) {
+      LOG.warn("Joining of log poller thread interrupted.", e);
+    }
+  }
+
+  @Override
+  public final synchronized void addLogHandler(LogHandler handler) {
+    logHandlers.add(handler);
+    if (!logPoller.isAlive()) {
+      logPoller.start();
+    }
+  }
+
+  @Override
+  public final Iterable<Discoverable> discoverService(String serviceName) {
+    return discoveryServiceClient.discover(serviceName);
+  }
+
+  @Override
+  public final ListenableFuture<Integer> changeInstances(String runnable, int newCount) {
+    return sendMessage(SystemMessages.setInstances(runnable, newCount), newCount);
+  }
+
+  private static final class LogPollerThread extends Thread {
+
+    private final KafkaClient kafkaClient;
+    private final Iterable<LogHandler> logHandlers;
+    private volatile boolean running = true;
+
+    LogPollerThread(RunId runId, KafkaClient kafkaClient, Iterable<LogHandler> logHandlers) {
+      super("twill-log-poller-" + runId.getId());
+      setDaemon(true);
+      this.kafkaClient = kafkaClient;
+      this.logHandlers = logHandlers;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Twill log poller thread '{}' started.", getName());
+      kafkaClient.startAndWait();
+      Gson gson = new GsonBuilder().registerTypeAdapter(LogEntry.class, new LogEntryDecoder())
+        .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
+        .create();
+
+      while (running && !isInterrupted()) {
+        long offset;
+        try {
+          // Get the earliest offset
+          long[] offsets = kafkaClient.getOffset(Constants.LOG_TOPIC, 0, -2, 1).get(LOG_FETCH_TIMEOUT_MS,
+                                                                                    TimeUnit.MILLISECONDS);
+          // Should have one entry
+          offset = offsets[0];
+        } catch (Throwable t) {
+          // Keep retrying
+          LOG.warn("Failed to fetch offsets from Kafka. Retrying.", t);
+          continue;
+        }
+
+        // Now fetch log messages from Kafka
+        Iterator<FetchedMessage> messageIterator = kafkaClient.consume(Constants.LOG_TOPIC, 0,
+                                                                       offset, MAX_KAFKA_FETCH_SIZE);
+        try {
+          while (messageIterator.hasNext()) {
+            String json = Charsets.UTF_8.decode(messageIterator.next().getBuffer()).toString();
+            try {
+              LogEntry entry = gson.fromJson(json, LogEntry.class);
+              if (entry != null) {
+                invokeHandlers(entry);
+              }
+            } catch (Exception e) {
+              LOG.error("Failed to decode log entry {}", json, e);
+            }
+          }
+        } catch (Throwable t) {
+          LOG.warn("Exception while fetching log message from Kafka. Retrying.", t);
+          continue;
+        }
+      }
+
+      kafkaClient.stopAndWait();
+      LOG.info("Twill log poller thread stopped.");
+    }
+
+    void terminate() {
+      running = false;
+      interrupt();
+    }
+
+    private void invokeHandlers(LogEntry entry) {
+      for (LogHandler handler : logHandlers) {
+        handler.onLog(entry);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
new file mode 100644
index 0000000..98cc2b8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.json.StackTraceElementCodec;
+import org.apache.twill.internal.json.StateNodeCodec;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.Messages;
+import org.apache.twill.internal.state.StateNode;
+import org.apache.twill.internal.state.SystemMessages;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.GsonBuilder;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An abstract base class for implementing a {@link ServiceController} using ZooKeeper as a means for
+ * communicating with the remote service. This is designed to work in pair with the {@link ZKServiceDecorator}.
+ */
+public abstract class AbstractZKServiceController extends AbstractExecutionServiceController {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractZKServiceController.class);
+
+  private final ZKClient zkClient;
+  private final InstanceNodeDataCallback instanceNodeDataCallback;
+  private final StateNodeDataCallback stateNodeDataCallback;
+  private final List<ListenableFuture<?>> messageFutures;
+  private ListenableFuture<State> stopMessageFuture;
+
+  /**
+   * Creates a controller for the given run that talks to the remote service through the given ZooKeeper client.
+   *
+   * @param runId run id passed to the superclass
+   * @param zkClient ZooKeeper client used for node watches and for sending messages
+   */
+  protected AbstractZKServiceController(RunId runId, ZKClient zkClient) {
+    super(runId);
+    this.zkClient = zkClient;
+    this.instanceNodeDataCallback = new InstanceNodeDataCallback();
+    this.stateNodeDataCallback = new StateNodeDataCallback();
+    // Tracks futures of messages that have been sent; shutDown() cancels whatever is still in this list.
+    this.messageFutures = Lists.newLinkedList();
+  }
+
+  @Override
+  public final ListenableFuture<Command> sendCommand(Command command) {
+    return sendMessage(Messages.createForAll(command), command);
+  }
+
+  @Override
+  public final ListenableFuture<Command> sendCommand(String runnableName, Command command) {
+    return sendMessage(Messages.createForRunnable(runnableName, command), command);
+  }
+
+  @Override
+  protected final void startUp() {
+    // Watch for instance node existence.
+    actOnExists(getInstancePath(), new Runnable() {
+      @Override
+      public void run() {
+        watchInstanceNode();
+      }
+    });
+
+    // Watch for state node data
+    actOnExists(getZKPath("state"), new Runnable() {
+      @Override
+      public void run() {
+        watchStateNode();
+      }
+    });
+
+    doStartUp();
+  }
+
+  /**
+   * Sends the stop-application message (unless a stop future has already been set, e.g. by
+   * {@link #forceShutDown()}), cancels all tracked message futures and then runs {@link #doShutDown()}.
+   * Holds this object's lock for the whole call, including the call to {@link #doShutDown()}.
+   */
+  @Override
+  protected final synchronized void shutDown() {
+    if (stopMessageFuture == null) {
+      stopMessageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(),
+                                                 SystemMessages.stopApplication(), State.TERMINATED);
+    }
+
+    // Cancel all pending message futures.
+    // The completion listener installed by sendMessage() runs on the cancelling thread and skips its
+    // messageFutures.remove() while the state is STOPPING, so the list is not modified during this iteration
+    // (presumably the state is STOPPING whenever this method runs; verify against the service delegate).
+    for (ListenableFuture<?> future : messageFutures) {
+      future.cancel(true);
+    }
+
+    doShutDown();
+  }
+
+  /**
+   * Sends a {@link Message} to the remote service. Returns a future that will be completed when the message
+   * has been processed.
+   * @param message The message to send.
+   * @param result Object to set into the future when message is being processed.
+   * @param <V> Type of the result.
+   * @return A {@link ListenableFuture} that will be completed when the message has been processed.
+   */
+  protected final synchronized <V> ListenableFuture<V> sendMessage(Message message, V result) {
+    if (!isRunning()) {
+      return Futures.immediateFailedFuture(new IllegalStateException("Cannot send message to non-running application"));
+    }
+    final ListenableFuture<V> messageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(), message, result);
+    messageFutures.add(messageFuture);
+    messageFuture.addListener(new Runnable() {
+      @Override
+      public void run() {
+        // If the completion is triggered when stopping, do nothing.
+        if (state() == State.STOPPING) {
+          return;
+        }
+        synchronized (AbstractZKServiceController.this) {
+          messageFutures.remove(messageFuture);
+        }
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+
+    return messageFuture;
+  }
+
+  /**
+   * Returns the future for the stop message, or {@code null} if neither {@link #shutDown()} nor
+   * {@link #forceShutDown()} has assigned one yet.
+   */
+  protected final ListenableFuture<State> getStopMessageFuture() {
+    // NOTE(review): this read is not synchronized, while the writes in shutDown() and forceShutDown() are;
+    // confirm that callers on other threads do not depend on seeing the latest value.
+    return stopMessageFuture;
+  }
+
+  /**
+   * Called during startup. Executed in the startup thread.
+   */
+  protected abstract void doStartUp();
+
+  /**
+   * Called during shutdown. Executed in the shutdown thread.
+   */
+  protected abstract void doShutDown();
+
+  /**
+   * Called when an update on the live instance node is detected.
+   * @param nodeData The updated live instance node data or {@code null} if there is an error when fetching
+   *                 the node data.
+   */
+  protected abstract void instanceNodeUpdated(NodeData nodeData);
+
+  /**
+   * Called when an update on the state node is detected.
+   * @param stateNode The updated state node data or {@code null} if there is an error when fetching the node data.
+   */
+  protected abstract void stateNodeUpdated(StateNode stateNode);
+
+  /**
+   * Stops this controller without sending a stop message to the remote service.
+   * <p>
+   * If no stop message future has been set yet, it is set to an already completed future carrying
+   * {@link State#TERMINATED} before {@code stop()} is called, so that the stop sequence finds a future in place.
+   */
+  protected synchronized void forceShutDown() {
+    if (stopMessageFuture == null) {
+      // In force shutdown, don't send message.
+      stopMessageFuture = Futures.immediateFuture(State.TERMINATED);
+    }
+    stop();
+  }
+
+
+  /**
+   * Runs the given action once the node at the given path exists.
+   * <p>
+   * The action is triggered either by the result of the {@code exists} call (node already there) or by the
+   * watcher seeing a {@code NodeCreated} event (node created later). The shared flag makes sure the action
+   * runs at most once even if both paths fire. If the {@code exists} call fails, the error is logged and the
+   * controller is force shut down.
+   *
+   * @param path the ZooKeeper path to check and watch.
+   * @param action the action to run when the node is known to exist.
+   */
+  private void actOnExists(final String path, final Runnable action) {
+    // Watch for node existence.
+    // Flipped from false to true by whichever of the watcher / callback observes the node first.
+    final AtomicBoolean nodeExists = new AtomicBoolean(false);
+    Futures.addCallback(zkClient.exists(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // When node is created, call the action.
+        // Other event type would be handled by the action.
+        if (event.getType() == Event.EventType.NodeCreated && nodeExists.compareAndSet(false, true)) {
+          action.run();
+        }
+      }
+    }), new FutureCallback<Stat>() {
+      @Override
+      public void onSuccess(Stat result) {
+        // A non-null Stat means the node already exists; a null one means we wait for the watcher.
+        if (result != null && nodeExists.compareAndSet(false, true)) {
+          action.run();
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("Failed in exists call to {}. Shutting down service.", path, t);
+        forceShutDown();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  private void watchInstanceNode() {
+    Futures.addCallback(zkClient.getData(getInstancePath(), new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        State state = state();
+        if (state != State.NEW && state != State.STARTING && state != State.RUNNING) {
+          // Ignore ZK node events when it is in stopping sequence.
+          return;
+        }
+        switch (event.getType()) {
+          case NodeDataChanged:
+            watchInstanceNode();
+            break;
+          case NodeDeleted:
+            // When the ephemeral node goes away, treat the remote service stopped.
+            forceShutDown();
+            break;
+          default:
+            LOG.info("Ignore ZK event for instance node: {}", event);
+        }
+      }
+    }), instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  private void watchStateNode() {
+    Futures.addCallback(zkClient.getData(getZKPath("state"), new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        State state = state();
+        if (state != State.NEW && state != State.STARTING && state != State.RUNNING) {
+          // Ignore ZK node events when it is in stopping sequence.
+          return;
+        }
+        switch (event.getType()) {
+          case NodeDataChanged:
+            watchStateNode();
+            break;
+          default:
+            LOG.info("Ignore ZK event for state node: {}", event);
+        }
+      }
+    }), stateNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  /**
+   * Returns the path prefix for creating sequential message node for the remote service.
+   * The prefix is {@code messages/msg} resolved under this run's ZooKeeper path via {@link #getZKPath(String)}.
+   */
+  private String getMessagePrefix() {
+    return getZKPath("messages/msg");
+  }
+
+  /**
+   * Builds the zookeeper path of the ephemeral instance node of this runId, which is
+   * {@code /instances/<run id>}.
+   */
+  private String getInstancePath() {
+    return "/instances/" + getRunId().getId();
+  }
+
+  /**
+   * Resolves the given relative path under the zookeeper node of this runId, giving {@code /<run id>/<path>}.
+   */
+  private String getZKPath(String path) {
+    return "/" + getRunId().getId() + "/" + path;
+  }
+
+  private final class InstanceNodeDataCallback implements FutureCallback<NodeData> {
+
+    @Override
+    public void onSuccess(NodeData result) {
+      instanceNodeUpdated(result);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      LOG.error("Failed in fetching instance node data.", t);
+      if (t instanceof KeeperException && ((KeeperException) t).code() == KeeperException.Code.NONODE) {
+        // If the node is gone, treat the remote service stopped.
+        forceShutDown();
+      } else {
+        instanceNodeUpdated(null);
+      }
+    }
+  }
+
+  /**
+   * Callback for the result of fetching the state node data. The node content is decoded from UTF-8 JSON into
+   * a {@link StateNode} and forwarded to {@code stateNodeUpdated}. A node without data, or a failed fetch, is
+   * reported as a {@code null} update.
+   */
+  private final class StateNodeDataCallback implements FutureCallback<NodeData> {
+
+    @Override
+    public void onSuccess(NodeData result) {
+      byte[] payload = result.getData();
+      StateNode decoded = null;
+      if (payload != null) {
+        String json = new String(payload, Charsets.UTF_8);
+        decoded = new GsonBuilder()
+          .registerTypeAdapter(StateNode.class, new StateNodeCodec())
+          .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
+          .create()
+          .fromJson(json, StateNode.class);
+      }
+      stateNodeUpdated(decoded);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      LOG.error("Failed in fetching state node data.", t);
+      stateNodeUpdated(null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
new file mode 100644
index 0000000..a0e9a71
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.utils.Dependencies;
+import com.google.common.base.Function;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+
+/**
+ * This class builds jar files based on class dependencies.
+ * <p>
+ * Entries of the generated jar are laid out under {@link #SUBDIR_CLASSES}, {@link #SUBDIR_LIB} and
+ * {@link #SUBDIR_RESOURCES}.
+ * <p>
+ * A single {@link CRC32} instance is shared by all entries written through one bundler and is used without
+ * synchronization, so one instance should not be used to create bundles from multiple threads concurrently.
+ */
+public final class ApplicationBundler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ApplicationBundler.class);
+
+  public static final String SUBDIR_CLASSES = "classes/";
+  public static final String SUBDIR_LIB = "lib/";
+  public static final String SUBDIR_RESOURCES = "resources/";
+
+  private final List<String> excludePackages;
+  private final List<String> includePackages;
+  private final Set<String> bootstrapClassPaths;
+  private final CRC32 crc32;
+
+  /**
+   * Constructs a ApplicationBundler.
+   *
+   * @param excludePackages Class packages to exclude
+   */
+  public ApplicationBundler(Iterable<String> excludePackages) {
+    this(excludePackages, ImmutableList.<String>of());
+  }
+
+  /**
+   * Constructs a ApplicationBundler.
+   *
+   * @param excludePackages Class packages to exclude
+   * @param includePackages Class packages that should be included. Anything in this list will override the
+   *                        one provided in excludePackages.
+   */
+  public ApplicationBundler(Iterable<String> excludePackages, Iterable<String> includePackages) {
+    this.excludePackages = ImmutableList.copyOf(excludePackages);
+    this.includePackages = ImmutableList.copyOf(includePackages);
+
+    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+    // The "sun.boot.class.path" property is JVM specific and is not defined on every runtime (e.g. Java 9+).
+    // When it is missing there is simply no bootstrap class path to exclude.
+    String bootClassPath = System.getProperty("sun.boot.class.path");
+    if (bootClassPath != null) {
+      for (String classpath : Splitter.on(File.pathSeparatorChar).split(bootClassPath)) {
+        File file = new File(classpath);
+        builder.add(file.getAbsolutePath());
+        try {
+          builder.add(file.getCanonicalPath());
+        } catch (IOException e) {
+          // Ignore the exception and proceed. The absolute path has been recorded already.
+        }
+      }
+    }
+    this.bootstrapClassPaths = builder.build();
+    this.crc32 = new CRC32();
+  }
+
+  /**
+   * Same as calling {@link #createBundle(Location, Iterable, Iterable)} without any extra resources.
+   */
+  public void createBundle(Location target, Iterable<Class<?>> classes) throws IOException {
+    createBundle(target, classes, ImmutableList.<URI>of());
+  }
+
+  /**
+   * Same as calling {@link #createBundle(Location, Iterable)}.
+   */
+  public void createBundle(Location target, Class<?> clz, Class<?>...classes) throws IOException {
+    createBundle(target, ImmutableSet.<Class<?>>builder().add(clz).add(classes).build());
+  }
+
+  /**
+   * Creates a jar file which includes all the given classes and all the classes that they depended on.
+   * The jar will also include all classes and resources under the packages as given as include packages
+   * in the constructor.
+   * <p>
+   * The jar is first written to a local temporary file and then copied to the target location. The temporary
+   * file is removed afterwards, whether or not the bundle creation succeeded.
+   *
+   * @param target Where to save the target jar file.
+   * @param resources Extra resources to put into the jar file. If resource is a jar file, it'll be put under
+   *                  lib/ entry, otherwise under the resources/ entry.
+   * @param classes Set of classes to start the dependency traversal.
+   * @throws IOException if writing the temporary jar or copying it to the target fails.
+   */
+  public void createBundle(Location target, Iterable<Class<?>> classes, Iterable<URI> resources) throws IOException {
+    LOG.debug("start creating bundle {}. building a temporary file locally at first", target.getName());
+    // Write the jar to local tmp file first
+    File tmpJar = File.createTempFile(target.getName(), ".tmp");
+    try {
+      // Names of all entries written so far; used to avoid writing duplicated jar entries.
+      Set<String> entries = Sets.newHashSet();
+      JarOutputStream jarOut = new JarOutputStream(new FileOutputStream(tmpJar));
+      try {
+        // Find class dependencies
+        findDependencies(classes, entries, jarOut);
+
+        // Add extra resources
+        for (URI resource : resources) {
+          copyResource(resource, entries, jarOut);
+        }
+      } finally {
+        jarOut.close();
+      }
+      LOG.debug("copying temporary bundle to destination {} ({} bytes)", target.toURI(), tmpJar.length());
+      // Copy the tmp jar into destination.
+      OutputStream os = null;
+      try {
+        os = new BufferedOutputStream(target.getOutputStream());
+        Files.copy(tmpJar, os);
+      } catch (IOException e) {
+        throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target.toURI(), e);
+      } finally {
+        if (os != null) {
+          os.close();
+        }
+      }
+      LOG.debug("finished creating bundle at {}", target.toURI());
+    } finally {
+      // Only claim the cleanup when the file was actually removed.
+      if (tmpJar.delete()) {
+        LOG.debug("cleaned up local temporary for bundle {}", tmpJar.toURI());
+      } else {
+        LOG.warn("failed to clean up local temporary for bundle {}", tmpJar.toURI());
+      }
+    }
+  }
+
+  /**
+   * Traverses the class dependencies starting from the given classes and writes every accepted class into
+   * the jar. A class is rejected when it comes from the bootstrap class path, or when it matches an exclude
+   * package without matching an include package.
+   */
+  private void findDependencies(Iterable<Class<?>> classes, final Set<String> entries,
+                                final JarOutputStream jarOut) throws IOException {
+
+    Iterable<String> classNames = Iterables.transform(classes, new Function<Class<?>, String>() {
+      @Override
+      public String apply(Class<?> input) {
+        return input.getName();
+      }
+    });
+
+    // Prefer the context class loader, falling back to the one that loaded this class.
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = getClass().getClassLoader();
+    }
+    Dependencies.findClassDependencies(classLoader, new Dependencies.ClassAcceptor() {
+      @Override
+      public boolean accept(String className, URL classUrl, URL classPathUrl) {
+        if (bootstrapClassPaths.contains(classPathUrl.getFile())) {
+          return false;
+        }
+
+        // Include packages take precedence over exclude packages.
+        boolean shouldInclude = false;
+        for (String include : includePackages) {
+          if (className.startsWith(include)) {
+            shouldInclude = true;
+            break;
+          }
+        }
+
+        if (!shouldInclude) {
+          for (String exclude : excludePackages) {
+            if (className.startsWith(exclude)) {
+              return false;
+            }
+          }
+        }
+
+        putEntry(className, classUrl, classPathUrl, entries, jarOut);
+        return true;
+      }
+    }, classNames);
+  }
+
+  /**
+   * Writes the given class into the jar. If the class comes from a jar file, the whole jar is stored
+   * uncompressed under lib/. If it comes from a local directory, the whole directory is copied under classes/.
+   * Otherwise only the class file itself is saved under classes/.
+   */
+  private void putEntry(String className, URL classUrl, URL classPathUrl, Set<String> entries, JarOutputStream jarOut) {
+    String classPath = classPathUrl.getFile();
+    if (classPath.endsWith(".jar")) {
+      saveDirEntry(SUBDIR_LIB, entries, jarOut);
+      saveEntry(SUBDIR_LIB + classPath.substring(classPath.lastIndexOf('/') + 1), classPathUrl, entries, jarOut, false);
+    } else {
+      // Class file, put it under the classes directory
+      saveDirEntry(SUBDIR_CLASSES, entries, jarOut);
+      if ("file".equals(classPathUrl.getProtocol())) {
+        // Copy every files under the classPath
+        try {
+          copyDir(new File(classPathUrl.toURI()), SUBDIR_CLASSES, entries, jarOut);
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      } else {
+        String entry = SUBDIR_CLASSES + className.replace('.', '/') + ".class";
+        saveDirEntry(entry.substring(0, entry.lastIndexOf('/') + 1), entries, jarOut);
+        saveEntry(entry, classUrl, entries, jarOut, true);
+      }
+    }
+  }
+
+  /**
+   * Saves a directory entry to the jar output. All missing parent directory entries are created as well.
+   */
+  private void saveDirEntry(String path, Set<String> entries, JarOutputStream jarOut) {
+    if (entries.contains(path)) {
+      return;
+    }
+
+    try {
+      String entry = "";
+      for (String dir : Splitter.on('/').omitEmptyStrings().split(path)) {
+        entry += dir + '/';
+        if (entries.add(entry)) {
+          // A STORED entry needs its size and CRC set up front; both are zero for a directory.
+          JarEntry jarEntry = new JarEntry(entry);
+          jarEntry.setMethod(JarOutputStream.STORED);
+          jarEntry.setSize(0L);
+          jarEntry.setCrc(0L);
+          jarOut.putNextEntry(jarEntry);
+          jarOut.closeEntry();
+        }
+      }
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Saves a class entry to the jar output. Nothing is written if an entry of the same name exists already.
+   *
+   * @param compress if {@code true} the content is written with the default (compressed) method; otherwise
+   *                 the content is buffered in memory to compute its size and CRC and written as STORED.
+   */
+  private void saveEntry(String entry, URL url, Set<String> entries, JarOutputStream jarOut, boolean compress) {
+    LOG.debug("adding bundle entry " + entry);
+    if (!entries.add(entry)) {
+      return;
+    }
+    try {
+      JarEntry jarEntry = new JarEntry(entry);
+      InputStream is = url.openStream();
+
+      try {
+        if (compress) {
+          jarOut.putNextEntry(jarEntry);
+          ByteStreams.copy(is, jarOut);
+        } else {
+          // STORED entries need size and CRC before the content is written, hence the in-memory buffer.
+          crc32.reset();
+          TransferByteOutputStream os = new TransferByteOutputStream();
+          CheckedOutputStream checkedOut = new CheckedOutputStream(os, crc32);
+          ByteStreams.copy(is, checkedOut);
+          checkedOut.close();
+
+          long size = os.size();
+          jarEntry.setMethod(JarEntry.STORED);
+          jarEntry.setSize(size);
+          jarEntry.setCrc(checkedOut.getChecksum().getValue());
+          jarOut.putNextEntry(jarEntry);
+          os.transfer(jarOut);
+        }
+      } finally {
+        is.close();
+      }
+      jarOut.closeEntry();
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+
+  /**
+   * Copies all entries under the file path. Entries that exist in the jar already are skipped.
+   *
+   * @param baseDir the directory to copy from.
+   * @param entryPrefix the prefix to prepend to the path (relative to baseDir) of each copied entry.
+   */
+  private void copyDir(File baseDir, String entryPrefix,
+                       Set<String> entries, JarOutputStream jarOut) throws IOException {
+    LOG.debug("adding whole dir {} to bundle at '{}'", baseDir, entryPrefix);
+    URI baseUri = baseDir.toURI();
+    Queue<File> queue = Lists.newLinkedList();
+    // listFiles() returns null if baseDir is not a directory or cannot be read; treat that as empty.
+    File[] children = baseDir.listFiles();
+    if (children != null) {
+      Collections.addAll(queue, children);
+    }
+    while (!queue.isEmpty()) {
+      File file = queue.remove();
+
+      String entry = entryPrefix + baseUri.relativize(file.toURI()).getPath();
+      if (entries.add(entry)) {
+        jarOut.putNextEntry(new JarEntry(entry));
+        if (file.isFile()) {
+          try {
+            Files.copy(file, jarOut);
+          } catch (IOException e) {
+            throw new IOException("failure copying from " + file.getAbsoluteFile() + " to JAR file entry " + entry, e);
+          }
+        }
+        jarOut.closeEntry();
+      }
+
+      if (file.isDirectory()) {
+        File[] files = file.listFiles();
+        if (files != null) {
+          queue.addAll(Arrays.asList(files));
+        }
+      }
+    }
+  }
+
+  /**
+   * Copies an extra resource into the jar. A local directory is copied recursively under resources/. Any
+   * other resource is copied as a single entry named after the last path segment, under lib/ if it is a
+   * jar file or under resources/ otherwise. A resource whose entry name is in the jar already is skipped.
+   */
+  private void copyResource(URI resource, Set<String> entries, JarOutputStream jarOut) throws IOException {
+    if ("file".equals(resource.getScheme())) {
+      File file = new File(resource);
+      if (file.isDirectory()) {
+        saveDirEntry(SUBDIR_RESOURCES, entries, jarOut);
+        copyDir(file, SUBDIR_RESOURCES, entries, jarOut);
+        return;
+      }
+    }
+
+    URL url = resource.toURL();
+    String path = url.getFile();
+    String prefix = path.endsWith(".jar") ? SUBDIR_LIB : SUBDIR_RESOURCES;
+    path = prefix + path.substring(path.lastIndexOf('/') + 1);
+
+    // Writing the same entry name twice makes the jar output stream fail with a duplicate entry error,
+    // e.g. when a resource jar has been bundled as a class dependency already.
+    if (!entries.add(path)) {
+      LOG.debug("skipping resource {}, bundle entry {} exists already", resource, path);
+      return;
+    }
+
+    saveDirEntry(prefix, entries, jarOut);
+    jarOut.putNextEntry(new JarEntry(path));
+    InputStream is = url.openStream();
+    try {
+      ByteStreams.copy(is, jarOut);
+    } finally {
+      is.close();
+    }
+    jarOut.closeEntry();
+  }
+
+  /**
+   * A {@link ByteArrayOutputStream} that can write its content to another stream without copying the
+   * internal buffer first.
+   */
+  private static final class TransferByteOutputStream extends ByteArrayOutputStream {
+
+    public void transfer(OutputStream os) throws IOException {
+      os.write(buf, 0, count);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/Arguments.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Arguments.java b/twill-core/src/main/java/org/apache/twill/internal/Arguments.java
new file mode 100644
index 0000000..a78547c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/Arguments.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+
+import java.util.List;
+
+/**
+ * Class that encapsulate application arguments and per runnable arguments.
+ * Both collections are copied into immutable collections at construction time.
+ */
+public final class Arguments {
+
+  private final List<String> arguments;
+  private final Multimap<String, String> runnableArguments;
+
+  /**
+   * Creates an instance holding immutable copies of the given arguments.
+   *
+   * @param arguments the application arguments.
+   * @param runnableArguments the per runnable arguments.
+   *                          NOTE(review): presumably keyed by runnable name - confirm against callers.
+   */
+  public Arguments(List<String> arguments, Multimap<String, String> runnableArguments) {
+    this.arguments = ImmutableList.copyOf(arguments);
+    this.runnableArguments = ImmutableMultimap.copyOf(runnableArguments);
+  }
+
+  /**
+   * Returns the immutable list of application arguments.
+   */
+  public List<String> getArguments() {
+    return arguments;
+  }
+
+  /**
+   * Returns the immutable multimap of per runnable arguments.
+   */
+  public Multimap<String, String> getRunnableArguments() {
+    return runnableArguments;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
new file mode 100644
index 0000000..61bdaef
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryService;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * A {@link TwillContext} implementation that simply holds the values given at construction time.
+ * Only the instance count can be changed afterwards, through {@link #setInstanceCount(int)}.
+ * Service announcement is delegated to the {@link DiscoveryService} given to the constructor.
+ */
+public final class BasicTwillContext implements TwillContext {
+
+  private final RunId runId;
+  private final RunId appRunId;
+  private final InetAddress host;
+  private final String[] args;
+  private final String[] appArgs;
+  private final TwillRunnableSpecification spec;
+  private final int instanceId;
+  private final DiscoveryService discoveryService;
+  private final int allowedMemoryMB;
+  private final int virtualCores;
+  // The only mutable state; volatile so that an update through setInstanceCount is visible to other threads.
+  private volatile int instanceCount;
+
+  /**
+   * Creates a context from the given values. The argument arrays are stored as is, without copying.
+   */
+  public BasicTwillContext(RunId runId, RunId appRunId, InetAddress host, String[] args, String[] appArgs,
+                           TwillRunnableSpecification spec, int instanceId, DiscoveryService discoveryService,
+                           int instanceCount, int allowedMemoryMB, int virtualCores) {
+    this.runId = runId;
+    this.appRunId = appRunId;
+    this.host = host;
+    this.args = args;
+    this.appArgs = appArgs;
+    this.spec = spec;
+    this.instanceId = instanceId;
+    this.discoveryService = discoveryService;
+    this.instanceCount = instanceCount;
+    this.allowedMemoryMB = allowedMemoryMB;
+    this.virtualCores = virtualCores;
+  }
+
+  @Override
+  public RunId getRunId() {
+    return runId;
+  }
+
+  @Override
+  public RunId getApplicationRunId() {
+    return appRunId;
+  }
+
+  @Override
+  public int getInstanceCount() {
+    return instanceCount;
+  }
+
+  /**
+   * Updates the instance count reported by {@link #getInstanceCount()}.
+   */
+  public void setInstanceCount(int count) {
+    this.instanceCount = count;
+  }
+
+  @Override
+  public InetAddress getHost() {
+    return host;
+  }
+
+  // Returns the array given to the constructor itself, not a copy.
+  @Override
+  public String[] getArguments() {
+    return args;
+  }
+
+  // Returns the array given to the constructor itself, not a copy.
+  @Override
+  public String[] getApplicationArguments() {
+    return appArgs;
+  }
+
+  @Override
+  public TwillRunnableSpecification getSpecification() {
+    return spec;
+  }
+
+  @Override
+  public int getInstanceId() {
+    return instanceId;
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return virtualCores;
+  }
+
+  @Override
+  public int getMaxMemoryMB() {
+    return allowedMemoryMB;
+  }
+
+  // Registers a Discoverable with the given name whose address is this context's host plus the given port.
+  // A new InetSocketAddress is created on every getSocketAddress() call.
+  @Override
+  public Cancellable announce(final String serviceName, final int port) {
+    return discoveryService.register(new Discoverable() {
+      @Override
+      public String getName() {
+        return serviceName;
+      }
+
+      @Override
+      public InetSocketAddress getSocketAddress() {
+        return new InetSocketAddress(getHost(), port);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/Configs.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Configs.java b/twill-core/src/main/java/org/apache/twill/internal/Configs.java
new file mode 100644
index 0000000..0fa1df8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/Configs.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Holder of configuration key names ({@link Keys}) and their default values ({@link Defaults}).
+ * This class and its nested classes only carry constants and cannot be instantiated.
+ */
+public final class Configs {
+
+  /**
+   * Names of configuration keys.
+   */
+  public static final class Keys {
+    /**
+     * Size in MB of reserved memory for Java process (non-heap memory).
+     */
+    public static final String JAVA_RESERVED_MEMORY_MB = "twill.java.reserved.memory.mb";
+
+    private Keys() {
+    }
+  }
+
+  /**
+   * Default values for the configuration keys; constants share the name of the key they belong to.
+   */
+  public static final class Defaults {
+    // By default have 200MB reserved for Java process.
+    public static final int JAVA_RESERVED_MEMORY_MB = 200;
+
+    private Defaults() {
+    }
+  }
+
+  private Configs() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
new file mode 100644
index 0000000..0387d3e
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * This class contains collection of common constants used in Twill.
+ * It only carries constants and cannot be instantiated.
+ */
+public final class Constants {
+
+  // NOTE(review): presumably the name of the topic that logs are published to - confirm against usages.
+  public static final String LOG_TOPIC = "log";
+
+  /** Maximum number of seconds for AM to start. */
+  public static final int APPLICATION_MAX_START_SECONDS = 60;
+  /** Maximum number of seconds for AM to stop. */
+  public static final int APPLICATION_MAX_STOP_SECONDS = 60;
+
+  // NOTE(review): no unit is stated here; the magnitude suggests milliseconds - confirm against usages.
+  public static final long PROVISION_TIMEOUT = 30000;
+
+  /** Memory size of AM */
+  public static final int APP_MASTER_MEMORY_MB = 512;
+
+  // NOTE(review): presumably the part of the AM memory, in MB, that is reserved for non-heap usage, similar to
+  // Configs.Keys.JAVA_RESERVED_MEMORY_MB - confirm against usages.
+  public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
+
+  public static final String STDOUT = "stdout";
+  public static final String STDERR = "stderr";
+
+  /**
+   * Constants for names of internal files that are shared between client, AM and containers.
+   */
+  public static final class Files {
+
+    public static final String LAUNCHER_JAR = "launcher.jar";
+    public static final String APP_MASTER_JAR = "appMaster.jar";
+    public static final String CONTAINER_JAR = "container.jar";
+    public static final String LOCALIZE_FILES = "localizeFiles.json";
+    public static final String TWILL_SPEC = "twillSpec.json";
+    public static final String ARGUMENTS = "arguments.json";
+    public static final String LOGBACK_TEMPLATE = "logback-template.xml";
+    public static final String KAFKA = "kafka.tgz";
+    public static final String JVM_OPTIONS = "jvm.opts";
+    public static final String CREDENTIALS = "credentials.store";
+
+    private Files() {
+    }
+  }
+
+  private Constants() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
new file mode 100644
index 0000000..67c21d3
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import java.net.InetAddress;
+
+/**
+ * Represents information of the container that the processing is/will be running in.
+ */
+public interface ContainerInfo {
+
+  /**
+   * Returns the id of the container.
+   */
+  String getId();
+
+  /**
+   * Returns the host of the container.
+   */
+  InetAddress getHost();
+
+  /**
+   * Returns the port of the container.
+   * NOTE(review): which service this port belongs to is not visible here - confirm against implementations.
+   */
+  int getPort();
+
+  /**
+   * Returns the memory size of the container in MB.
+   */
+  int getMemoryMB();
+
+  /**
+   * Returns the number of virtual cores of the container.
+   */
+  int getVirtualCores();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
new file mode 100644
index 0000000..705943c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Immutable value holding a container id together with a host name.
+ */
+public final class ContainerLiveNodeData {
+
+  private final String containerId;
+  private final String host;
+
+  public ContainerLiveNodeData(String containerId, String host) {
+    this.containerId = containerId;
+    this.host = host;
+  }
+
+  public String getContainerId() {
+    return containerId;
+  }
+
+  public String getHost() {
+    return host;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java b/twill-core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
new file mode 100644
index 0000000..fd50028
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A {@link ContainerInfo} read from the {@code YARN_CONTAINER_*} environment variables (see {@link EnvKeys}).
+ */
+public final class EnvContainerInfo implements ContainerInfo {
+  private final String id;
+  private final InetAddress host;
+  private final int port;
+  private final int virtualCores;
+  private final int memoryMB;
+
+  public EnvContainerInfo() throws UnknownHostException {
+    id = System.getenv(EnvKeys.YARN_CONTAINER_ID);
+    host = InetAddress.getByName(System.getenv(EnvKeys.YARN_CONTAINER_HOST));
+    port = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_PORT));
+    virtualCores = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES));
+    memoryMB = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB));
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public InetAddress getHost() {
+    return host;
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return memoryMB;
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return virtualCores;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
new file mode 100644
index 0000000..9bf6523
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Place for defining common environment keys.
+ */
+public final class EnvKeys {
+
+  public static final String TWILL_ZK_CONNECT = "TWILL_ZK_CONNECT";
+  public static final String TWILL_APP_RUN_ID = "TWILL_APP_RUN_ID";
+  public static final String TWILL_RUN_ID = "TWILL_RUN_ID";
+  public static final String TWILL_INSTANCE_ID = "TWILL_INSTANCE_ID";
+  public static final String TWILL_INSTANCE_COUNT = "TWILL_INSTANCE_COUNT";
+  public static final String TWILL_RESERVED_MEMORY_MB = "TWILL_RESERVED_MEMORY_MB";
+
+  public static final String TWILL_FS_USER = "TWILL_FS_USER";
+
+  /**
+   * Cluster filesystem directory for storing twill app related files.
+   */
+  public static final String TWILL_APP_DIR = "TWILL_APP_DIR";
+
+  public static final String TWILL_APP_NAME = "TWILL_APP_NAME";
+  public static final String TWILL_RUNNABLE_NAME = "TWILL_RUNNABLE_NAME";
+
+  public static final String TWILL_LOG_KAFKA_ZK = "TWILL_LOG_KAFKA_ZK";
+
+  public static final String YARN_APP_ID = "YARN_APP_ID";
+  public static final String YARN_APP_ID_CLUSTER_TIME = "YARN_APP_ID_CLUSTER_TIME";
+  public static final String YARN_APP_ID_STR = "YARN_APP_ID_STR";
+
+  public static final String YARN_CONTAINER_ID = "YARN_CONTAINER_ID";
+  public static final String YARN_CONTAINER_HOST = "YARN_CONTAINER_HOST";
+  public static final String YARN_CONTAINER_PORT = "YARN_CONTAINER_PORT";
+  /**
+   * Used to inform runnables of their resource usage.
+   */
+  public static final String YARN_CONTAINER_VIRTUAL_CORES = "YARN_CONTAINER_VIRTUAL_CORES";
+  public static final String YARN_CONTAINER_MEMORY_MB = "YARN_CONTAINER_MEMORY_MB";
+
+  private EnvKeys() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
new file mode 100644
index 0000000..9d3e156
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Wraps a {@link Service.Listener} so that each callback is executed on a given {@link Executor}.
+ * Each callback is forwarded at most once; anything thrown by the delegate is logged and not propagated.
+ */
+final class ListenerExecutor implements Service.Listener {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class);
+
+  private final Service.Listener delegate;
+  private final Executor executor;
+  private final ConcurrentMap<Service.State, Boolean> callStates = Maps.newConcurrentMap();
+
+  ListenerExecutor(Service.Listener delegate, Executor executor) {
+    this.delegate = delegate;
+    this.executor = executor;
+  }
+
+  @Override
+  public void starting() {
+    if (hasCalled(Service.State.STARTING)) {
+      return;
+    }
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          delegate.starting();
+        } catch (Throwable t) {
+          LOG.warn("Exception thrown from listener", t);
+        }
+      }
+    });
+  }
+
+  @Override
+  public void running() {
+    if (hasCalled(Service.State.RUNNING)) {
+      return;
+    }
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          delegate.running();
+        } catch (Throwable t) {
+          LOG.warn("Exception thrown from listener", t);
+        }
+      }
+    });
+  }
+
+  @Override
+  public void stopping(final Service.State from) {
+    if (hasCalled(Service.State.STOPPING)) {
+      return;
+    }
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          delegate.stopping(from);
+        } catch (Throwable t) {
+          LOG.warn("Exception thrown from listener", t);
+        }
+      }
+    });
+  }
+
+  @Override
+  public void terminated(final Service.State from) {
+    if (hasCalled(Service.State.TERMINATED)) {
+      return;
+    }
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          delegate.terminated(from);
+        } catch (Throwable t) {
+          LOG.warn("Exception thrown from listener", t);
+        }
+      }
+    });
+  }
+
+  @Override
+  public void failed(final Service.State from, final Throwable failure) {
+    // failed() and terminated() share the TERMINATED key, so whichever arrives first suppresses the other.
+    if (hasCalled(Service.State.TERMINATED)) {
+      return;
+    }
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          delegate.failed(from, failure);
+        } catch (Throwable t) {
+          LOG.warn("Exception thrown from listener", t);
+        }
+      }
+    });
+  }
+
+  private boolean hasCalled(Service.State state) {
+    return callStates.putIfAbsent(state, true) != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java b/twill-core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
new file mode 100644
index 0000000..4f71a05
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link EventHandler} that logs each launch timeout and always asks for a recheck.
+ */
+public final class LogOnlyEventHandler extends EventHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogOnlyEventHandler.class);
+
+  @Override
+  public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
+    for (TimeoutEvent event : timeoutEvents) {
+      LOG.info("Requested {} containers for runnable {}, only got {} after {} ms.",
+               event.getExpectedInstances(), event.getRunnableName(),
+               event.getActualInstances(), System.currentTimeMillis() - event.getRequestTime());
+    }
+    return TimeoutAction.recheck(Constants.PROVISION_TIMEOUT, TimeUnit.MILLISECONDS);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ProcessController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessController.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessController.java
new file mode 100644
index 0000000..4453838
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessController.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.common.Cancellable;
+
+/**
+ * For controlling a launched YARN process.
+ *
+ * @param <R> Report type.
+ */
+public interface ProcessController<R> extends Cancellable {
+
+  R getReport();
+
+  /**
+   * Requests to stop the running process.
+   */
+  void cancel();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
new file mode 100644
index 0000000..e48a226
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.LocalFile;
+
+import java.util.Map;
+
+/**
+ * Interface for launching a process in a container.
+ *
+ * @param <T> Type of the object that contains information about the container that the process is going to launch.
+ */
+public interface ProcessLauncher<T> {
+
+  /**
+   * Returns information about the container that this launcher would launch the process in.
+   */
+  T getContainerInfo();
+
+  /**
+   * Returns a preparer with the given default set of environments, resources and credentials.
+   */
+  <C> PrepareLaunchContext prepareLaunch(Map<String, String> environments,
+                                         Iterable<LocalFile> resources, C credentials);
+
+  /**
+   * Fluent builder for setting up the launch: resources first, then environment, then commands.
+   */
+  interface PrepareLaunchContext {
+
+    ResourcesAdder withResources();
+
+    AfterResources noResources();
+
+    interface ResourcesAdder {
+      MoreResources add(LocalFile localFile);
+    }
+
+    interface AfterResources {
+      EnvironmentAdder withEnvironment();
+
+      AfterEnvironment noEnvironment();
+    }
+
+    interface EnvironmentAdder {
+      <V> MoreEnvironment add(String key, V value);
+    }
+
+    interface MoreEnvironment extends EnvironmentAdder, AfterEnvironment {
+    }
+
+    interface AfterEnvironment {
+      CommandAdder withCommands();
+    }
+
+    interface MoreResources extends ResourcesAdder, AfterResources { }
+
+    interface CommandAdder {
+      StdOutSetter add(String cmd, String...args);
+    }
+
+    interface StdOutSetter {
+      StdErrSetter redirectOutput(String stdout);
+
+      StdErrSetter noOutput();
+    }
+
+    interface StdErrSetter {
+      MoreCommand redirectError(String stderr);
+
+      MoreCommand noError();
+    }
+
+    interface MoreCommand extends CommandAdder {
+      <R> ProcessController<R> launch();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java b/twill-core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
new file mode 100644
index 0000000..a52afe1
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/SingleRunnableApplication.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+
+/**
+ * A simple {@link TwillApplication} that contains only one {@link TwillRunnable} and is named after that runnable.
+ */
+public class SingleRunnableApplication implements TwillApplication {
+
+  private final TwillRunnable runnable;
+  private final ResourceSpecification resourceSpec;
+
+  public SingleRunnableApplication(TwillRunnable runnable, ResourceSpecification resourceSpec) {
+    this.runnable = runnable;
+    this.resourceSpec = resourceSpec;
+  }
+
+  @Override
+  public TwillSpecification configure() {
+    TwillRunnableSpecification runnableSpec = runnable.configure();
+    return TwillSpecification.Builder.with()
+      .setName(runnableSpec.getName())
+      .withRunnable().add(runnableSpec.getName(), runnable, resourceSpec)
+      .noLocalFiles()
+      .anyOrder()
+      .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
new file mode 100644
index 0000000..8b090bd
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.internal.state.Message;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A {@link ServiceController} that allows sending a message directly. Internal use only.
+ */
+public interface TwillContainerController extends ServiceController {
+
+  ListenableFuture<Message> sendMessage(Message message);
+
+  /**
+   * Called to indicate that the container that this controller is associated with has completed.
+   * Any resources it holds will be released and all pending futures will be cancelled.
+   */
+  void completed(int exitStatus);
+}