You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 23:00:08 UTC

[26/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/common/src/main/java/org/apache/twill/filesystem/Location.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/twill/filesystem/Location.java b/common/src/main/java/org/apache/twill/filesystem/Location.java
deleted file mode 100644
index dee9546..0000000
--- a/common/src/main/java/org/apache/twill/filesystem/Location.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-
-/**
- * This interface defines the location and operations of a resource on the filesystem.
- * <p>
- * {@link Location} is agnostic to the type of file system the resource is on.
- * </p>
- */
-public interface Location {
-  /**
-   * Suffix added to every temp file name generated with {@link #getTempFile(String)}.
-   */
-  static final String TEMP_FILE_SUFFIX = ".tmp";
-
-  /**
-   * Checks if this location exists.
-   *
-   * @return true if found; false otherwise.
-   * @throws IOException
-   */
-  boolean exists() throws IOException;
-
-  /**
-   * @return the name of the file or directory denoted by this abstract pathname.
-   */
-  String getName();
-
-  /**
-   * Atomically creates a new, empty file named by this abstract pathname if and only if a file with this name
-   * does not yet exist.
-   * @return {@code true} if the file is successfully created, {@code false} otherwise.
-   * @throws IOException
-   */
-  boolean createNew() throws IOException;
-
-  /**
-   * @return An {@link java.io.InputStream} for this location.
-   * @throws IOException
-   */
-  InputStream getInputStream() throws IOException;
-
-  /**
-   * @return An {@link java.io.OutputStream} for this location.
-   * @throws IOException
-   */
-  OutputStream getOutputStream() throws IOException;
-
-  /**
-   * Creates an {@link OutputStream} for this location with the given permission. The actual permission supported
-   * depends on implementation.
-   *
-   * @param permission A POSIX permission string.
-   * @return An {@link OutputStream} for writing to this location.
-   * @throws IOException If failed to create the {@link OutputStream}.
-   */
-  OutputStream getOutputStream(String permission) throws IOException;
-
-  /**
-   * Appends the child to the current {@link Location}.
-   * <p>
-   * Returns a new instance of Location.
-   * </p>
-   *
-   * @param child to be appended to this location.
-   * @return A new instance of {@link Location}
-   * @throws IOException
-   */
-  Location append(String child) throws IOException;
-
-  /**
-   * Returns a unique location for a temporary file to be placed near this location.
-   * Allows all temp files to follow the same pattern for easier management.
-   * @param suffix part of the file name to include in the temp file name
-   * @return location of the temp file
-   * @throws IOException
-   */
-  Location getTempFile(String suffix) throws IOException;
-
-  /**
-   * @return A {@link java.net.URI} for this location.
-   */
-  URI toURI();
-
-  /**
-   * Deletes the file or directory denoted by this abstract pathname. If this
-   * pathname denotes a directory, then the directory must be empty in order
-   * to be deleted.
-   *
-   * @return true if and only if the file or directory is successfully deleted; false otherwise.
-   */
-  boolean delete() throws IOException;
-
-  /**
-   * Deletes the file or directory denoted by this abstract pathname. If this
-   * pathname denotes a directory and {@code recursive} is {@code true}, then content of the
-   * directory will be deleted recursively, otherwise the directory must be empty in order to be deleted.
-   * Note that when calling this method with {@code recursive = true} for a directory, a failure
-   * during deletion may leave the directory partially deleted, with some entries removed and others remaining.
-   *
-   * @param recursive Indicates whether to delete a directory recursively. Ignored if the pathname represents a file.
-   * @return true if and only if the file or directory is successfully deleted; false otherwise.
-   */
-  boolean delete(boolean recursive) throws IOException;
-
-  /**
-   * Moves the file or directory denoted by this abstract pathname.
-   *
-   * @param destination destination location
-   * @return new location if and only if the file or directory is successfully moved; null otherwise.
-   */
-  @Nullable
-  Location renameTo(Location destination) throws IOException;
-
-  /**
-   * Creates the directory named by this abstract pathname, including any necessary
-   * but nonexistent parent directories.
-   *
-   * @return true if and only if the directory was created; false otherwise
-   */
-  boolean mkdirs() throws IOException;
-
-  /**
-   * @return Length of file.
-   */
-  long length() throws IOException;
-
-  /**
-   * @return Last modified time of file.
-   */
-  long lastModified() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/twill/filesystem/LocationFactories.java b/common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
deleted file mode 100644
index 751a632..0000000
--- a/common/src/main/java/org/apache/twill/filesystem/LocationFactories.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import com.google.common.base.Throwables;
-
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * Provides helper methods for creating different {@link LocationFactory} instances.
- */
-public final class LocationFactories {
-
-  /**
-   * Creates a {@link LocationFactory} that always applies the given namespace prefix.
-   */
-  public static LocationFactory namespace(LocationFactory delegate, final String namespace) {
-    return new ForwardingLocationFactory(delegate) {
-      @Override
-      public Location create(String path) {
-        try {
-          Location base = getDelegate().create(namespace);
-          return base.append(path);
-        } catch (IOException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-
-      @Override
-      public Location create(URI uri) {
-        if (uri.isAbsolute()) {
-          return getDelegate().create(uri);
-        }
-        try {
-          Location base = getDelegate().create(namespace);
-          return base.append(uri.getPath());
-        } catch (IOException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-
-      @Override
-      public Location getHomeLocation() {
-        return getDelegate().getHomeLocation();
-      }
-    };
-  }
-
-  private LocationFactories() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/common/src/main/java/org/apache/twill/filesystem/LocationFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/twill/filesystem/LocationFactory.java b/common/src/main/java/org/apache/twill/filesystem/LocationFactory.java
deleted file mode 100644
index f88d94d..0000000
--- a/common/src/main/java/org/apache/twill/filesystem/LocationFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import java.net.URI;
-
-/**
- * Factory for creating instances of {@link Location}.
- */
-public interface LocationFactory {
-
-  /**
-   * Creates an instance of {@link Location} of the given path.
-   * @param path The path representing the location.
-   * @return An instance of {@link Location}.
-   */
-  Location create(String path);
-
-  /**
-   * Creates an instance of {@link Location} based on {@link java.net.URI} <code>uri</code>.
-   *
-   * @param uri to the resource on the filesystem.
-   * @return An instance of {@link Location}
-   */
-  Location create(URI uri);
-
-  /**
-   * Returns the home location.
-   */
-  Location getHomeLocation();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/common/src/test/java/org/apache/twill/common/ServicesTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/twill/common/ServicesTest.java b/common/src/test/java/org/apache/twill/common/ServicesTest.java
deleted file mode 100644
index c0aa7ee..0000000
--- a/common/src/test/java/org/apache/twill/common/ServicesTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.common;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Unit test for {@link Services} methods.
- */
-public class ServicesTest {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ServicesTest.class);
-
-  @Test
-  public void testChain() throws ExecutionException, InterruptedException {
-    AtomicBoolean transiting = new AtomicBoolean(false);
-    Service s1 = new DummyService("s1", transiting);
-    Service s2 = new DummyService("s2", transiting);
-    Service s3 = new DummyService("s3", transiting);
-
-    Futures.allAsList(Services.chainStart(s1, s2, s3).get()).get();
-    Futures.allAsList(Services.chainStop(s3, s2, s1).get()).get();
-  }
-
-  @Test
-  public void testCompletion() throws ExecutionException, InterruptedException {
-    Service service = new DummyService("s1", new AtomicBoolean());
-    ListenableFuture<Service.State> completion = Services.getCompletionFuture(service);
-
-    service.start();
-    service.stop();
-
-    completion.get();
-
-    AtomicBoolean transiting = new AtomicBoolean();
-    service = new DummyService("s2", transiting);
-    completion = Services.getCompletionFuture(service);
-
-    service.startAndWait();
-    transiting.set(true);
-    service.stop();
-
-    try {
-      completion.get();
-      Assert.assertTrue(false);
-    } catch (ExecutionException e) {
-      // Expected
-    }
-  }
-
-  private static final class DummyService extends AbstractIdleService {
-
-    private final String name;
-    private final AtomicBoolean transiting;
-
-    private DummyService(String name, AtomicBoolean transiting) {
-      this.name = name;
-      this.transiting = transiting;
-    }
-
-    @Override
-    protected void startUp() throws Exception {
-      Preconditions.checkState(transiting.compareAndSet(false, true));
-      LOG.info("Starting: " + name);
-      TimeUnit.MILLISECONDS.sleep(500);
-      LOG.info("Started: " + name);
-      Preconditions.checkState(transiting.compareAndSet(true, false));
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      Preconditions.checkState(transiting.compareAndSet(false, true));
-      LOG.info("Stopping: " + name);
-      TimeUnit.MILLISECONDS.sleep(500);
-      LOG.info("Stopped: " + name);
-      Preconditions.checkState(transiting.compareAndSet(true, false));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java b/common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
deleted file mode 100644
index 198f77f..0000000
--- a/common/src/test/java/org/apache/twill/filesystem/LocalLocationTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.filesystem;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-
-/**
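- * Unit test for {@link Location} instances created through {@link LocalLocationFactory} and {@link LocationFactories}.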
- */
-public class LocalLocationTest {
-
-  @Test
-  public void testDelete() throws IOException {
-    LocationFactory factory = new LocalLocationFactory(new File(System.getProperty("java.io.tmpdir")));
-
-    Location base = factory.create("test").getTempFile(".tmp");
-    Assert.assertTrue(base.mkdirs());
-
-    Assert.assertTrue(base.append("test1").getTempFile(".tmp").createNew());
-    Assert.assertTrue(base.append("test2").getTempFile(".tmp").createNew());
-
-    Location subDir = base.append("test3");
-    Assert.assertTrue(subDir.mkdirs());
-
-    Assert.assertTrue(subDir.append("test4").getTempFile(".tmp").createNew());
-    Assert.assertTrue(subDir.append("test5").getTempFile(".tmp").createNew());
-
-    Assert.assertTrue(base.delete(true));
-    Assert.assertFalse(base.exists());
-  }
-
-  @Test
-  public void testHelper() {
-    LocationFactory factory = LocationFactories.namespace(
-                                new LocalLocationFactory(new File(System.getProperty("java.io.tmpdir"))),
-                                "testhelper");
-
-    Location location = factory.create("test");
-    Assert.assertTrue(location.toURI().getPath().endsWith("testhelper/test"));
-
-    location = factory.create(URI.create("test2"));
-    Assert.assertTrue(location.toURI().getPath().endsWith("testhelper/test2"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
deleted file mode 100644
index faff711..0000000
--- a/core/pom.xml
+++ /dev/null
@@ -1,89 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>twill-parent</artifactId>
-        <groupId>org.apache.twill</groupId>
-        <version>0.1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>twill-core</artifactId>
-    <name>Twill core library</name>
-
-    <dependencies>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-zookeeper</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>twill-discovery-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.xerial.snappy</groupId>
-            <artifactId>snappy-java</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm-all</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
deleted file mode 100644
index 974639d..0000000
--- a/core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.common.Threads;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
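- * Base {@link ServiceController} that delegates its lifecycle to an internal {@link AbstractIdleService}.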
- */
-public abstract class AbstractExecutionServiceController implements ServiceController {
-
-  private final RunId runId;
-  private final ListenerExecutors listenerExecutors;
-  private final Service serviceDelegate;
-
-  protected AbstractExecutionServiceController(RunId runId) {
-    this.runId = runId;
-    this.listenerExecutors = new ListenerExecutors();
-    this.serviceDelegate = new ServiceDelegate();
-  }
-
-  protected abstract void startUp();
-
-  protected abstract void shutDown();
-
-  @Override
-  public final RunId getRunId() {
-    return runId;
-  }
-
-  @Override
-  public final void addListener(Listener listener, Executor executor) {
-    listenerExecutors.addListener(new ListenerExecutor(listener, executor));
-  }
-
-  @Override
-  public final ListenableFuture<State> start() {
-    serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR);
-    return serviceDelegate.start();
-  }
-
-  @Override
-  public final State startAndWait() {
-    return Futures.getUnchecked(start());
-  }
-
-  @Override
-  public final boolean isRunning() {
-    return serviceDelegate.isRunning();
-  }
-
-  @Override
-  public final State state() {
-    return serviceDelegate.state();
-  }
-
-  @Override
-  public final State stopAndWait() {
-    return Futures.getUnchecked(stop());
-  }
-
-  @Override
-  public final ListenableFuture<State> stop() {
-    return serviceDelegate.stop();
-  }
-
-  protected Executor executor(final State state) {
-    return new Executor() {
-      @Override
-      public void execute(Runnable command) {
-        Thread t = new Thread(command, getClass().getSimpleName() + " " + state);
-        t.setDaemon(true);
-        t.start();
-      }
-    };
-  }
-
-
-  private final class ServiceDelegate extends AbstractIdleService {
-    @Override
-    protected void startUp() throws Exception {
-      AbstractExecutionServiceController.this.startUp();
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      AbstractExecutionServiceController.this.shutDown();
-    }
-
-    @Override
-    protected Executor executor(State state) {
-      return AbstractExecutionServiceController.this.executor(state);
-    }
-  }
-
-  /**
-   * Inner class for dispatching listener callbacks to a list of listeners.
-   */
-  private static final class ListenerExecutors implements Listener {
-
-    private interface Callback {
-      void call(Listener listener);
-    }
-
-    private final Queue<ListenerExecutor> listeners = new ConcurrentLinkedQueue<ListenerExecutor>();
-    private final AtomicReference<Callback> lastState = new AtomicReference<Callback>();
-
-    private synchronized void addListener(final ListenerExecutor listener) {
-      listeners.add(listener);
-      Callback callback = lastState.get();
-      if (callback != null) {
-        callback.call(listener);
-      }
-    }
-
-    @Override
-    public synchronized void starting() {
-      lastState.set(new Callback() {
-        @Override
-        public void call(Listener listener) {
-          listener.starting();
-        }
-      });
-      for (ListenerExecutor listener : listeners) {
-        listener.starting();
-      }
-    }
-
-    @Override
-    public synchronized void running() {
-      lastState.set(new Callback() {
-        @Override
-        public void call(Listener listener) {
-          listener.running();
-        }
-      });
-      for (ListenerExecutor listener : listeners) {
-        listener.running();
-      }
-    }
-
-    @Override
-    public synchronized void stopping(final State from) {
-      lastState.set(new Callback() {
-        @Override
-        public void call(Listener listener) {
-          listener.stopping(from);
-        }
-      });
-      for (ListenerExecutor listener : listeners) {
-        listener.stopping(from);
-      }
-    }
-
-    @Override
-    public synchronized void terminated(final State from) {
-      lastState.set(new Callback() {
-        @Override
-        public void call(Listener listener) {
-          listener.terminated(from);
-        }
-      });
-      for (ListenerExecutor listener : listeners) {
-        listener.terminated(from);
-      }
-    }
-
-    @Override
-    public synchronized void failed(final State from, final Throwable failure) {
-      lastState.set(new Callback() {
-        @Override
-        public void call(Listener listener) {
-          listener.failed(from, failure);
-        }
-      });
-      for (ListenerExecutor listener : listeners) {
-        listener.failed(from, failure);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
deleted file mode 100644
index 5806f9d..0000000
--- a/core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.logging.LogEntry;
-import org.apache.twill.api.logging.LogHandler;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
-import org.apache.twill.internal.logging.LogEntryDecoder;
-import org.apache.twill.internal.state.SystemMessages;
-import org.apache.twill.kafka.client.FetchedMessage;
-import org.apache.twill.kafka.client.KafkaClient;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An abstract base class for {@link org.apache.twill.api.TwillController} implementations that use ZooKeeper to
- * control a running twill application.
- */
-public abstract class AbstractTwillController extends AbstractZKServiceController implements TwillController {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
-  private static final int MAX_KAFKA_FETCH_SIZE = 1048576;
-  private static final long SHUTDOWN_TIMEOUT_MS = 2000;
-  private static final long LOG_FETCH_TIMEOUT_MS = 5000;
-
-  private final Queue<LogHandler> logHandlers;
-  private final KafkaClient kafkaClient;
-  private final DiscoveryServiceClient discoveryServiceClient;
-  private final LogPollerThread logPoller;
-
-  /**
-   * Creates a controller for the application run identified by the given run id.
-   *
-   * @param runId id of the application run; its id string is also used for the Kafka ZooKeeper namespace
-   *              ({@code /<runId>/kafka}) and for the name of the log poller thread
-   * @param zkClient ZooKeeper client handed to the parent controller, the Kafka client and the discovery client
-   * @param logHandlers initial handlers that receive the log entries fetched by the log poller
-   */
-  public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
-    super(runId, zkClient);
-    this.logHandlers = new ConcurrentLinkedQueue<LogHandler>();
-    this.kafkaClient = new SimpleKafkaClient(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
-    this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
-    Iterables.addAll(this.logHandlers, logHandlers);
-    // Give the poller the live queue (the field) rather than the constructor argument; otherwise handlers
-    // registered later through addLogHandler() would never be invoked.
-    this.logPoller = new LogPollerThread(runId, kafkaClient, this.logHandlers);
-  }
-
-  @Override
-  protected void doStartUp() {
-    // Without any registered log handler there is nobody to deliver logs to, so the poller is not started.
-    if (logHandlers.isEmpty()) {
-      return;
-    }
-    logPoller.start();
-  }
-
-  @Override
-  protected void doShutDown() {
-    // Ask the poller to stop, then wait for it for at most SHUTDOWN_TIMEOUT_MS.
-    logPoller.terminate();
-    try {
-      logPoller.join(SHUTDOWN_TIMEOUT_MS);
-    } catch (InterruptedException e) {
-      LOG.warn("Joining of log poller thread interrupted.", e);
-      // Restore the interrupt status so that code further up the stack can still observe the interruption.
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
-  public final synchronized void addLogHandler(LogHandler handler) {
-    logHandlers.add(handler);
-    // A Thread may be started only once: start() throws IllegalThreadStateException for a thread that has
-    // been started before, even after it has finished. isAlive() is also false for a finished thread, so
-    // test for the NEW state to decide whether the poller still has to be started.
-    if (logPoller.getState() == Thread.State.NEW) {
-      logPoller.start();
-    }
-  }
-
-  @Override
-  public final Iterable<Discoverable> discoverService(String serviceName) {
-    // The lookup is delegated to the discovery client created in the constructor.
-    Iterable<Discoverable> discovered = discoveryServiceClient.discover(serviceName);
-    return discovered;
-  }
-
-  @Override
-  public final ListenableFuture<Integer> changeInstances(String runnable, int newCount) {
-    // The requested count is also passed as the result value for the returned future.
-    Integer futureResult = newCount;
-    return sendMessage(SystemMessages.setInstances(runnable, newCount), futureResult);
-  }
-
-  /**
-   * Daemon thread that fetches log messages from Kafka, decodes each of them into a {@link LogEntry} and
-   * passes the entry to every log handler. It keeps running until {@link #terminate()} is called or the
-   * thread is interrupted.
-   */
-  private static final class LogPollerThread extends Thread {
-
-    private final KafkaClient kafkaClient;
-    private final Iterable<LogHandler> logHandlers;
-    private volatile boolean running = true;
-
-    LogPollerThread(RunId runId, KafkaClient kafkaClient, Iterable<LogHandler> logHandlers) {
-      super("twill-log-poller-" + runId.getId());
-      setDaemon(true);
-      this.kafkaClient = kafkaClient;
-      this.logHandlers = logHandlers;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Twill log poller thread '{}' started.", getName());
-      kafkaClient.startAndWait();
-
-      GsonBuilder gsonBuilder = new GsonBuilder();
-      gsonBuilder.registerTypeAdapter(LogEntry.class, new LogEntryDecoder());
-      gsonBuilder.registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec());
-      Gson gson = gsonBuilder.create();
-
-      while (running && !isInterrupted()) {
-        long startOffset;
-        try {
-          // Ask for the earliest offset; the reply is expected to hold exactly one entry.
-          startOffset = kafkaClient.getOffset(Constants.LOG_TOPIC, 0, -2, 1)
-                                   .get(LOG_FETCH_TIMEOUT_MS, TimeUnit.MILLISECONDS)[0];
-        } catch (Throwable t) {
-          // Keep retrying
-          LOG.warn("Failed to fetch offsets from Kafka. Retrying.", t);
-          continue;
-        }
-
-        // Consume log messages starting from that offset.
-        Iterator<FetchedMessage> messages = kafkaClient.consume(Constants.LOG_TOPIC, 0,
-                                                                startOffset, MAX_KAFKA_FETCH_SIZE);
-        try {
-          while (messages.hasNext()) {
-            dispatch(gson, Charsets.UTF_8.decode(messages.next().getBuffer()).toString());
-          }
-        } catch (Throwable t) {
-          // Falling out of this block simply re-enters the outer loop.
-          LOG.warn("Exception while fetching log message from Kafka. Retrying.", t);
-        }
-      }
-
-      kafkaClient.stopAndWait();
-      LOG.info("Twill log poller thread stopped.");
-    }
-
-    /**
-     * Clears the running flag and interrupts this thread so that {@link #run()} leaves its loop.
-     */
-    void terminate() {
-      running = false;
-      interrupt();
-    }
-
-    /**
-     * Decodes one JSON message and hands the resulting entry to all handlers. Failures are logged and
-     * do not stop the poller.
-     */
-    private void dispatch(Gson gson, String json) {
-      try {
-        LogEntry entry = gson.fromJson(json, LogEntry.class);
-        if (entry == null) {
-          return;
-        }
-        for (LogHandler handler : logHandlers) {
-          handler.onLog(entry);
-        }
-      } catch (Exception e) {
-        LOG.error("Failed to decode log entry {}", json, e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
deleted file mode 100644
index 98cc2b8..0000000
--- a/core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.json.StateNodeCodec;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.Messages;
-import org.apache.twill.internal.state.StateNode;
-import org.apache.twill.internal.state.SystemMessages;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.GsonBuilder;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * An abstract base class for implementing a {@link ServiceController} using ZooKeeper as a means for
- * communicating with the remote service. This is designed to work in pair with the {@link ZKServiceDecorator}.
- */
-public abstract class AbstractZKServiceController extends AbstractExecutionServiceController {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractZKServiceController.class);
-
-  private final ZKClient zkClient;
-  private final InstanceNodeDataCallback instanceNodeDataCallback;
-  private final StateNodeDataCallback stateNodeDataCallback;
-  private final List<ListenableFuture<?>> messageFutures;
-  private ListenableFuture<State> stopMessageFuture;
-
-  /**
-   * Creates a controller for the given run that talks to the remote service through the given ZooKeeper client.
-   *
-   * @param runId id of the run being controlled
-   * @param zkClient client used for all ZooKeeper operations of this controller
-   */
-  protected AbstractZKServiceController(RunId runId, ZKClient zkClient) {
-    super(runId);
-    this.zkClient = zkClient;
-    this.messageFutures = Lists.newLinkedList();
-    this.stateNodeDataCallback = new StateNodeDataCallback();
-    this.instanceNodeDataCallback = new InstanceNodeDataCallback();
-  }
-
-  @Override
-  public final ListenableFuture<Command> sendCommand(Command command) {
-    // Wrap the command with Messages.createForAll; the command itself is the future's result value.
-    Message message = Messages.createForAll(command);
-    return sendMessage(message, command);
-  }
-
-  @Override
-  public final ListenableFuture<Command> sendCommand(String runnableName, Command command) {
-    // Wrap the command with Messages.createForRunnable; the command itself is the future's result value.
-    Message message = Messages.createForRunnable(runnableName, command);
-    return sendMessage(message, command);
-  }
-
-  @Override
-  protected final void startUp() {
-    // Start watching the instance node as soon as it exists.
-    Runnable instanceWatch = new Runnable() {
-      @Override
-      public void run() {
-        watchInstanceNode();
-      }
-    };
-    actOnExists(getInstancePath(), instanceWatch);
-
-    // Start watching the state node data as soon as that node exists.
-    Runnable stateWatch = new Runnable() {
-      @Override
-      public void run() {
-        watchStateNode();
-      }
-    };
-    actOnExists(getZKPath("state"), stateWatch);
-
-    // Finally give the subclass its startup hook.
-    doStartUp();
-  }
-
-  @Override
-  protected final synchronized void shutDown() {
-    // Send the stop message only when no stop future has been set yet (forceShutDown() may have set one).
-    if (stopMessageFuture == null) {
-      String messagePrefix = getMessagePrefix();
-      stopMessageFuture = ZKMessages.sendMessage(zkClient, messagePrefix,
-                                                 SystemMessages.stopApplication(), State.TERMINATED);
-    }
-
-    // Every message that is still pending gets cancelled.
-    for (ListenableFuture<?> pending : messageFutures) {
-      pending.cancel(true);
-    }
-
-    // Finally give the subclass its shutdown hook.
-    doShutDown();
-  }
-
-  /**
-   * Sends a {@link Message} to the remote service. Returns a future that will be completed when the message
-   * has been processed.
-   * <p>
-   * If this controller is not running, nothing is sent and an immediately failed future carrying an
-   * {@link IllegalStateException} is returned. Otherwise the future is recorded in {@code messageFutures},
-   * which lets {@code shutDown()} cancel it, and it is removed from that list again when it completes,
-   * unless the controller is in the {@code STOPPING} state at that moment.
-   * </p>
-   * @param message The message to send.
-   * @param result Object to set into the future when message is being processed.
-   * @param <V> Type of the result.
-   * @return A {@link ListenableFuture} that will be completed when the message has been processed.
-   */
-  protected final synchronized <V> ListenableFuture<V> sendMessage(Message message, V result) {
-    if (!isRunning()) {
-      return Futures.immediateFailedFuture(new IllegalStateException("Cannot send message to non-running application"));
-    }
-    final ListenableFuture<V> messageFuture = ZKMessages.sendMessage(zkClient, getMessagePrefix(), message, result);
-    messageFutures.add(messageFuture);
-    messageFuture.addListener(new Runnable() {
-      @Override
-      public void run() {
-        // If the completion is triggered when stopping, do nothing. NOTE(review): presumably this keeps the listener from removing elements while shutDown() iterates messageFutures - confirm.
-        if (state() == State.STOPPING) {
-          return;
-        }
-        synchronized (AbstractZKServiceController.this) {
-          messageFutures.remove(messageFuture);
-        }
-      }
-    }, Threads.SAME_THREAD_EXECUTOR);
-
-    return messageFuture;
-  }
-
-  /**
-   * Returns the future set by {@code shutDown()} or {@code forceShutDown()}, or {@code null} if neither
-   * has set one yet.
-   */
-  protected final ListenableFuture<State> getStopMessageFuture() {
-    ListenableFuture<State> current = stopMessageFuture;
-    return current;
-  }
-
-  /**
-   * Called during startup. Executed in the startup thread.
-   */
-  protected abstract void doStartUp();
-
-  /**
-   * Called during shutdown. Executed in the shutdown thread.
-   */
-  protected abstract void doShutDown();
-
-  /**
-   * Called when an update on the live instance node is detected.
-   * @param nodeData The updated live instance node data or {@code null} if there is an error when fetching
-   *                 the node data.
-   */
-  protected abstract void instanceNodeUpdated(NodeData nodeData);
-
-  /**
-   * Called when an update on the state node is detected.
-   * @param stateNode The updated state node data or {@code null} if there is an error when fetching the node data.
-   */
-  protected abstract void stateNodeUpdated(StateNode stateNode);
-
-  /**
-   * Stops this controller without sending a stop message to the remote service, unless a stop future
-   * has already been set.
-   */
-  protected synchronized void forceShutDown() {
-    boolean stopFutureMissing = stopMessageFuture == null;
-    if (stopFutureMissing) {
-      // In force shutdown, don't send message.
-      stopMessageFuture = Futures.immediateFuture(State.TERMINATED);
-    }
-    stop();
-  }
-
-
-  /**
-   * Runs the given action once the node at the given path exists: either right away if the exists call
-   * reports a node, or when the watcher sees the node being created. The action runs at most once.
-   * A failing exists call forces this controller to shut down.
-   */
-  private void actOnExists(final String path, final Runnable action) {
-    // Flipped by whichever of the two paths below gets there first, so the action cannot run twice.
-    final AtomicBoolean triggered = new AtomicBoolean(false);
-
-    Watcher creationWatcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        // When node is created, call the action.
-        // Other event type would be handled by the action.
-        boolean created = event.getType() == Event.EventType.NodeCreated;
-        if (created && triggered.compareAndSet(false, true)) {
-          action.run();
-        }
-      }
-    };
-
-    FutureCallback<Stat> existsCallback = new FutureCallback<Stat>() {
-      @Override
-      public void onSuccess(Stat result) {
-        // A non-null Stat means the node is already there.
-        boolean present = result != null;
-        if (present && triggered.compareAndSet(false, true)) {
-          action.run();
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        LOG.error("Failed in exists call to {}. Shutting down service.", path, t);
-        forceShutDown();
-      }
-    };
-
-    Futures.addCallback(zkClient.exists(path, creationWatcher), existsCallback, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-  /**
-   * Fetches the instance node data, passing the outcome to the instance node callback, and leaves a
-   * watcher that re-fetches on data change and forces a shutdown when the node is deleted.
-   */
-  private void watchInstanceNode() {
-    Watcher instanceWatcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        State state = state();
-        boolean active = state == State.NEW || state == State.STARTING || state == State.RUNNING;
-        if (!active) {
-          // Ignore ZK node events when it is in stopping sequence.
-          return;
-        }
-        switch (event.getType()) {
-          case NodeDataChanged:
-            watchInstanceNode();
-            break;
-          case NodeDeleted:
-            // When the ephemeral node goes away, treat the remote service stopped.
-            forceShutDown();
-            break;
-          default:
-            LOG.info("Ignore ZK event for instance node: {}", event);
-        }
-      }
-    };
-    Futures.addCallback(zkClient.getData(getInstancePath(), instanceWatcher),
-                        instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-  /**
-   * Fetches the state node data, passing the outcome to the state node callback, and leaves a watcher
-   * that re-fetches whenever the node data changes.
-   */
-  private void watchStateNode() {
-    Watcher stateWatcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        State state = state();
-        boolean active = state == State.NEW || state == State.STARTING || state == State.RUNNING;
-        if (!active) {
-          // Ignore ZK node events when it is in stopping sequence.
-          return;
-        }
-        switch (event.getType()) {
-          case NodeDataChanged:
-            watchStateNode();
-            break;
-          default:
-            LOG.info("Ignore ZK event for state node: {}", event);
-        }
-      }
-    };
-    Futures.addCallback(zkClient.getData(getZKPath("state"), stateWatcher),
-                        stateNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
-  }
-
-  /**
-   * Returns the node path prefix ({@code /<runId>/messages/msg}) under which sequential message nodes
-   * for the remote service are created.
-   */
-  private String getMessagePrefix() {
-    String prefix = getZKPath("messages/msg");
-    return prefix;
-  }
-
-  /**
-   * Returns the ZooKeeper path ({@code /instances/<runId>}) of the ephemeral instance node for this run.
-   */
-  private String getInstancePath() {
-    RunId runId = getRunId();
-    return String.format("/instances/%s", runId.getId());
-  }
-
-  /**
-   * Returns the given relative path placed under the node of this run, i.e. {@code /<runId>/<path>}.
-   */
-  private String getZKPath(String path) {
-    RunId runId = getRunId();
-    return String.format("/%s/%s", runId.getId(), path);
-  }
-
-  /**
-   * Callback for fetches of the instance node data. Successful results are forwarded to
-   * {@link #instanceNodeUpdated(NodeData)}; a missing node forces a shutdown, and any other failure is
-   * reported as a {@code null} update.
-   */
-  private final class InstanceNodeDataCallback implements FutureCallback<NodeData> {
-
-    @Override
-    public void onSuccess(NodeData result) {
-      instanceNodeUpdated(result);
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      LOG.error("Failed in fetching instance node data.", t);
-      boolean nodeGone = t instanceof KeeperException
-        && ((KeeperException) t).code() == KeeperException.Code.NONODE;
-      if (!nodeGone) {
-        instanceNodeUpdated(null);
-        return;
-      }
-      // If the node is gone, treat the remote service stopped.
-      forceShutDown();
-    }
-  }
-
-  /**
-   * Callback for fetches of the state node data. The node content is decoded from UTF-8 JSON into a
-   * {@link StateNode} and forwarded to {@link #stateNodeUpdated(StateNode)}; missing data or a failed
-   * fetch is reported as a {@code null} update.
-   */
-  private final class StateNodeDataCallback implements FutureCallback<NodeData> {
-
-    @Override
-    public void onSuccess(NodeData result) {
-      byte[] data = result.getData();
-      StateNode stateNode = null;
-      if (data != null) {
-        GsonBuilder gsonBuilder = new GsonBuilder();
-        gsonBuilder.registerTypeAdapter(StateNode.class, new StateNodeCodec());
-        gsonBuilder.registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec());
-        stateNode = gsonBuilder.create().fromJson(new String(data, Charsets.UTF_8), StateNode.class);
-      }
-      stateNodeUpdated(stateNode);
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      LOG.error("Failed in fetching state node data.", t);
-      stateNodeUpdated(null);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
deleted file mode 100644
index a0e9a71..0000000
--- a/core/src/main/java/org/apache/twill/internal/ApplicationBundler.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.utils.Dependencies;
-import com.google.common.base.Function;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.jar.JarEntry;
-import java.util.jar.JarOutputStream;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedOutputStream;
-
-/**
- * This class builds jar files based on class dependencies.
- */
-public final class ApplicationBundler {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationBundler.class);
-  
-  public static final String SUBDIR_CLASSES = "classes/";
-  public static final String SUBDIR_LIB = "lib/";
-  public static final String SUBDIR_RESOURCES = "resources/";
-
-  private final List<String> excludePackages;
-  private final List<String> includePackages;
-  private final Set<String> bootstrapClassPaths;
-  private final CRC32 crc32;
-
-  /**
-   * Constructs an ApplicationBundler that excludes the given packages and has no include packages.
-   *
-   * @param excludePackages Class packages to exclude
-   */
-  public ApplicationBundler(Iterable<String> excludePackages) {
-    this(excludePackages, ImmutableList.<String>of());
-  }
-
-  /**
-   * Constructs an ApplicationBundler.
-   * <p>
-   * The constructor also records the absolute and canonical paths of every entry of the JVM's bootstrap
-   * class path so that classes loaded from there are never bundled.
-   * </p>
-   *
-   * @param excludePackages Class packages to exclude
-   * @param includePackages Class packages that should be included. Anything in this list will override the
-   *                        one provided in excludePackages.
-   */
-  public ApplicationBundler(Iterable<String> excludePackages, Iterable<String> includePackages) {
-    this.excludePackages = ImmutableList.copyOf(excludePackages);
-    this.includePackages = ImmutableList.copyOf(includePackages);
-
-    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
-    // "sun.boot.class.path" is a JVM-specific property that is not set on Java 9 and later. Splitting a
-    // null value would throw a NullPointerException, so treat a missing property as an empty class path.
-    String bootClassPath = System.getProperty("sun.boot.class.path");
-    if (bootClassPath != null) {
-      for (String classpath : Splitter.on(File.pathSeparatorChar).split(bootClassPath)) {
-        File file = new File(classpath);
-        builder.add(file.getAbsolutePath());
-        try {
-          builder.add(file.getCanonicalPath());
-        } catch (IOException ignored) {
-          // The canonical path is only an additional alias; the absolute path has already been recorded.
-        }
-      }
-    }
-    this.bootstrapClassPaths = builder.build();
-    this.crc32 = new CRC32();
-  }
-
-  /**
-   * Same as calling {@link #createBundle(Location, Iterable, Iterable)} without any extra resources.
-   */
-  public void createBundle(Location target, Iterable<Class<?>> classes) throws IOException {
-    Iterable<URI> noResources = ImmutableList.<URI>of();
-    createBundle(target, classes, noResources);
-  }
-
-  /**
-   * Same as calling {@link #createBundle(Location, Iterable)} with the given classes collected into a set.
-   */
-  public void createBundle(Location target, Class<?> clz, Class<?>...classes) throws IOException {
-    ImmutableSet.Builder<Class<?>> allClasses = ImmutableSet.<Class<?>>builder();
-    allClasses.add(clz);
-    allClasses.add(classes);
-    createBundle(target, allClasses.build());
-  }
-
-  /**
-   * Creates a jar file which includes all the given classes and all the classes that they depended on.
-   * The jar will also include all classes and resources under the packages as given as include packages
-   * in the constructor.
-   * <p>
-   * The jar is first built in a local temporary file, which is then copied to the target location and
-   * deleted afterwards.
-   * </p>
-   *
-   * @param target Where to save the target jar file.
-   * @param classes Set of classes to start the dependency traversal.
-   * @param resources Extra resources to put into the jar file. If resource is a jar file, it'll be put under
-   *                  lib/ entry, otherwise under the resources/ entry.
-   * @throws IOException if building the temporary jar or copying it to the target fails
-   */
-  public void createBundle(Location target, Iterable<Class<?>> classes, Iterable<URI> resources) throws IOException {
-    LOG.debug("start creating bundle {}. building a temporary file locally at first", target.getName());
-    // Write the jar to local tmp file first
-    File tmpJar = File.createTempFile(target.getName(), ".tmp");
-    try {
-      Set<String> entries = Sets.newHashSet();
-      JarOutputStream jarOut = new JarOutputStream(new FileOutputStream(tmpJar));
-      try {
-        // Find class dependencies
-        findDependencies(classes, entries, jarOut);
-
-        // Add extra resources
-        for (URI resource : resources) {
-          copyResource(resource, entries, jarOut);
-        }
-      } finally {
-        jarOut.close();
-      }
-      LOG.debug("copying temporary bundle to destination {} ({} bytes)", target.toURI(), tmpJar.length());
-      // Copy the tmp jar into destination.
-      OutputStream os = null;
-      try {
-        os = new BufferedOutputStream(target.getOutputStream());
-        Files.copy(tmpJar, os);
-      } catch (IOException e) {
-        throw new IOException("failed to copy bundle from " + tmpJar.toURI() + " to " + target.toURI(), e);
-      } finally {
-        if (os != null) {
-          os.close();
-        }
-      }
-      LOG.debug("finished creating bundle at {}", target.toURI());
-    } finally {
-      // File.delete() signals failure through its return value only, so report the outcome truthfully
-      // instead of always claiming that the temporary file has been cleaned up.
-      if (tmpJar.delete()) {
-        LOG.debug("cleaned up local temporary for bundle {}", tmpJar.toURI());
-      } else {
-        LOG.warn("failed to delete local temporary for bundle {}", tmpJar.toURI());
-      }
-    }
-  }
-
-  /**
-   * Runs the class dependency traversal starting from the given classes and writes every accepted class
-   * to the jar. Classes from the bootstrap class path are rejected, as are classes matching an exclude
-   * package unless they also match an include package.
-   */
-  private void findDependencies(Iterable<Class<?>> classes, final Set<String> entries,
-                                final JarOutputStream jarOut) throws IOException {
-
-    // Prefer the context class loader and fall back to the loader of this class.
-    ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
-    ClassLoader classLoader = contextLoader != null ? contextLoader : getClass().getClassLoader();
-
-    Iterable<String> classNames = Iterables.transform(classes, new Function<Class<?>, String>() {
-      @Override
-      public String apply(Class<?> input) {
-        return input.getName();
-      }
-    });
-
-    Dependencies.ClassAcceptor acceptor = new Dependencies.ClassAcceptor() {
-      @Override
-      public boolean accept(String className, URL classUrl, URL classPathUrl) {
-        if (bootstrapClassPaths.contains(classPathUrl.getFile())) {
-          return false;
-        }
-
-        // An include prefix takes precedence over any exclude prefix.
-        boolean excluded = !startsWithAny(className, includePackages)
-          && startsWithAny(className, excludePackages);
-        if (excluded) {
-          return false;
-        }
-
-        putEntry(className, classUrl, classPathUrl, entries, jarOut);
-        return true;
-      }
-    };
-    Dependencies.findClassDependencies(classLoader, acceptor, classNames);
-  }
-
-  /**
-   * Tells whether the class name starts with at least one of the given prefixes.
-   */
-  private static boolean startsWithAny(String className, Iterable<String> prefixes) {
-    for (String prefix : prefixes) {
-      if (className.startsWith(prefix)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Adds the given class to the jar. A class coming from a jar file causes that whole jar to be stored
-   * under lib/; a class from a local directory causes the whole directory to be copied under classes/;
-   * any other class is stored on its own under classes/.
-   */
-  private void putEntry(String className, URL classUrl, URL classPathUrl, Set<String> entries, JarOutputStream jarOut) {
-    String classPath = classPathUrl.getFile();
-
-    if (classPath.endsWith(".jar")) {
-      String jarName = classPath.substring(classPath.lastIndexOf('/') + 1);
-      saveDirEntry(SUBDIR_LIB, entries, jarOut);
-      saveEntry(SUBDIR_LIB + jarName, classPathUrl, entries, jarOut, false);
-      return;
-    }
-
-    // Class file, put it under the classes directory
-    saveDirEntry(SUBDIR_CLASSES, entries, jarOut);
-
-    if (!"file".equals(classPathUrl.getProtocol())) {
-      String entry = SUBDIR_CLASSES + className.replace('.', '/') + ".class";
-      String parentDir = entry.substring(0, entry.lastIndexOf('/') + 1);
-      saveDirEntry(parentDir, entries, jarOut);
-      saveEntry(entry, classUrl, entries, jarOut, true);
-      return;
-    }
-
-    // Copy every files under the classPath
-    try {
-      copyDir(new File(classPathUrl.toURI()), SUBDIR_CLASSES, entries, jarOut);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Saves a directory entry to the jar output, together with every parent directory that has not been
-   * written yet. Does nothing if the path is already recorded in {@code entries}.
-   */
-  private void saveDirEntry(String path, Set<String> entries, JarOutputStream jarOut) {
-    if (entries.contains(path)) {
-      return;
-    }
-
-    StringBuilder prefix = new StringBuilder();
-    try {
-      for (String segment : Splitter.on('/').omitEmptyStrings().split(path)) {
-        String entry = prefix.append(segment).append('/').toString();
-        if (!entries.add(entry)) {
-          // This level has been written before.
-          continue;
-        }
-        // Directory entries are stored uncompressed with zero size and checksum.
-        JarEntry jarEntry = new JarEntry(entry);
-        jarEntry.setMethod(JarOutputStream.STORED);
-        jarEntry.setSize(0L);
-        jarEntry.setCrc(0L);
-        jarOut.putNextEntry(jarEntry);
-        jarOut.closeEntry();
-      }
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Saves the content behind the given URL as an entry of the jar output, unless an entry of that name
-   * has been recorded before. With {@code compress} set the content is written with the stream's default
-   * method; otherwise it is buffered first so that size and CRC can be set for a STORED entry.
-   */
-  private void saveEntry(String entry, URL url, Set<String> entries, JarOutputStream jarOut, boolean compress) {
-    LOG.debug("adding bundle entry " + entry);
-    if (!entries.add(entry)) {
-      return;
-    }
-    try {
-      JarEntry jarEntry = new JarEntry(entry);
-      InputStream input = url.openStream();
-
-      try {
-        if (!compress) {
-          // A STORED entry needs its size and checksum up front, hence the in-memory copy.
-          crc32.reset();
-          TransferByteOutputStream buffer = new TransferByteOutputStream();
-          CheckedOutputStream checked = new CheckedOutputStream(buffer, crc32);
-          ByteStreams.copy(input, checked);
-          checked.close();
-
-          long size = buffer.size();
-          jarEntry.setMethod(JarEntry.STORED);
-          jarEntry.setSize(size);
-          jarEntry.setCrc(checked.getChecksum().getValue());
-          jarOut.putNextEntry(jarEntry);
-          buffer.transfer(jarOut);
-        } else {
-          jarOut.putNextEntry(jarEntry);
-          ByteStreams.copy(input, jarOut);
-        }
-      } finally {
-        input.close();
-      }
-      jarOut.closeEntry();
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-
-  /**
-   * Copies all entries under the file path into the jar, walking the directory tree breadth-first.
-   *
-   * @param baseDir directory whose content is copied
-   * @param entryPrefix prefix put in front of every entry name, which is otherwise relative to baseDir
-   * @param entries names of the entries written so far; names already present are not written again
-   * @param jarOut jar stream to write to
-   * @throws IOException if writing an entry fails
-   */
-  private void copyDir(File baseDir, String entryPrefix,
-                       Set<String> entries, JarOutputStream jarOut) throws IOException {
-    LOG.debug("adding whole dir {} to bundle at '{}'", baseDir, entryPrefix);
-    URI baseUri = baseDir.toURI();
-    Queue<File> queue = Lists.newLinkedList();
-    // File.listFiles() returns null when the path is not a directory or cannot be read. Guard against it
-    // here in the same way as for sub-directories below, instead of failing with a NullPointerException.
-    File[] children = baseDir.listFiles();
-    if (children == null) {
-      LOG.warn("cannot list files under {}, nothing is added to bundle at '{}'", baseDir, entryPrefix);
-      return;
-    }
-    Collections.addAll(queue, children);
-    while (!queue.isEmpty()) {
-      File file = queue.remove();
-
-      String entry = entryPrefix + baseUri.relativize(file.toURI()).getPath();
-      if (entries.add(entry)) {
-        jarOut.putNextEntry(new JarEntry(entry));
-        if (file.isFile()) {
-          try {
-            Files.copy(file, jarOut);
-          } catch (IOException e) {
-            throw new IOException("failure copying from " + file.getAbsoluteFile() + " to JAR file entry " + entry, e);
-          }
-        }
-        jarOut.closeEntry();
-      }
-
-      if (file.isDirectory()) {
-        File[] files = file.listFiles();
-        if (files != null) {
-          queue.addAll(Arrays.asList(files));
-        }
-      }
-    }
-  }
-
-  /**
-   * Copies an extra resource into the jar. A local directory is copied recursively under resources/;
-   * anything else is stored by its file name under lib/ if the name ends with ".jar", otherwise under
-   * resources/.
-   *
-   * @param resource location of the resource
-   * @param entries names of the entries written so far; a resource whose entry name is already present
-   *                is skipped
-   * @param jarOut jar stream to write to
-   * @throws IOException if reading the resource or writing the entry fails
-   */
-  private void copyResource(URI resource, Set<String> entries, JarOutputStream jarOut) throws IOException {
-    if ("file".equals(resource.getScheme())) {
-      File file = new File(resource);
-      if (file.isDirectory()) {
-        saveDirEntry(SUBDIR_RESOURCES, entries, jarOut);
-        copyDir(file, SUBDIR_RESOURCES, entries, jarOut);
-        return;
-      }
-    }
-
-    URL url = resource.toURL();
-    String path = url.getFile();
-    String prefix = path.endsWith(".jar") ? SUBDIR_LIB : SUBDIR_RESOURCES;
-    path = prefix + path.substring(path.lastIndexOf('/') + 1);
-
-    saveDirEntry(prefix, entries, jarOut);
-    // Writing the same entry name twice makes putNextEntry() fail with a "duplicate entry" ZipException,
-    // e.g. when a resource jar has the same name as a jar already added by the dependency traversal.
-    // Consult the shared entry set like the other save methods do.
-    if (!entries.add(path)) {
-      LOG.debug("skipping resource {}, bundle already contains entry {}", resource, path);
-      return;
-    }
-
-    InputStream is = url.openStream();
-    try {
-      jarOut.putNextEntry(new JarEntry(path));
-      ByteStreams.copy(is, jarOut);
-      jarOut.closeEntry();
-    } finally {
-      is.close();
-    }
-  }
-
-  /**
-   * A {@link ByteArrayOutputStream} that can write its buffered bytes to another stream directly from
-   * its internal buffer.
-   */
-  private static final class TransferByteOutputStream extends ByteArrayOutputStream {
-
-    /**
-     * Writes all bytes buffered so far to the given stream.
-     */
-    public void transfer(OutputStream os) throws IOException {
-      final byte[] buffered = buf;
-      final int length = count;
-      os.write(buffered, 0, length);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/Arguments.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/Arguments.java b/core/src/main/java/org/apache/twill/internal/Arguments.java
deleted file mode 100644
index a78547c..0000000
--- a/core/src/main/java/org/apache/twill/internal/Arguments.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-
-import java.util.List;
-
-/**
- * Class that encapsulates application arguments and per-runnable arguments as immutable copies.
- */
-public final class Arguments {
-
-  private final List<String> arguments;
-  private final Multimap<String, String> runnableArguments;
-
-  public Arguments(List<String> arguments, Multimap<String, String> runnableArguments) {
-    this.arguments = ImmutableList.copyOf(arguments);
-    this.runnableArguments = ImmutableMultimap.copyOf(runnableArguments);
-  }
-
-  public List<String> getArguments() {
-    return arguments;
-  }
-
-  public Multimap<String, String> getRunnableArguments() {
-    return runnableArguments;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/BasicTwillContext.java b/core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
deleted file mode 100644
index 61bdaef..0000000
--- a/core/src/main/java/org/apache/twill/internal/BasicTwillContext.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.TwillContext;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryService;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-/**
- * A {@link TwillContext} backed by values supplied at construction; only the instance count can be updated later.
- */
-public final class BasicTwillContext implements TwillContext {
-
-  private final RunId runId;
-  private final RunId appRunId;
-  private final InetAddress host;
-  private final String[] args;
-  private final String[] appArgs;
-  private final TwillRunnableSpecification spec;
-  private final int instanceId;
-  private final DiscoveryService discoveryService;
-  private final int allowedMemoryMB;
-  private final int virtualCores;
-  private volatile int instanceCount;
-
-  public BasicTwillContext(RunId runId, RunId appRunId, InetAddress host, String[] args, String[] appArgs,
-                           TwillRunnableSpecification spec, int instanceId, DiscoveryService discoveryService,
-                           int instanceCount, int allowedMemoryMB, int virtualCores) {
-    this.runId = runId;
-    this.appRunId = appRunId;
-    this.host = host;
-    this.args = args;
-    this.appArgs = appArgs;
-    this.spec = spec;
-    this.instanceId = instanceId;
-    this.discoveryService = discoveryService;
-    this.instanceCount = instanceCount;
-    this.allowedMemoryMB = allowedMemoryMB;
-    this.virtualCores = virtualCores;
-  }
-
-  @Override
-  public RunId getRunId() {
-    return runId;
-  }
-
-  @Override
-  public RunId getApplicationRunId() {
-    return appRunId;
-  }
-
-  @Override
-  public int getInstanceCount() {
-    return instanceCount;
-  }
-
-  public void setInstanceCount(int count) {
-    this.instanceCount = count;
-  }
-
-  @Override
-  public InetAddress getHost() {
-    return host;
-  }
-
-  @Override
-  public String[] getArguments() {
-    return args;
-  }
-
-  @Override
-  public String[] getApplicationArguments() {
-    return appArgs;
-  }
-
-  @Override
-  public TwillRunnableSpecification getSpecification() {
-    return spec;
-  }
-
-  @Override
-  public int getInstanceId() {
-    return instanceId;
-  }
-
-  @Override
-  public int getVirtualCores() {
-    return virtualCores;
-  }
-
-  @Override
-  public int getMaxMemoryMB() {
-    return allowedMemoryMB;
-  }
-
-  @Override
-  public Cancellable announce(final String serviceName, final int port) {
-    return discoveryService.register(new Discoverable() {
-      @Override
-      public String getName() {
-        return serviceName;
-      }
-
-      @Override
-      public InetSocketAddress getSocketAddress() {
-        return new InetSocketAddress(getHost(), port);
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/Configs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/Configs.java b/core/src/main/java/org/apache/twill/internal/Configs.java
deleted file mode 100644
index 0fa1df8..0000000
--- a/core/src/main/java/org/apache/twill/internal/Configs.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * Holds configuration key names (in {@link Keys}) and their default values (in {@link Defaults}).
- */
-public final class Configs {
-
-  public static final class Keys {
-    /**
-     * Size in MB of reserved memory for Java process (non-heap memory).
-     */
-    public static final String JAVA_RESERVED_MEMORY_MB = "twill.java.reserved.memory.mb";
-
-    private Keys() {
-    }
-  }
-
-  public static final class Defaults {
-    // By default have 200MB reserved for Java process.
-    public static final int JAVA_RESERVED_MEMORY_MB = 200;
-
-    private Defaults() {
-    }
-  }
-
-  private Configs() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/Constants.java b/core/src/main/java/org/apache/twill/internal/Constants.java
deleted file mode 100644
index 0387d3e..0000000
--- a/core/src/main/java/org/apache/twill/internal/Constants.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * This class contains a collection of common constants used in Twill.
- */
-public final class Constants {
-
-  public static final String LOG_TOPIC = "log";
-
-  /** Maximum number of seconds for AM to start. */
-  public static final int APPLICATION_MAX_START_SECONDS = 60;
-  /** Maximum number of seconds for AM to stop. */
-  public static final int APPLICATION_MAX_STOP_SECONDS = 60;
-
-  public static final long PROVISION_TIMEOUT = 30000;
-
-  /** Memory size of AM, in MB. */
-  public static final int APP_MASTER_MEMORY_MB = 512;
-
-  public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
-
-  public static final String STDOUT = "stdout";
-  public static final String STDERR = "stderr";
-
-  /**
-   * Constants for names of internal files that are shared between client, AM and containers.
-   */
-  public static final class Files {
-
-    public static final String LAUNCHER_JAR = "launcher.jar";
-    public static final String APP_MASTER_JAR = "appMaster.jar";
-    public static final String CONTAINER_JAR = "container.jar";
-    public static final String LOCALIZE_FILES = "localizeFiles.json";
-    public static final String TWILL_SPEC = "twillSpec.json";
-    public static final String ARGUMENTS = "arguments.json";
-    public static final String LOGBACK_TEMPLATE = "logback-template.xml";
-    public static final String KAFKA = "kafka.tgz";
-    public static final String JVM_OPTIONS = "jvm.opts";
-    public static final String CREDENTIALS = "credentials.store";
-
-    private Files() {
-    }
-  }
-
-  private Constants() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ContainerInfo.java b/core/src/main/java/org/apache/twill/internal/ContainerInfo.java
deleted file mode 100644
index 67c21d3..0000000
--- a/core/src/main/java/org/apache/twill/internal/ContainerInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import java.net.InetAddress;
-
-/**
- * Represents information about the container that the processing is/will be running in.
- */
-public interface ContainerInfo {
-
-  String getId();
-
-  InetAddress getHost();
-
-  int getPort();
-
-  int getMemoryMB();
-
-  int getVirtualCores();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java b/core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
deleted file mode 100644
index 705943c..0000000
--- a/core/src/main/java/org/apache/twill/internal/ContainerLiveNodeData.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * Immutable holder pairing a container id with its host.
- */
-public final class ContainerLiveNodeData {
-
-  private final String containerId;
-  private final String host;
-
-  public ContainerLiveNodeData(String containerId, String host) {
-    this.containerId = containerId;
-    this.host = host;
-  }
-
-  public String getContainerId() {
-    return containerId;
-  }
-
-  public String getHost() {
-    return host;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java b/core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
deleted file mode 100644
index fd50028..0000000
--- a/core/src/main/java/org/apache/twill/internal/EnvContainerInfo.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * A {@link ContainerInfo} whose values are read from environment variables named by {@link EnvKeys}.
- */
-public final class EnvContainerInfo implements ContainerInfo {
-  private final String id;
-  private final InetAddress host;
-  private final int port;
-  private final int virtualCores;
-  private final int memoryMB;
-
-  public EnvContainerInfo() throws UnknownHostException {
-    id = System.getenv(EnvKeys.YARN_CONTAINER_ID);
-    host = InetAddress.getByName(System.getenv(EnvKeys.YARN_CONTAINER_HOST));
-    port = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_PORT));
-    virtualCores = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES));
-    memoryMB = Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB));
-  }
-
-  @Override
-  public String getId() {
-    return id;
-  }
-
-  @Override
-  public InetAddress getHost() {
-    return host;
-  }
-
-  @Override
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public int getMemoryMB() {
-    return memoryMB;
-  }
-
-  @Override
-  public int getVirtualCores() {
-    return virtualCores;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/EnvKeys.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/EnvKeys.java b/core/src/main/java/org/apache/twill/internal/EnvKeys.java
deleted file mode 100644
index 9bf6523..0000000
--- a/core/src/main/java/org/apache/twill/internal/EnvKeys.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-/**
- * Place for defining common environment keys.
- */
-public final class EnvKeys {
-
-  public static final String TWILL_ZK_CONNECT = "TWILL_ZK_CONNECT";
-  public static final String TWILL_APP_RUN_ID = "TWILL_APP_RUN_ID";
-  public static final String TWILL_RUN_ID = "TWILL_RUN_ID";
-  public static final String TWILL_INSTANCE_ID = "TWILL_INSTANCE_ID";
-  public static final String TWILL_INSTANCE_COUNT = "TWILL_INSTANCE_COUNT";
-  public static final String TWILL_RESERVED_MEMORY_MB = "TWILL_RESERVED_MEMORY_MB";
-
-  public static final String TWILL_FS_USER = "TWILL_FS_USER";
-
-  /**
-   * Cluster filesystem directory for storing twill app related files.
-   */
-  public static final String TWILL_APP_DIR = "TWILL_APP_DIR";
-
-  public static final String TWILL_APP_NAME = "TWILL_APP_NAME";
-  public static final String TWILL_RUNNABLE_NAME = "TWILL_RUNNABLE_NAME";
-
-  public static final String TWILL_LOG_KAFKA_ZK = "TWILL_LOG_KAFKA_ZK";
-
-  public static final String YARN_APP_ID = "YARN_APP_ID";
-  public static final String YARN_APP_ID_CLUSTER_TIME = "YARN_APP_ID_CLUSTER_TIME";
-  public static final String YARN_APP_ID_STR = "YARN_APP_ID_STR";
-
-  public static final String YARN_CONTAINER_ID = "YARN_CONTAINER_ID";
-  public static final String YARN_CONTAINER_HOST = "YARN_CONTAINER_HOST";
-  public static final String YARN_CONTAINER_PORT = "YARN_CONTAINER_PORT";
-  /**
-   * Used to inform runnables of their resource usage.
-   */
-  public static final String YARN_CONTAINER_VIRTUAL_CORES = "YARN_CONTAINER_VIRTUAL_CORES";
-  public static final String YARN_CONTAINER_MEMORY_MB = "YARN_CONTAINER_MEMORY_MB";
-
-  private EnvKeys() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ListenerExecutor.java b/core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
deleted file mode 100644
index 9d3e156..0000000
--- a/core/src/main/java/org/apache/twill/internal/ListenerExecutor.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-
-/**
- * Wrapper for {@link Service.Listener} that executes each callback on a given {@link Executor}.
- * Also makes sure each method is called at most once.
- */
-final class ListenerExecutor implements Service.Listener {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class);
-
-  private final Service.Listener delegate;
-  private final Executor executor;
-  private final ConcurrentMap<Service.State, Boolean> callStates = Maps.newConcurrentMap();
-
-  ListenerExecutor(Service.Listener delegate, Executor executor) {
-    this.delegate = delegate;
-    this.executor = executor;
-  }
-
-  @Override
-  public void starting() {
-    if (hasCalled(Service.State.STARTING)) {
-      return;
-    }
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          delegate.starting();
-        } catch (Throwable t) {
-          LOG.warn("Exception thrown from listener", t);
-        }
-      }
-    });
-  }
-
-  @Override
-  public void running() {
-    if (hasCalled(Service.State.RUNNING)) {
-      return;
-    }
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          delegate.running();
-        } catch (Throwable t) {
-          LOG.warn("Exception thrown from listener", t);
-        }
-      }
-    });
-  }
-
-  @Override
-  public void stopping(final Service.State from) {
-    if (hasCalled(Service.State.STOPPING)) {
-      return;
-    }
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          delegate.stopping(from);
-        } catch (Throwable t) {
-          LOG.warn("Exception thrown from listener", t);
-        }
-      }
-    });
-  }
-
-  @Override
-  public void terminated(final Service.State from) {
-    if (hasCalled(Service.State.TERMINATED)) {
-      return;
-    }
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          delegate.terminated(from);
-        } catch (Throwable t) {
-          LOG.warn("Exception thrown from listener", t);
-        }
-      }
-    });
-  }
-
-  @Override
-  public void failed(final Service.State from, final Throwable failure) {
-    // Both failed and terminated use the same state for checking, as only one of the two could be called.
-    if (hasCalled(Service.State.TERMINATED)) {
-      return;
-    }
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          delegate.failed(from, failure);
-        } catch (Throwable t) {
-          LOG.warn("Exception thrown from listener", t);
-        }
-      }
-    });
-  }
-
-  private boolean hasCalled(Service.State state) {
-    return callStates.putIfAbsent(state, true) != null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java b/core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
deleted file mode 100644
index 4f71a05..0000000
--- a/core/src/main/java/org/apache/twill/internal/LogOnlyEventHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.api.EventHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * An {@link EventHandler} that only logs launch timeout events and then requests a recheck.
- */
-public final class LogOnlyEventHandler extends EventHandler {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LogOnlyEventHandler.class);
-
-  @Override
-  public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
-    for (TimeoutEvent event : timeoutEvents) {
-      LOG.info("Requested {} containers for runnable {}, only got {} after {} ms.",
-               event.getExpectedInstances(), event.getRunnableName(),
-               event.getActualInstances(), System.currentTimeMillis() - event.getRequestTime());
-    }
-    return TimeoutAction.recheck(Constants.PROVISION_TIMEOUT, TimeUnit.MILLISECONDS);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/ProcessController.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/ProcessController.java b/core/src/main/java/org/apache/twill/internal/ProcessController.java
deleted file mode 100644
index 4453838..0000000
--- a/core/src/main/java/org/apache/twill/internal/ProcessController.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal;
-
-import org.apache.twill.common.Cancellable;
-
-/**
- * For controlling a launched YARN process.
- *
- * @param <R> Report type.
- */
-public interface ProcessController<R> extends Cancellable {
-
-  R getReport();
-
-  /**
-   * Request to stop the running process.
-   */
-  void cancel();
-}