You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 23:00:05 UTC

[23/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
deleted file mode 100644
index 12818ef..0000000
--- a/core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.logging;
-
-import org.apache.twill.common.Services;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.kafka.client.Compression;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
-import org.apache.twill.kafka.client.KafkaClient;
-import org.apache.twill.kafka.client.PreparePublish;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-import ch.qos.logback.classic.pattern.ClassOfCallerConverter;
-import ch.qos.logback.classic.pattern.FileOfCallerConverter;
-import ch.qos.logback.classic.pattern.LineOfCallerConverter;
-import ch.qos.logback.classic.pattern.MethodOfCallerConverter;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.classic.spi.IThrowableProxy;
-import ch.qos.logback.classic.spi.StackTraceElementProxy;
-import ch.qos.logback.core.AppenderBase;
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.stream.JsonWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- *
- */
-public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
-
-  private final LogEventConverter eventConverter;
-  private final AtomicReference<PreparePublish> publisher;
-  private final Runnable flushTask;
-  /**
-   * Rough count of how many entries are being buffered. It's just approximate, not exact.
-   */
-  private final AtomicInteger bufferedSize;
-
-  private ZKClientService zkClientService;
-  private KafkaClient kafkaClient;
-  private String zkConnectStr;
-  private String hostname;
-  private String topic;
-  private Queue<String> buffer;
-  private int flushLimit = 20;
-  private int flushPeriod = 100;
-  private ScheduledExecutorService scheduler;
-
-  public KafkaAppender() {
-    eventConverter = new LogEventConverter();
-    publisher = new AtomicReference<PreparePublish>();
-    flushTask = createFlushTask();
-    bufferedSize = new AtomicInteger();
-    buffer = new ConcurrentLinkedQueue<String>();
-  }
-
-  /**
-   * Sets the zookeeper connection string. Called by slf4j.
-   */
-  @SuppressWarnings("unused")
-  public void setZookeeper(String zkConnectStr) {
-    this.zkConnectStr = zkConnectStr;
-  }
-
-  /**
-   * Sets the hostname. Called by slf4j.
-   */
-  @SuppressWarnings("unused")
-  public void setHostname(String hostname) {
-    this.hostname = hostname;
-  }
-
-  /**
-   * Sets the topic name for publishing logs. Called by slf4j.
-   */
-  @SuppressWarnings("unused")
-  public void setTopic(String topic) {
-    this.topic = topic;
-  }
-
-  /**
-   * Sets the maximum number of cached log entries before performing an force flush. Called by slf4j.
-   */
-  @SuppressWarnings("unused")
-  public void setFlushLimit(int flushLimit) {
-    this.flushLimit = flushLimit;
-  }
-
-  /**
-   * Sets the periodic flush time in milliseconds. Called by slf4j.
-   */
-  @SuppressWarnings("unused")
-  public void setFlushPeriod(int flushPeriod) {
-    this.flushPeriod = flushPeriod;
-  }
-
-  @Override
-  public void start() {
-    Preconditions.checkNotNull(zkConnectStr);
-
-    scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
-
-    zkClientService = ZKClientServices.delegate(
-      ZKClients.reWatchOnExpire(
-        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
-                                 RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
-
-    kafkaClient = new SimpleKafkaClient(zkClientService);
-    Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), new FutureCallback<Object>() {
-      @Override
-      public void onSuccess(Object result) {
-        LOG.info("Kafka client started: " + zkConnectStr);
-        publisher.set(kafkaClient.preparePublish(topic, Compression.SNAPPY));
-        scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // Fail to talk to kafka. Other than logging, what can be done?
-        LOG.error("Failed to start kafka client.", t);
-      }
-    });
-
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    super.stop();
-    scheduler.shutdownNow();
-    Futures.getUnchecked(Services.chainStop(kafkaClient, zkClientService));
-  }
-
-  public void forceFlush() {
-    try {
-      publishLogs().get(2, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      LOG.error("Failed to publish last batch of log.", e);
-    }
-  }
-
-  @Override
-  protected void append(ILoggingEvent eventObject) {
-    buffer.offer(eventConverter.convert(eventObject));
-    if (bufferedSize.incrementAndGet() >= flushLimit && publisher.get() != null) {
-      // Try to do a extra flush
-      scheduler.submit(flushTask);
-    }
-  }
-
-  private ListenableFuture<Integer> publishLogs() {
-    // If the publisher is not available, simply returns a completed future.
-    PreparePublish publisher = KafkaAppender.this.publisher.get();
-    if (publisher == null) {
-      return Futures.immediateFuture(0);
-    }
-
-    int count = 0;
-    for (String json : Iterables.consumingIterable(buffer)) {
-      publisher.add(Charsets.UTF_8.encode(json), 0);
-      count++;
-    }
-    // Nothing to publish, simply returns a completed future.
-    if (count == 0) {
-      return Futures.immediateFuture(0);
-    }
-
-    bufferedSize.set(0);
-    final int finalCount = count;
-    return Futures.transform(publisher.publish(), new Function<Object, Integer>() {
-      @Override
-      public Integer apply(Object input) {
-        return finalCount;
-      }
-    });
-  }
-
-  /**
-   * Creates a {@link Runnable} that writes all logs in the buffer into kafka.
-   * @return The Runnable task
-   */
-  private Runnable createFlushTask() {
-    return new Runnable() {
-      @Override
-      public void run() {
-        Futures.addCallback(publishLogs(), new FutureCallback<Integer>() {
-          @Override
-          public void onSuccess(Integer result) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Log entries published, size=" + result);
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            LOG.error("Failed to push logs to kafka. Log entries dropped.", t);
-          }
-        });
-      }
-    };
-  }
-
-  /**
-   * Helper class to convert {@link ILoggingEvent} into json string.
-   */
-  private final class LogEventConverter {
-
-    private final ClassOfCallerConverter classNameConverter = new ClassOfCallerConverter();
-    private final MethodOfCallerConverter methodConverter = new MethodOfCallerConverter();
-    private final FileOfCallerConverter fileConverter = new FileOfCallerConverter();
-    private final LineOfCallerConverter lineConverter = new LineOfCallerConverter();
-
-    private String convert(ILoggingEvent event) {
-      StringWriter result = new StringWriter();
-      JsonWriter writer = new JsonWriter(result);
-
-      try {
-        try {
-          writer.beginObject();
-          writer.name("name").value(event.getLoggerName());
-          writer.name("host").value(hostname);
-          writer.name("timestamp").value(Long.toString(event.getTimeStamp()));
-          writer.name("level").value(event.getLevel().toString());
-          writer.name("className").value(classNameConverter.convert(event));
-          writer.name("method").value(methodConverter.convert(event));
-          writer.name("file").value(fileConverter.convert(event));
-          writer.name("line").value(lineConverter.convert(event));
-          writer.name("thread").value(event.getThreadName());
-          writer.name("message").value(event.getFormattedMessage());
-          writer.name("stackTraces");
-          encodeStackTraces(event.getThrowableProxy(), writer);
-
-          writer.endObject();
-        } finally {
-          writer.close();
-        }
-      } catch (IOException e) {
-        throw Throwables.propagate(e);
-      }
-
-      return result.toString();
-    }
-
-    private void encodeStackTraces(IThrowableProxy throwable, JsonWriter writer) throws IOException {
-      writer.beginArray();
-      try {
-        if (throwable == null) {
-          return;
-        }
-
-        for (StackTraceElementProxy stackTrace : throwable.getStackTraceElementProxyArray()) {
-          writer.beginObject();
-
-          StackTraceElement element = stackTrace.getStackTraceElement();
-          writer.name("className").value(element.getClassName());
-          writer.name("method").value(element.getMethodName());
-          writer.name("file").value(element.getFileName());
-          writer.name("line").value(element.getLineNumber());
-
-          writer.endObject();
-        }
-      } finally {
-        writer.endArray();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java b/core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
deleted file mode 100644
index c1695de..0000000
--- a/core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.logging;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.TwillContext;
-import org.apache.twill.api.TwillRunnable;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.utils.Networks;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * A {@link org.apache.twill.api.TwillRunnable} for managing Kafka server.
- */
-public final class KafkaTwillRunnable implements TwillRunnable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaTwillRunnable.class);
-
-  private final String kafkaDir;
-  private EmbeddedKafkaServer server;
-  private CountDownLatch stopLatch;
-
-  public KafkaTwillRunnable(String kafkaDir) {
-    this.kafkaDir = kafkaDir;
-  }
-
-  @Override
-  public TwillRunnableSpecification configure() {
-    return TwillRunnableSpecification.Builder.with()
-      .setName("kafka")
-      .withConfigs(ImmutableMap.of("kafkaDir", kafkaDir))
-      .build();
-  }
-
-  @Override
-  public void initialize(TwillContext context) {
-    Map<String, String> args = context.getSpecification().getConfigs();
-    String zkConnectStr = System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
-    stopLatch = new CountDownLatch(1);
-
-    try {
-      server = new EmbeddedKafkaServer(new File(args.get("kafkaDir")), generateKafkaConfig(zkConnectStr));
-      server.startAndWait();
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public void handleCommand(Command command) throws Exception {
-  }
-
-  @Override
-  public void stop() {
-    stopLatch.countDown();
-  }
-
-  @Override
-  public void destroy() {
-    server.stopAndWait();
-  }
-
-  @Override
-  public void run() {
-    try {
-      stopLatch.await();
-    } catch (InterruptedException e) {
-      LOG.info("Running thread interrupted, shutting down kafka server.", e);
-    }
-  }
-
-  private Properties generateKafkaConfig(String zkConnectStr) {
-    int port = Networks.getRandomPort();
-    Preconditions.checkState(port > 0, "Failed to get random port.");
-
-    Properties prop = new Properties();
-    prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
-    prop.setProperty("zk.connect", zkConnectStr);
-    prop.setProperty("num.threads", "8");
-    prop.setProperty("port", Integer.toString(port));
-    prop.setProperty("log.flush.interval", "10000");
-    prop.setProperty("max.socket.request.bytes", "104857600");
-    prop.setProperty("log.cleanup.interval.mins", "1");
-    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
-    prop.setProperty("zk.connectiontimeout.ms", "1000000");
-    prop.setProperty("socket.receive.buffer", "1048576");
-    prop.setProperty("enable.zookeeper", "true");
-    prop.setProperty("log.retention.hours", "168");
-    prop.setProperty("brokerid", "0");
-    prop.setProperty("socket.send.buffer", "1048576");
-    prop.setProperty("num.partitions", "1");
-    prop.setProperty("log.file.size", "536870912");
-    prop.setProperty("log.default.flush.interval.ms", "1000");
-    return prop;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java b/core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
deleted file mode 100644
index dc11666..0000000
--- a/core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.logging;
-
-import org.apache.twill.api.logging.LogEntry;
-import org.apache.twill.internal.json.JsonUtils;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-
-import java.lang.reflect.Type;
-
-/**
- * A {@link com.google.gson.Gson} decoder for {@link LogEntry}.
- */
-public final class LogEntryDecoder implements JsonDeserializer<LogEntry> {
-
-  @Override
-  public LogEntry deserialize(JsonElement json, Type typeOfT,
-                              JsonDeserializationContext context) throws JsonParseException {
-    if (!json.isJsonObject()) {
-      return null;
-    }
-    JsonObject jsonObj = json.getAsJsonObject();
-
-    final String name = JsonUtils.getAsString(jsonObj, "name");
-    final String host = JsonUtils.getAsString(jsonObj, "host");
-    final long timestamp = JsonUtils.getAsLong(jsonObj, "timestamp", 0);
-    LogEntry.Level l;
-    try {
-      l = LogEntry.Level.valueOf(JsonUtils.getAsString(jsonObj, "level"));
-    } catch (Exception e) {
-      l = LogEntry.Level.FATAL;
-    }
-    final LogEntry.Level logLevel = l;
-    final String className = JsonUtils.getAsString(jsonObj, "className");
-    final String method = JsonUtils.getAsString(jsonObj, "method");
-    final String file = JsonUtils.getAsString(jsonObj, "file");
-    final String line = JsonUtils.getAsString(jsonObj, "line");
-    final String thread = JsonUtils.getAsString(jsonObj, "thread");
-    final String message = JsonUtils.getAsString(jsonObj, "message");
-
-    final StackTraceElement[] stackTraces = context.deserialize(jsonObj.get("stackTraces").getAsJsonArray(),
-                                                                StackTraceElement[].class);
-
-    return new LogEntry() {
-      @Override
-      public String getLoggerName() {
-        return name;
-      }
-
-      @Override
-      public String getHost() {
-        return host;
-      }
-
-      @Override
-      public long getTimestamp() {
-        return timestamp;
-      }
-
-      @Override
-      public Level getLogLevel() {
-        return logLevel;
-      }
-
-      @Override
-      public String getSourceClassName() {
-        return className;
-      }
-
-      @Override
-      public String getSourceMethodName() {
-        return method;
-      }
-
-      @Override
-      public String getFileName() {
-        return file;
-      }
-
-      @Override
-      public int getLineNumber() {
-        if (line.equals("?")) {
-          return -1;
-        } else {
-          return Integer.parseInt(line);
-        }
-      }
-
-      @Override
-      public String getThreadName() {
-        return thread;
-      }
-
-      @Override
-      public String getMessage() {
-        return message;
-      }
-
-      @Override
-      public StackTraceElement[] getStackTraces() {
-        return stackTraces;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/logging/Loggings.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/logging/Loggings.java b/core/src/main/java/org/apache/twill/internal/logging/Loggings.java
deleted file mode 100644
index 9baed63..0000000
--- a/core/src/main/java/org/apache/twill/internal/logging/Loggings.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.logging;
-
-import ch.qos.logback.classic.Logger;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.Appender;
-import org.slf4j.ILoggerFactory;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public final class Loggings {
-
-  public static void forceFlush() {
-    ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
-
-    if (loggerFactory instanceof LoggerContext) {
-      Appender<ILoggingEvent> appender = ((LoggerContext) loggerFactory).getLogger(Logger.ROOT_LOGGER_NAME)
-                                                                        .getAppender("KAFKA");
-      if (appender != null && appender instanceof KafkaAppender) {
-        ((KafkaAppender) appender).forceFlush();
-      }
-    }
-  }
-
-  private Loggings() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/package-info.java b/core/src/main/java/org/apache/twill/internal/package-info.java
deleted file mode 100644
index a8459e0..0000000
--- a/core/src/main/java/org/apache/twill/internal/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package provides internal classes for Twill.
- */
-package org.apache.twill.internal;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/Message.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/Message.java b/core/src/main/java/org/apache/twill/internal/state/Message.java
deleted file mode 100644
index 6c3e719..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/Message.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-
-/**
- *
- */
-public interface Message {
-
-  /**
-   * Type of message.
-   */
-  enum Type {
-    SYSTEM,
-    USER
-  }
-
-  /**
-   * Scope of the message.
-   */
-  enum Scope {
-    APPLICATION,
-    ALL_RUNNABLE,
-    RUNNABLE
-  }
-
-  Type getType();
-
-  Scope getScope();
-
-  /**
-   * @return the name of the target runnable if scope is {@link Scope#RUNNABLE} or {@code null} otherwise.
-   */
-  String getRunnableName();
-
-  Command getCommand();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/MessageCallback.java b/core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
deleted file mode 100644
index f94eaa3..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/MessageCallback.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
- *
- */
-public interface MessageCallback {
-
-  /**
-   * Called when a message is received.
-   * @param message Message being received.
-   * @return A {@link ListenableFuture} that would be completed when message processing is completed or failed.
-   *         The result of the future should be the input message Id if succeeded.
-   */
-  ListenableFuture<String> onReceived(String messageId, Message message);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/MessageCodec.java b/core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
deleted file mode 100644
index 176f620..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/MessageCodec.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-import com.google.common.base.Charsets;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-import java.lang.reflect.Type;
-import java.util.Map;
-
-/**
- *
- */
-public final class MessageCodec {
-
-  private static final Type OPTIONS_TYPE = new TypeToken<Map<String, String>>() {}.getType();
-  private static final Gson GSON = new GsonBuilder()
-                                        .registerTypeAdapter(Message.class, new MessageAdapter())
-                                        .registerTypeAdapter(Command.class, new CommandAdapter())
-                                        .create();
-
-  /**
-   * Decodes a {@link Message} from the given byte array.
-   * @param bytes byte array to be decoded
-   * @return Message decoded or {@code null} if fails to decode.
-   */
-  public static Message decode(byte[] bytes) {
-    if (bytes == null) {
-      return null;
-    }
-    String content = new String(bytes, Charsets.UTF_8);
-    return GSON.fromJson(content, Message.class);
-  }
-
-  /**
-   * Encodes a {@link Message} into byte array. Revserse of {@link #decode(byte[])} method.
-   * @param message Message to be encoded
-   * @return byte array representing the encoded message.
-   */
-  public static byte[] encode(Message message) {
-    return GSON.toJson(message, Message.class).getBytes(Charsets.UTF_8);
-  }
-
-  /**
-   * Gson codec for {@link Message} object.
-   */
-  private static final class MessageAdapter implements JsonSerializer<Message>, JsonDeserializer<Message> {
-
-    @Override
-    public Message deserialize(JsonElement json, Type typeOfT,
-                               JsonDeserializationContext context) throws JsonParseException {
-      JsonObject jsonObj = json.getAsJsonObject();
-
-      Message.Type type = Message.Type.valueOf(jsonObj.get("type").getAsString());
-      Message.Scope scope = Message.Scope.valueOf(jsonObj.get("scope").getAsString());
-      JsonElement name = jsonObj.get("runnableName");
-      String runnableName = (name == null || name.isJsonNull()) ? null : name.getAsString();
-      Command command = context.deserialize(jsonObj.get("command"), Command.class);
-
-      return new SimpleMessage(type, scope, runnableName, command);
-    }
-
-    @Override
-    public JsonElement serialize(Message message, Type typeOfSrc, JsonSerializationContext context) {
-      JsonObject jsonObj = new JsonObject();
-      jsonObj.addProperty("type", message.getType().name());
-      jsonObj.addProperty("scope", message.getScope().name());
-      jsonObj.addProperty("runnableName", message.getRunnableName());
-      jsonObj.add("command", context.serialize(message.getCommand(), Command.class));
-
-      return jsonObj;
-    }
-  }
-
-  /**
-   * Gson codec for {@link Command} object.
-   */
-  private static final class CommandAdapter implements JsonSerializer<Command>, JsonDeserializer<Command> {
-
-    @Override
-    public Command deserialize(JsonElement json, Type typeOfT,
-                               JsonDeserializationContext context) throws JsonParseException {
-      JsonObject jsonObj = json.getAsJsonObject();
-      return Command.Builder.of(jsonObj.get("command").getAsString())
-                            .addOptions(context.<Map<String, String>>deserialize(jsonObj.get("options"), OPTIONS_TYPE))
-                            .build();
-    }
-
-    @Override
-    public JsonElement serialize(Command command, Type typeOfSrc, JsonSerializationContext context) {
-      JsonObject jsonObj = new JsonObject();
-      jsonObj.addProperty("command", command.getCommand());
-      jsonObj.add("options", context.serialize(command.getOptions(), OPTIONS_TYPE));
-      return jsonObj;
-    }
-  }
-
-  private MessageCodec() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/Messages.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/Messages.java b/core/src/main/java/org/apache/twill/internal/state/Messages.java
deleted file mode 100644
index 9783d62..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/Messages.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-
-/**
- * Factory class for creating instances of {@link Message}.
- */
-public final class Messages {
-
-  /**
-   * Creates a {@link Message.Type#USER} type {@link Message} that sends the giving {@link Command} to a
-   * particular runnable.
-   *
-   * @param runnableName Name of the runnable.
-   * @param command The user command to send.
-   * @return A new instance of {@link Message}.
-   */
-  public static Message createForRunnable(String runnableName, Command command) {
-    return new SimpleMessage(Message.Type.USER, Message.Scope.RUNNABLE, runnableName, command);
-  }
-
-  /**
-   * Creates a {@link Message.Type#USER} type {@link Message} that sends the giving {@link Command} to all
-   * runnables.
-   *
-   * @param command The user command to send.
-   * @return A new instance of {@link Message}.
-   */
-  public static Message createForAll(Command command) {
-    return new SimpleMessage(Message.Type.USER, Message.Scope.ALL_RUNNABLE, null, command);
-  }
-
-  private Messages() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java b/core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
deleted file mode 100644
index e146e56..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-import com.google.common.base.Objects;
-
-/**
- *
- */
-final class SimpleMessage implements Message {
-
-  private final Type type;
-  private final Scope scope;
-  private final String runnableName;
-  private final Command command;
-
-  SimpleMessage(Type type, Scope scope, String runnableName, Command command) {
-    this.type = type;
-    this.scope = scope;
-    this.runnableName = runnableName;
-    this.command = command;
-  }
-
-  @Override
-  public Type getType() {
-    return type;
-  }
-
-  @Override
-  public Scope getScope() {
-    return scope;
-  }
-
-  @Override
-  public String getRunnableName() {
-    return runnableName;
-  }
-
-  @Override
-  public Command getCommand() {
-    return command;
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(Message.class)
-      .add("type", type)
-      .add("scope", scope)
-      .add("runnable", runnableName)
-      .add("command", command)
-      .toString();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(type, scope, runnableName, command);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == this) {
-      return true;
-    }
-    if (!(obj instanceof Message)) {
-      return false;
-    }
-    Message other = (Message) obj;
-    return type == other.getType()
-      && scope == other.getScope()
-      && Objects.equal(runnableName, other.getRunnableName())
-      && Objects.equal(command, other.getCommand());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/StateNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/StateNode.java b/core/src/main/java/org/apache/twill/internal/state/StateNode.java
deleted file mode 100644
index d66f8a2..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/StateNode.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.ServiceController;
-import com.google.common.util.concurrent.Service;
-
-/**
- *
- */
-public final class StateNode {
-
-  private final ServiceController.State state;
-  private final String errorMessage;
-  private final StackTraceElement[] stackTraces;
-
-  /**
-   * Constructs a StateNode with the given state.
-   */
-  public StateNode(ServiceController.State state) {
-    this(state, null, null);
-  }
-
-  /**
-   * Constructs a StateNode with {@link ServiceController.State#FAILED} caused by the given error.
-   */
-  public StateNode(Throwable error) {
-    this(Service.State.FAILED, error.getMessage(), error.getStackTrace());
-  }
-
-  /**
-   * Constructs a StateNode with the given state, error and stacktraces.
-   * This constructor should only be used by the StateNodeCodec.
-   */
-  public StateNode(ServiceController.State state, String errorMessage, StackTraceElement[] stackTraces) {
-    this.state = state;
-    this.errorMessage = errorMessage;
-    this.stackTraces = stackTraces;
-  }
-
-  public ServiceController.State getState() {
-    return state;
-  }
-
-  public String getErrorMessage() {
-    return errorMessage;
-  }
-
-  public StackTraceElement[] getStackTraces() {
-    return stackTraces;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder builder = new StringBuilder("state=").append(state);
-
-    if (errorMessage != null) {
-      builder.append("\n").append("error=").append(errorMessage);
-    }
-    if (stackTraces != null) {
-      builder.append("\n");
-      for (StackTraceElement stackTrace : stackTraces) {
-        builder.append("\tat ").append(stackTrace.toString()).append("\n");
-      }
-    }
-    return builder.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/state/SystemMessages.java b/core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
deleted file mode 100644
index 9877121..0000000
--- a/core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.state;
-
-import org.apache.twill.api.Command;
-import com.google.common.base.Preconditions;
-
-/**
- * Collection of predefined system messages.
- */
-public final class SystemMessages {
-
-  public static final Command STOP_COMMAND = Command.Builder.of("stop").build();
-  public static final Message SECURE_STORE_UPDATED = new SimpleMessage(
-    Message.Type.SYSTEM, Message.Scope.APPLICATION, null, Command.Builder.of("secureStoreUpdated").build());
-
-  public static Message stopApplication() {
-    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.APPLICATION, null, STOP_COMMAND);
-  }
-
-  public static Message stopRunnable(String runnableName) {
-    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName, STOP_COMMAND);
-  }
-
-  public static Message setInstances(String runnableName, int instances) {
-    Preconditions.checkArgument(instances > 0, "Instances should be > 0.");
-    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, runnableName,
-                             Command.Builder.of("instances").addOption("count", Integer.toString(instances)).build());
-  }
-
-  private SystemMessages() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/utils/Dependencies.java b/core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
deleted file mode 100644
index 015b9f5..0000000
--- a/core/src/main/java/org/apache/twill/internal/utils/Dependencies.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import org.objectweb.asm.AnnotationVisitor;
-import org.objectweb.asm.ClassReader;
-import org.objectweb.asm.ClassVisitor;
-import org.objectweb.asm.FieldVisitor;
-import org.objectweb.asm.Label;
-import org.objectweb.asm.MethodVisitor;
-import org.objectweb.asm.Opcodes;
-import org.objectweb.asm.Type;
-import org.objectweb.asm.signature.SignatureReader;
-import org.objectweb.asm.signature.SignatureVisitor;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.Queue;
-import java.util.Set;
-
-/**
- * Utility class to help find out class dependencies.
- */
-public final class Dependencies {
-
-  /**
-   * Represents a callback for accepting a class during dependency traversal.
-   */
-  public interface ClassAcceptor {
-    /**
-     * Invoked when a class is being found as a dependency.
-     *
-     * @param className Name of the class.
-     * @param classUrl URL for the class resource.
-     * @param classPathUrl URL for the class path resource that contains the class resource.
-     *                     If the URL protocol is {@code file}, it would be the path to root package.
-     *                     If the URL protocol is {@code jar}, it would be the jar file.
-     * @return true keep finding dependencies on the given class.
-     */
-    boolean accept(String className, URL classUrl, URL classPathUrl);
-  }
-
-  public static void findClassDependencies(ClassLoader classLoader,
-                                           ClassAcceptor acceptor,
-                                           String...classesToResolve) throws IOException {
-    findClassDependencies(classLoader, acceptor, ImmutableList.copyOf(classesToResolve));
-  }
-
-  /**
-   * Finds the class dependencies of the given class.
-   * @param classLoader ClassLoader for finding class bytecode.
-   * @param acceptor Predicate to accept a found class and its bytecode.
-   * @param classesToResolve Classes for looking for dependencies.
-   * @throws IOException Thrown where there is error when loading in class bytecode.
-   */
-  public static void findClassDependencies(ClassLoader classLoader,
-                                           ClassAcceptor acceptor,
-                                           Iterable<String> classesToResolve) throws IOException {
-
-    final Set<String> seenClasses = Sets.newHashSet(classesToResolve);
-    final Queue<String> classes = Lists.newLinkedList(classesToResolve);
-
-    // Breadth-first-search classes dependencies.
-    while (!classes.isEmpty()) {
-      String className = classes.remove();
-      URL classUrl = getClassURL(className, classLoader);
-      if (classUrl == null) {
-        continue;
-      }
-
-      // Call the accept to see if it accept the current class.
-      if (!acceptor.accept(className, classUrl, getClassPathURL(className, classUrl))) {
-        continue;
-      }
-
-      InputStream is = classUrl.openStream();
-      try {
-        // Visit the bytecode to lookup classes that the visiting class is depended on.
-        new ClassReader(ByteStreams.toByteArray(is)).accept(new DependencyClassVisitor(new DependencyAcceptor() {
-          @Override
-          public void accept(String className) {
-            // See if the class is accepted
-            if (seenClasses.add(className)) {
-              classes.add(className);
-            }
-          }
-        }), ClassReader.SKIP_DEBUG + ClassReader.SKIP_FRAMES);
-      } finally {
-        is.close();
-      }
-    }
-  }
-
-  /**
-   * Returns the URL for loading the class bytecode of the given class, or null if it is not found or if it is
-   * a system class.
-   */
-  private static URL getClassURL(String className, ClassLoader classLoader) {
-    String resourceName = className.replace('.', '/') + ".class";
-    return classLoader.getResource(resourceName);
-  }
-
-  private static URL getClassPathURL(String className, URL classUrl) {
-    try {
-      if ("file".equals(classUrl.getProtocol())) {
-        String path = classUrl.getFile();
-        // Compute the directory container the class.
-        int endIdx = path.length() - className.length() - ".class".length();
-        if (endIdx > 1) {
-          // If it is not the root directory, return the end index to remove the trailing '/'.
-          endIdx--;
-        }
-        return new URL("file", "", -1, path.substring(0, endIdx));
-      }
-      if ("jar".equals(classUrl.getProtocol())) {
-        String path = classUrl.getFile();
-        return URI.create(path.substring(0, path.indexOf("!/"))).toURL();
-      }
-    } catch (MalformedURLException e) {
-      throw Throwables.propagate(e);
-    }
-    throw new IllegalStateException("Unsupported class URL: " + classUrl);
-  }
-
-  /**
-   * A private interface for accepting a dependent class that is found during bytecode inspection.
-   */
-  private interface DependencyAcceptor {
-    void accept(String className);
-  }
-
-  /**
-   * ASM ClassVisitor for extracting classes dependencies.
-   */
-  private static final class DependencyClassVisitor extends ClassVisitor {
-
-    private final SignatureVisitor signatureVisitor;
-    private final DependencyAcceptor acceptor;
-
-    public DependencyClassVisitor(DependencyAcceptor acceptor) {
-      super(Opcodes.ASM4);
-      this.acceptor = acceptor;
-      this.signatureVisitor = new SignatureVisitor(Opcodes.ASM4) {
-        private String currentClass;
-
-        @Override
-        public void visitClassType(String name) {
-          currentClass = name;
-          addClass(name);
-        }
-
-        @Override
-        public void visitInnerClassType(String name) {
-          addClass(currentClass + "$" + name);
-        }
-      };
-    }
-
-    @Override
-    public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) {
-      addClass(name);
-
-      if (signature != null) {
-        new SignatureReader(signature).accept(signatureVisitor);
-      } else {
-        addClass(superName);
-        addClasses(interfaces);
-      }
-    }
-
-    @Override
-    public void visitOuterClass(String owner, String name, String desc) {
-      addClass(owner);
-    }
-
-    @Override
-    public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
-      addType(Type.getType(desc));
-      return null;
-    }
-
-    @Override
-    public void visitInnerClass(String name, String outerName, String innerName, int access) {
-      addClass(name);
-    }
-
-    @Override
-    public FieldVisitor visitField(int access, String name, String desc, String signature, Object value) {
-      if (signature != null) {
-        new SignatureReader(signature).acceptType(signatureVisitor);
-      } else {
-        addType(Type.getType(desc));
-      }
-
-      return new FieldVisitor(Opcodes.ASM4) {
-        @Override
-        public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
-          addType(Type.getType(desc));
-          return null;
-        }
-      };
-    }
-
-    @Override
-    public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
-      if (signature != null) {
-        new SignatureReader(signature).accept(signatureVisitor);
-      } else {
-        addMethod(desc);
-      }
-      addClasses(exceptions);
-
-      return new MethodVisitor(Opcodes.ASM4) {
-        @Override
-        public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
-          addType(Type.getType(desc));
-          return null;
-        }
-
-        @Override
-        public AnnotationVisitor visitParameterAnnotation(int parameter, String desc, boolean visible) {
-          addType(Type.getType(desc));
-          return null;
-        }
-
-        @Override
-        public void visitTypeInsn(int opcode, String type) {
-          addType(Type.getObjectType(type));
-        }
-
-        @Override
-        public void visitFieldInsn(int opcode, String owner, String name, String desc) {
-          addType(Type.getObjectType(owner));
-          addType(Type.getType(desc));
-        }
-
-        @Override
-        public void visitMethodInsn(int opcode, String owner, String name, String desc) {
-          addType(Type.getObjectType(owner));
-          addMethod(desc);
-        }
-
-        @Override
-        public void visitLdcInsn(Object cst) {
-          if (cst instanceof Type) {
-            addType((Type) cst);
-          }
-        }
-
-        @Override
-        public void visitMultiANewArrayInsn(String desc, int dims) {
-          addType(Type.getType(desc));
-        }
-
-        @Override
-        public void visitLocalVariable(String name, String desc, String signature, Label start, Label end, int index) {
-          if (signature != null) {
-            new SignatureReader(signature).acceptType(signatureVisitor);
-          } else {
-            addType(Type.getType(desc));
-          }
-        }
-      };
-    }
-
-    private void addClass(String internalName) {
-      if (internalName == null || internalName.startsWith("java/")) {
-        return;
-      }
-      acceptor.accept(Type.getObjectType(internalName).getClassName());
-    }
-
-    private void addClasses(String[] classes) {
-      if (classes != null) {
-        for (String clz : classes) {
-          addClass(clz);
-        }
-      }
-    }
-
-    private void addType(Type type) {
-      if (type.getSort() == Type.ARRAY) {
-        type = type.getElementType();
-      }
-      if (type.getSort() == Type.OBJECT) {
-        addClass(type.getInternalName());
-      }
-    }
-
-    private void addMethod(String desc) {
-      addType(Type.getReturnType(desc));
-      for (Type type : Type.getArgumentTypes(desc)) {
-        addType(type);
-      }
-    }
-  }
-
-  private Dependencies() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/utils/Instances.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/utils/Instances.java b/core/src/main/java/org/apache/twill/internal/utils/Instances.java
deleted file mode 100644
index 28bfce9..0000000
--- a/core/src/main/java/org/apache/twill/internal/utils/Instances.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import com.google.common.base.Defaults;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.reflect.TypeToken;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-
-/**
- * Utility class to help instantiate object instance from class.
- */
-public final class Instances {
-
-  private static final Object UNSAFE;
-  private static final Method UNSAFE_NEW_INSTANCE;
-
-  static {
-    Object unsafe;
-    Method newInstance;
-    try {
-      Class<?> clz = Class.forName("sun.misc.Unsafe");
-      Field f = clz.getDeclaredField("theUnsafe");
-      f.setAccessible(true);
-      unsafe = f.get(null);
-
-      newInstance = clz.getMethod("allocateInstance", Class.class);
-    } catch (Exception e) {
-      unsafe = null;
-      newInstance = null;
-    }
-    UNSAFE = unsafe;
-    UNSAFE_NEW_INSTANCE = newInstance;
-  }
-
-  /**
-   * Creates a new instance of the given class. It will use the default constructor if it is presents.
-   * Otherwise it will try to use {@link sun.misc.Unsafe#allocateInstance(Class)} to create the instance.
-   * @param clz Class of object to be instantiated.
-   * @param <T> Type of the class
-   * @return An instance of type {@code <T>}
-   */
-  @SuppressWarnings("unchecked")
-  public static <T> T newInstance(Class<T> clz) {
-    try {
-      try {
-        Constructor<T> cons = clz.getDeclaredConstructor();
-        if (!cons.isAccessible()) {
-          cons.setAccessible(true);
-        }
-        return cons.newInstance();
-      } catch (Exception e) {
-        // Try to use Unsafe
-        Preconditions.checkState(UNSAFE != null, "Fail to instantiate with Unsafe.");
-        return unsafeCreate(clz);
-      }
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-
-  /**
-   * Creates an instance of the given using Unsafe. It also initialize all fields into default values.
-   */
-  private static <T> T unsafeCreate(Class<T> clz) throws InvocationTargetException, IllegalAccessException {
-    T instance = (T) UNSAFE_NEW_INSTANCE.invoke(UNSAFE, clz);
-
-    for (TypeToken<?> type : TypeToken.of(clz).getTypes().classes()) {
-      if (Object.class.equals(type.getRawType())) {
-        break;
-      }
-      for (Field field : type.getRawType().getDeclaredFields()) {
-        if (Modifier.isStatic(field.getModifiers())) {
-          continue;
-        }
-        if (!field.isAccessible()) {
-          field.setAccessible(true);
-        }
-        field.set(instance, Defaults.defaultValue(field.getType()));
-      }
-    }
-
-    return instance;
-  }
-
-
-  private Instances() {
-    // Protect instantiation of this class
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/utils/Networks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/utils/Networks.java b/core/src/main/java/org/apache/twill/internal/utils/Networks.java
deleted file mode 100644
index 8e7d736..0000000
--- a/core/src/main/java/org/apache/twill/internal/utils/Networks.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-/**
- *
- */
-public final class Networks {
-
-  /**
-   * Find a random free port in localhost for binding.
-   * @return A port number or -1 for failure.
-   */
-  public static int getRandomPort() {
-    try {
-      ServerSocket socket = new ServerSocket(0);
-      try {
-        return socket.getLocalPort();
-      } finally {
-        socket.close();
-      }
-    } catch (IOException e) {
-      return -1;
-    }
-  }
-
-  private Networks() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/utils/Paths.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/utils/Paths.java b/core/src/main/java/org/apache/twill/internal/utils/Paths.java
deleted file mode 100644
index aeee09f..0000000
--- a/core/src/main/java/org/apache/twill/internal/utils/Paths.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.utils;
-
-import com.google.common.io.Files;
-
-/**
- *
- */
-public final class Paths {
-
-
-  public static String appendSuffix(String extractFrom, String appendTo) {
-    String suffix = getExtension(extractFrom);
-    if (!suffix.isEmpty()) {
-      return appendTo + '.' + suffix;
-    }
-    return appendTo;
-  }
-
-  public static String getExtension(String path) {
-    if (path.endsWith(".tar.gz")) {
-      return "tar.gz";
-    }
-
-    return Files.getFileExtension(path);
-  }
-
-  private Paths() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/FetchException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/FetchException.java b/core/src/main/java/org/apache/twill/kafka/client/FetchException.java
deleted file mode 100644
index acccf04..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/FetchException.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-/**
- *
- */
-public final class FetchException extends RuntimeException {
-
-  private final ErrorCode errorCode;
-
-  public FetchException(String message, ErrorCode errorCode) {
-    super(message);
-    this.errorCode = errorCode;
-  }
-
-  public ErrorCode getErrorCode() {
-    return errorCode;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("%s. Error code: %s", super.toString(), errorCode);
-  }
-
-  public enum ErrorCode {
-    UNKNOWN(-1),
-    OK(0),
-    OFFSET_OUT_OF_RANGE(1),
-    INVALID_MESSAGE(2),
-    WRONG_PARTITION(3),
-    INVALID_FETCH_SIZE(4);
-
-    private final int code;
-
-    ErrorCode(int code) {
-      this.code = code;
-    }
-
-    public int getCode() {
-      return code;
-    }
-
-    public static ErrorCode fromCode(int code) {
-      switch (code) {
-        case -1:
-          return UNKNOWN;
-        case 0:
-          return OK;
-        case 1:
-          return OFFSET_OUT_OF_RANGE;
-        case 2:
-          return INVALID_MESSAGE;
-        case 3:
-          return WRONG_PARTITION;
-        case 4:
-          return INVALID_FETCH_SIZE;
-      }
-      throw new IllegalArgumentException("Unknown error code");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java b/core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
deleted file mode 100644
index 65e140f..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import java.nio.ByteBuffer;
-
-/**
- * Represents a message fetched from kafka broker.
- */
-public interface FetchedMessage {
-
-  /**
-   * Returns the message offset.
-   */
-  long getOffset();
-
-  /**
-   * Returns the message payload.
-   */
-  ByteBuffer getBuffer();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java b/core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
deleted file mode 100644
index 496195b..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/KafkaClient.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import org.apache.twill.internal.kafka.client.Compression;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-
-import java.util.Iterator;
-
-/**
- * This interface provides methods for interacting with kafka broker. It also
- * extends from {@link Service} for lifecycle management. The {@link #start()} method
- * must be called prior to other methods in this class. When instance of this class
- * is not needed, call {@link #stop()}} to release any resources that it holds.
- */
-public interface KafkaClient extends Service {
-
-  PreparePublish preparePublish(String topic, Compression compression);
-
-  Iterator<FetchedMessage> consume(String topic, int partition, long offset, int maxSize);
-
-  /**
-   * Fetches offset from the given topic and partition.
-   * @param topic Topic to fetch from.
-   * @param partition Partition to fetch from.
-   * @param time The first offset of every segment file for a given partition with a modified time less than time.
-   *             {@code -1} for latest offset, {@code -2} for earliest offset.
-   * @param maxOffsets Maximum number of offsets to fetch.
-   * @return A Future that carry the result as an array of offsets in descending order.
-   *         The size of the result array would not be larger than maxOffsets. If there is any error during the fetch,
-   *         the exception will be carried in the exception.
-   */
-  ListenableFuture<long[]> getOffset(String topic, int partition, long time, int maxOffsets);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java b/core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
deleted file mode 100644
index 5db4abb..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/PreparePublish.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.kafka.client;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.nio.ByteBuffer;
-
-/**
- * This interface is for preparing to publish a set of messages to kafka.
- */
-public interface PreparePublish {
-
-  PreparePublish add(byte[] payload, Object partitionKey);
-
-  PreparePublish add(ByteBuffer payload, Object partitionKey);
-
-  ListenableFuture<?> publish();
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/kafka/client/package-info.java b/core/src/main/java/org/apache/twill/kafka/client/package-info.java
deleted file mode 100644
index ea3bf20..0000000
--- a/core/src/main/java/org/apache/twill/kafka/client/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package provides a pure java Kafka client interface.
- */
-package org.apache.twill.kafka.client;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/launcher/TwillLauncher.java b/core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
deleted file mode 100644
index 2c8c1ef..0000000
--- a/core/src/main/java/org/apache/twill/launcher/TwillLauncher.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.launcher;
-
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
-
-/**
- * A launcher for application from a archive jar.
- * This class should have no dependencies on any library except the J2SE one.
- * This class should not import any thing except java.*
- */
-public final class TwillLauncher {
-
-  private static final int TEMP_DIR_ATTEMPTS = 20;
-
-  /**
-   * Main method to unpackage a jar and run the mainClass.main() method.
-   * @param args args[0] is the path to jar file, args[1] is the class name of the mainClass.
-   *             The rest of args will be passed the mainClass unmodified.
-   */
-  public static void main(String[] args) throws Exception {
-    if (args.length < 3) {
-      System.out.println("Usage: java " + TwillLauncher.class.getName() + " [jarFile] [mainClass] [use_classpath]");
-      return;
-    }
-
-    File file = new File(args[0]);
-    final File targetDir = createTempDir("twill.launcher");
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        System.out.println("Cleanup directory " + targetDir);
-        deleteDir(targetDir);
-      }
-    });
-
-    System.out.println("UnJar " + file + " to " + targetDir);
-    unJar(file, targetDir);
-
-    // Create ClassLoader
-    URLClassLoader classLoader = createClassLoader(targetDir, Boolean.parseBoolean(args[2]));
-    Thread.currentThread().setContextClassLoader(classLoader);
-
-    System.out.println("Launch class with classpath: " + Arrays.toString(classLoader.getURLs()));
-
-    Class<?> mainClass = classLoader.loadClass(args[1]);
-    Method mainMethod = mainClass.getMethod("main", String[].class);
-    String[] arguments = Arrays.copyOfRange(args, 3, args.length);
-    System.out.println("Launching main: " + mainMethod + " " + Arrays.toString(arguments));
-    mainMethod.invoke(mainClass, new Object[]{arguments});
-    System.out.println("Main class completed.");
-
-    System.out.println("Launcher completed");
-  }
-
-  /**
-   * This method is copied from Guava Files.createTempDir().
-   */
-  private static File createTempDir(String prefix) throws IOException {
-    File baseDir = new File(System.getProperty("java.io.tmpdir"));
-    if (!baseDir.isDirectory() && !baseDir.mkdirs()) {
-      throw new IOException("Tmp directory not exists: " + baseDir.getAbsolutePath());
-    }
-
-    String baseName = prefix + "-" + System.currentTimeMillis() + "-";
-
-    for (int counter = 0; counter < TEMP_DIR_ATTEMPTS; counter++) {
-      File tempDir = new File(baseDir, baseName + counter);
-      if (tempDir.mkdir()) {
-        return tempDir;
-      }
-    }
-    throw new IOException("Failed to create directory within "
-                            + TEMP_DIR_ATTEMPTS + " attempts (tried "
-                            + baseName + "0 to " + baseName + (TEMP_DIR_ATTEMPTS - 1) + ')');
-  }
-
-  private static void unJar(File jarFile, File targetDir) throws IOException {
-    JarInputStream jarInput = new JarInputStream(new FileInputStream(jarFile));
-    try {
-      JarEntry jarEntry = jarInput.getNextJarEntry();
-      while (jarEntry != null) {
-        File target = new File(targetDir, jarEntry.getName());
-        if (jarEntry.isDirectory()) {
-          target.mkdirs();
-        } else {
-          target.getParentFile().mkdirs();
-          copy(jarInput, target);
-        }
-        jarEntry = jarInput.getNextJarEntry();
-      }
-    } finally {
-      jarInput.close();
-    }
-  }
-
-  private static void copy(InputStream is, File file) throws IOException {
-    byte[] buf = new byte[8192];
-    OutputStream os = new BufferedOutputStream(new FileOutputStream(file));
-    try {
-      int len = is.read(buf);
-      while (len != -1) {
-        os.write(buf, 0, len);
-        len = is.read(buf);
-      }
-    } finally {
-      os.close();
-    }
-  }
-
-  private static URLClassLoader createClassLoader(File dir, boolean useClassPath) {
-    try {
-      List<URL> urls = new ArrayList<URL>();
-      urls.add(dir.toURI().toURL());
-      urls.add(new File(dir, "classes").toURI().toURL());
-      urls.add(new File(dir, "resources").toURI().toURL());
-
-      File libDir = new File(dir, "lib");
-      File[] files = libDir.listFiles();
-      if (files != null) {
-        for (File file : files) {
-          if (file.getName().endsWith(".jar")) {
-            urls.add(file.toURI().toURL());
-          }
-        }
-      }
-
-      if (useClassPath) {
-        InputStream is = ClassLoader.getSystemResourceAsStream("classpath");
-        if (is != null) {
-          try {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
-            String line = reader.readLine();
-            if (line != null) {
-              for (String path : line.split(":")) {
-                urls.addAll(getClassPaths(path));
-              }
-            }
-          } finally {
-            is.close();
-          }
-        }
-      }
-
-      return new URLClassLoader(urls.toArray(new URL[0]));
-
-    } catch (Exception e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
-  private static Collection<URL> getClassPaths(String path) throws MalformedURLException {
-    String classpath = expand(path);
-    if (classpath.endsWith("/*")) {
-      // Grab all .jar files
-      File dir = new File(classpath.substring(0, classpath.length() - 2));
-      File[] files = dir.listFiles();
-      if (files == null || files.length == 0) {
-        return singleItem(dir.toURI().toURL());
-      }
-
-      List<URL> result = new ArrayList<URL>(files.length);
-      for (File file : files) {
-        if (file.getName().endsWith(".jar")) {
-          result.add(file.toURI().toURL());
-        }
-      }
-      return result;
-    } else {
-      return singleItem(new File(classpath).toURI().toURL());
-    }
-  }
-
-  private static Collection<URL> singleItem(URL url) {
-    List<URL> result = new ArrayList<URL>(1);
-    result.add(url);
-    return result;
-  }
-
-  private static String expand(String value) {
-    String result = value;
-    for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
-      result = result.replace("$" + entry.getKey(), entry.getValue());
-      result = result.replace("${" + entry.getKey() + "}", entry.getValue());
-    }
-    return result;
-  }
-
-  private static void deleteDir(File dir) {
-    File[] files = dir.listFiles();
-    if (files == null || files.length == 0) {
-      dir.delete();
-      return;
-    }
-    for (File file : files) {
-      deleteDir(file);
-    }
-    dir.delete();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/resources/kafka-0.7.2.tgz
----------------------------------------------------------------------
diff --git a/core/src/main/resources/kafka-0.7.2.tgz b/core/src/main/resources/kafka-0.7.2.tgz
deleted file mode 100644
index 24178d9..0000000
Binary files a/core/src/main/resources/kafka-0.7.2.tgz and /dev/null differ