Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:03 UTC

[06/50] [abbrv] git commit: [TWILL-27] Upgrade to Kafka-0.8

[TWILL-27] Upgrade to Kafka-0.8

Signed-off-by: Terence Yim <te...@continuuity.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/fcbe54d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/fcbe54d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/fcbe54d5

Branch: refs/heads/site
Commit: fcbe54d50d22a4a85eaea35612cd9db143deef67
Parents: 15de6ae
Author: Terence Yim <te...@continuuity.com>
Authored: Wed Jan 22 17:07:52 2014 -0800
Committer: Terence Yim <te...@continuuity.com>
Committed: Mon Jan 27 13:40:28 2014 -0800

----------------------------------------------------------------------
 pom.xml                                         |  19 +
 twill-core/pom.xml                              |   4 +
 .../twill/internal/AbstractTwillController.java | 124 ++---
 .../org/apache/twill/internal/Constants.java    |   1 -
 .../internal/kafka/EmbeddedKafkaServer.java     |  64 +--
 .../AbstractCompressedMessageSetEncoder.java    |  78 ---
 .../kafka/client/AbstractMessageSetEncoder.java |  79 ---
 .../kafka/client/BasicFetchedMessage.java       |  34 +-
 .../twill/internal/kafka/client/Bufferer.java   |  61 ---
 .../kafka/client/ByteBufferEncoder.java         |  38 ++
 .../internal/kafka/client/Compression.java      |  49 --
 .../internal/kafka/client/ConnectionPool.java   | 125 -----
 .../kafka/client/GZipMessageSetEncoder.java     |  37 --
 .../kafka/client/IdentityMessageSetEncoder.java |  42 --
 .../internal/kafka/client/IntegerEncoder.java   |  35 ++
 .../kafka/client/IntegerPartitioner.java        |  34 ++
 .../internal/kafka/client/KafkaBrokerCache.java | 326 -------------
 .../internal/kafka/client/KafkaRequest.java     |  91 ----
 .../kafka/client/KafkaRequestEncoder.java       |  60 ---
 .../kafka/client/KafkaRequestSender.java        |  26 -
 .../internal/kafka/client/KafkaResponse.java    |  49 --
 .../kafka/client/KafkaResponseDispatcher.java   |  63 ---
 .../kafka/client/KafkaResponseHandler.java      |  51 --
 .../internal/kafka/client/MessageFetcher.java   | 243 ----------
 .../kafka/client/MessageSetEncoder.java         |  31 --
 .../internal/kafka/client/ResponseHandler.java  |  33 --
 .../kafka/client/SimpleKafkaClient.java         | 304 ------------
 .../kafka/client/SimpleKafkaConsumer.java       | 481 +++++++++++++++++++
 .../kafka/client/SimpleKafkaPublisher.java      | 113 +++++
 .../kafka/client/SnappyMessageSetEncoder.java   |  38 --
 .../internal/kafka/client/TopicBroker.java      |  48 --
 .../internal/kafka/client/ZKBrokerService.java  | 403 ++++++++++++++++
 .../kafka/client/ZKKafkaClientService.java      | 118 +++++
 .../internal/kafka/client/package-info.java     |   2 +-
 .../twill/internal/logging/KafkaAppender.java   |  43 +-
 .../internal/logging/KafkaTwillRunnable.java    | 122 -----
 .../apache/twill/kafka/client/BrokerInfo.java   |  65 +++
 .../twill/kafka/client/BrokerService.java       |  48 ++
 .../apache/twill/kafka/client/Compression.java  |  38 ++
 .../twill/kafka/client/FetchException.java      |  77 ---
 .../twill/kafka/client/FetchedMessage.java      |  13 +-
 .../apache/twill/kafka/client/KafkaClient.java  |  36 +-
 .../twill/kafka/client/KafkaClientService.java  |  26 +
 .../twill/kafka/client/KafkaConsumer.java       |  92 ++++
 .../twill/kafka/client/KafkaPublisher.java      |  95 ++++
 .../twill/kafka/client/PreparePublish.java      |  34 --
 .../twill/kafka/client/TopicPartition.java      |  70 +++
 .../apache/twill/kafka/client/package-info.java |   2 +-
 twill-core/src/main/resources/kafka-0.7.2.tgz   | Bin 8811693 -> 0 bytes
 .../apache/twill/kafka/client/KafkaTest.java    | 139 ++----
 .../appmaster/ApplicationMasterService.java     |  93 ++--
 .../apache/twill/yarn/YarnTwillPreparer.java    |  10 -
 52 files changed, 1889 insertions(+), 2418 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0c6e9dd..3d7cebe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -522,6 +522,25 @@
                 <version>4.0</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka_2.10</artifactId>
+                <version>0.8.0</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-simple</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.sun.jdmk</groupId>
+                        <artifactId>jmxtools</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.sun.jmx</groupId>
+                        <artifactId>jmxri</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-yarn-api</artifactId>
                 <version>${hadoop.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index faff711..d998b40 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -78,6 +78,10 @@
             <artifactId>logback-classic</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 97e0a8f..71f0c14 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -17,33 +17,34 @@
  */
 package org.apache.twill.internal;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.common.Cancellable;
 import org.apache.twill.discovery.DiscoveryServiceClient;
 import org.apache.twill.discovery.ServiceDiscovered;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
 import org.apache.twill.internal.logging.LogEntryDecoder;
 import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.kafka.client.FetchedMessage;
-import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.KafkaClientService;
+import org.apache.twill.kafka.client.KafkaConsumer;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 
 /**
  * An abstract base class for {@link org.apache.twill.api.TwillController} implementation that uses Zookeeper to control a
@@ -52,47 +53,47 @@ import java.util.concurrent.TimeUnit;
 public abstract class AbstractTwillController extends AbstractZKServiceController implements TwillController {
 
   private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
-  private static final int MAX_KAFKA_FETCH_SIZE = 1048576;
-  private static final long SHUTDOWN_TIMEOUT_MS = 2000;
-  private static final long LOG_FETCH_TIMEOUT_MS = 5000;
 
   private final Queue<LogHandler> logHandlers;
-  private final KafkaClient kafkaClient;
+  private final KafkaClientService kafkaClient;
   private final DiscoveryServiceClient discoveryServiceClient;
-  private final LogPollerThread logPoller;
+  private volatile Cancellable logCancellable;
 
   public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
     super(runId, zkClient);
     this.logHandlers = new ConcurrentLinkedQueue<LogHandler>();
-    this.kafkaClient = new SimpleKafkaClient(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
+    this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
     this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
     Iterables.addAll(this.logHandlers, logHandlers);
-    this.logPoller = new LogPollerThread(runId, kafkaClient, logHandlers);
   }
 
   @Override
-  protected void doStartUp() {
+  protected synchronized void doStartUp() {
     if (!logHandlers.isEmpty()) {
-      logPoller.start();
+      kafkaClient.startAndWait();
+      logCancellable = kafkaClient.getConsumer().prepare()
+                                  .addFromBeginning(Constants.LOG_TOPIC, 0)
+                                  .consume(new LogMessageCallback(logHandlers));
     }
   }
 
   @Override
   protected void doShutDown() {
-    logPoller.terminate();
-    try {
-      // Wait for the poller thread to stop.
-      logPoller.join(SHUTDOWN_TIMEOUT_MS);
-    } catch (InterruptedException e) {
-      LOG.warn("Joining of log poller thread interrupted.", e);
+    if (logCancellable != null) {
+      logCancellable.cancel();
     }
+    // Safe to call stop no matter what state the KafkaClientService is in.
+    kafkaClient.stopAndWait();
   }
 
   @Override
   public final synchronized void addLogHandler(LogHandler handler) {
     logHandlers.add(handler);
-    if (!logPoller.isAlive()) {
-      logPoller.start();
+    if (logHandlers.size() == 1) {
+      kafkaClient.startAndWait();
+      logCancellable = kafkaClient.getConsumer().prepare()
+        .addFromBeginning(Constants.LOG_TOPIC, 0)
+        .consume(new LogMessageCallback(logHandlers));
     }
   }
 
@@ -106,74 +107,45 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
     return sendMessage(SystemMessages.setInstances(runnable, newCount), newCount);
   }
 
-  private static final class LogPollerThread extends Thread {
+  private static final class LogMessageCallback implements KafkaConsumer.MessageCallback {
+
+    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(LogEntry.class, new LogEntryDecoder())
+      .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
+      .create();
 
-    private final KafkaClient kafkaClient;
     private final Iterable<LogHandler> logHandlers;
-    private volatile boolean running = true;
 
-    LogPollerThread(RunId runId, KafkaClient kafkaClient, Iterable<LogHandler> logHandlers) {
-      super("twill-log-poller-" + runId.getId());
-      setDaemon(true);
-      this.kafkaClient = kafkaClient;
+    private LogMessageCallback(Iterable<LogHandler> logHandlers) {
       this.logHandlers = logHandlers;
     }
 
     @Override
-    public void run() {
-      LOG.info("Twill log poller thread '{}' started.", getName());
-      kafkaClient.startAndWait();
-      Gson gson = new GsonBuilder().registerTypeAdapter(LogEntry.class, new LogEntryDecoder())
-        .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
-        .create();
-
-      while (running && !isInterrupted()) {
-        long offset;
-        try {
-          // Get the earliest offset
-          long[] offsets = kafkaClient.getOffset(Constants.LOG_TOPIC, 0, -2, 1).get(LOG_FETCH_TIMEOUT_MS,
-                                                                                    TimeUnit.MILLISECONDS);
-          // Should have one entry
-          offset = offsets[0];
-        } catch (Throwable t) {
-          // Keep retrying
-          LOG.warn("Failed to fetch offsets from Kafka. Retrying.", t);
-          continue;
-        }
-
-        // Now fetch log messages from Kafka
-        Iterator<FetchedMessage> messageIterator = kafkaClient.consume(Constants.LOG_TOPIC, 0,
-                                                                       offset, MAX_KAFKA_FETCH_SIZE);
+    public void onReceived(Iterator<FetchedMessage> messages) {
+      while (messages.hasNext()) {
+        String json = Charsets.UTF_8.decode(messages.next().getPayload()).toString();
         try {
-          while (messageIterator.hasNext()) {
-            String json = Charsets.UTF_8.decode(messageIterator.next().getBuffer()).toString();
-            try {
-              LogEntry entry = gson.fromJson(json, LogEntry.class);
-              if (entry != null) {
-                invokeHandlers(entry);
-              }
-            } catch (Exception e) {
-              LOG.error("Failed to decode log entry {}", json, e);
-            }
+          LogEntry entry = GSON.fromJson(json, LogEntry.class);
+          if (entry != null) {
+            invokeHandlers(entry);
           }
-        } catch (Throwable t) {
-          LOG.warn("Exception while fetching log message from Kafka. Retrying.", t);
-          continue;
+        } catch (Exception e) {
+          LOG.error("Failed to decode log entry {}", json, e);
         }
       }
-
-      kafkaClient.stopAndWait();
-      LOG.info("Twill log poller thread stopped.");
     }
 
-    void terminate() {
-      running = false;
-      interrupt();
+    @Override
+    public void finished() {
+      // No-op
     }
 
     private void invokeHandlers(LogEntry entry) {
       for (LogHandler handler : logHandlers) {
-        handler.onLog(entry);
+        try {
+          handler.onLog(entry);
+        } catch (Throwable t) {
+          LOG.warn("Exception while calling LogHandler {}", handler, t);
+        }
       }
     }
   }
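
The rewritten controller above drops the dedicated log-poller thread in favour of the new callback-based consumer API. Below is a minimal sketch of how that API is used outside the controller, assuming an already-started ZKClient; the wrapper class, method name and topic name are illustrative only and not part of the commit.

import com.google.common.base.Charsets;
import org.apache.twill.common.Cancellable;
import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.zookeeper.ZKClient;

import java.util.Iterator;

public final class LogConsumerSketch {

  // Assumes zkClient is already started and namespaced to the Kafka root node.
  static Cancellable consumeFromBeginning(ZKClient zkClient) {
    KafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
    kafkaClient.startAndWait();

    return kafkaClient.getConsumer().prepare()
      .addFromBeginning("log", 0)       // hypothetical topic, partition 0
      .consume(new KafkaConsumer.MessageCallback() {
        @Override
        public void onReceived(Iterator<FetchedMessage> messages) {
          while (messages.hasNext()) {
            // Each payload is a ByteBuffer; decode it as UTF-8 text in this sketch.
            System.out.println(Charsets.UTF_8.decode(messages.next().getPayload()));
          }
        }

        @Override
        public void finished() {
          // No-op in this sketch; the caller is responsible for stopping kafkaClient.
        }
      });
  }
}

The Cancellable returned by consume() is what doShutDown() above cancels before stopping the client service.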

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index 0387d3e..efe91a7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -51,7 +51,6 @@ public final class Constants {
     public static final String TWILL_SPEC = "twillSpec.json";
     public static final String ARGUMENTS = "arguments.json";
     public static final String LOGBACK_TEMPLATE = "logback-template.xml";
-    public static final String KAFKA = "kafka.tgz";
     public static final String JVM_OPTIONS = "jvm.opts";
     public static final String CREDENTIALS = "credentials.store";
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
index 14dfc70..d82d617 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -17,77 +17,33 @@
  */
 package org.apache.twill.internal.kafka;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractIdleService;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
 
 import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
 import java.util.Properties;
 
 /**
- *
+ * A {@link com.google.common.util.concurrent.Service} implementation for running an instance of Kafka server in
+ * the same process.
  */
 public final class EmbeddedKafkaServer extends AbstractIdleService {
 
-  private static final String KAFAK_CONFIG_CLASS = "kafka.server.KafkaConfig";
-  private static final String KAFKA_SERVER_CLASS = "kafka.server.KafkaServerStartable";
-
-  private final Object server;
-
-  public EmbeddedKafkaServer(File kafkaDir, Properties properties) {
-    this(createClassLoader(kafkaDir), properties);
-  }
-
-  public EmbeddedKafkaServer(ClassLoader classLoader, Properties properties) {
-    try {
-      Class<?> configClass = classLoader.loadClass(KAFAK_CONFIG_CLASS);
-      Object config = configClass.getConstructor(Properties.class).newInstance(properties);
+  private final KafkaServerStartable server;
 
-      Class<?> serverClass = classLoader.loadClass(KAFKA_SERVER_CLASS);
-      server = serverClass.getConstructor(configClass).newInstance(config);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
+  public EmbeddedKafkaServer(Properties properties) {
+    server = new KafkaServerStartable(new KafkaConfig(properties));
   }
 
   @Override
   protected void startUp() throws Exception {
-    server.getClass().getMethod("startup").invoke(server);
+    server.startup();
   }
 
   @Override
   protected void shutDown() throws Exception {
-    server.getClass().getMethod("shutdown").invoke(server);
-    server.getClass().getMethod("awaitShutdown").invoke(server);
-  }
-
-  private static ClassLoader createClassLoader(File kafkaDir) {
-    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-    ClassLoader thisClassLoader = EmbeddedKafkaServer.class.getClassLoader();
-    ClassLoader parent = contextClassLoader != null
-                            ? contextClassLoader
-                            : thisClassLoader != null
-                                ? thisClassLoader : ClassLoader.getSystemClassLoader();
-
-    return new URLClassLoader(findJars(kafkaDir, Lists.<URL>newArrayList()).toArray(new URL[0]), parent);
-  }
-
-  private static List<URL> findJars(File dir, List<URL> urls) {
-    try {
-      for (File file : dir.listFiles()) {
-        if (file.isDirectory()) {
-          findJars(file, urls);
-        } else if (file.getName().endsWith(".jar")) {
-          urls.add(file.toURI().toURL());
-        }
-      }
-      return urls;
-    } catch (MalformedURLException e) {
-      throw Throwables.propagate(e);
-    }
+    server.shutdown();
+    server.awaitShutdown();
   }
 }
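
With the Kafka classes now on the compile classpath, the embedded server no longer unpacks a bundled 0.7 tarball and loads it through a URLClassLoader; it wraps KafkaServerStartable directly. A minimal sketch of starting one for tests follows, using standard Kafka 0.8 broker properties; the concrete values are placeholders.

import org.apache.twill.internal.kafka.EmbeddedKafkaServer;

import java.util.Properties;

public final class EmbeddedKafkaSketch {

  public static void main(String[] args) throws Exception {
    // Typical Kafka 0.8 broker settings; values here are placeholders.
    Properties props = new Properties();
    props.setProperty("broker.id", "1");
    props.setProperty("port", "9092");
    props.setProperty("log.dir", "/tmp/kafka-logs");
    props.setProperty("zookeeper.connect", "localhost:2181");
    props.setProperty("num.partitions", "1");

    EmbeddedKafkaServer server = new EmbeddedKafkaServer(props);
    server.startAndWait();        // AbstractIdleService: returns once startUp() completes
    try {
      // ... publish and consume against localhost:9092 ...
    } finally {
      server.stopAndWait();       // invokes shutdown() and awaitShutdown() on the broker
    }
  }
}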

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
deleted file mode 100644
index a9c3381..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.base.Throwables;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * A base implementation of {@link MessageSetEncoder} that do message compression.
- */
-abstract class AbstractCompressedMessageSetEncoder extends AbstractMessageSetEncoder {
-
-  private final Compression compression;
-  private ChannelBufferOutputStream os;
-  private OutputStream compressedOutput;
-
-
-  protected AbstractCompressedMessageSetEncoder(Compression compression) {
-    this.compression = compression;
-    try {
-      this.os = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer());
-      this.compressedOutput = createCompressedStream(os);
-    } catch (IOException e) {
-      // Should never happen
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public final MessageSetEncoder add(ChannelBuffer payload) {
-    try {
-      ChannelBuffer encoded = encodePayload(payload);
-      encoded.readBytes(compressedOutput, encoded.readableBytes());
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-    return this;
-
-  }
-
-  @Override
-  public final ChannelBuffer finish() {
-    try {
-      compressedOutput.close();
-      ChannelBuffer buf = prefixLength(encodePayload(os.buffer(), compression));
-      compressedOutput = createCompressedStream(os);
-      os.buffer().clear();
-
-      return buf;
-
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-
-  }
-
-  protected abstract OutputStream createCompressedStream(OutputStream os) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
deleted file mode 100644
index 9955d6a..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-import java.util.zip.CRC32;
-
-/**
- * A base implementation of {@link MessageSetEncoder}.
- */
-abstract class AbstractMessageSetEncoder implements MessageSetEncoder {
-
-  private static final ThreadLocal<CRC32> CRC32_LOCAL = new ThreadLocal<CRC32>() {
-    @Override
-    protected CRC32 initialValue() {
-      return new CRC32();
-    }
-  };
-
-  protected final int computeCRC32(ChannelBuffer buffer) {
-    CRC32 crc32 = CRC32_LOCAL.get();
-    crc32.reset();
-
-    if (buffer.hasArray()) {
-      crc32.update(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
-    } else {
-      byte[] bytes = new byte[buffer.readableBytes()];
-      buffer.getBytes(buffer.readerIndex(), bytes);
-      crc32.update(bytes);
-    }
-    return (int) crc32.getValue();
-  }
-
-  protected final ChannelBuffer encodePayload(ChannelBuffer payload) {
-    return encodePayload(payload, Compression.NONE);
-  }
-
-  protected final ChannelBuffer encodePayload(ChannelBuffer payload, Compression compression) {
-    ChannelBuffer header = ChannelBuffers.buffer(10);
-
-    int crc = computeCRC32(payload);
-
-    int magic = ((compression == Compression.NONE) ? 0 : 1);
-
-    // Message length = 1 byte magic + (optional 1 compression byte) + 4 bytes crc + payload length
-    header.writeInt(5 + magic + payload.readableBytes());
-    // Magic number = 0 for non-compressed data
-    header.writeByte(magic);
-    if (magic > 0) {
-      header.writeByte(compression.getCode());
-    }
-    header.writeInt(crc);
-
-    return ChannelBuffers.wrappedBuffer(header, payload);
-  }
-
-  protected final ChannelBuffer prefixLength(ChannelBuffer buffer) {
-    ChannelBuffer sizeBuf = ChannelBuffers.buffer(4);
-    sizeBuf.writeInt(buffer.readableBytes());
-    return ChannelBuffers.wrappedBuffer(sizeBuf, buffer);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
index 286bf82..ee53ed4 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
@@ -18,29 +18,43 @@
 package org.apache.twill.internal.kafka.client;
 
 import org.apache.twill.kafka.client.FetchedMessage;
+import org.apache.twill.kafka.client.TopicPartition;
 
 import java.nio.ByteBuffer;
 
 /**
- *
+ * An implementation of FetchedMessage that provides setters as well.
  */
 final class BasicFetchedMessage implements FetchedMessage {
 
-  private final long offset;
-  private final ByteBuffer buffer;
+  private final TopicPartition topicPartition;
+  private ByteBuffer payload;
+  private long nextOffset;
+
+  BasicFetchedMessage(TopicPartition topicPartition) {
+    this.topicPartition = topicPartition;
+  }
+
+  void setPayload(ByteBuffer payload) {
+    this.payload = payload;
+  }
+
+  void setNextOffset(long nextOffset) {
+    this.nextOffset = nextOffset;
+  }
 
-  BasicFetchedMessage(long offset, ByteBuffer buffer) {
-    this.offset = offset;
-    this.buffer = buffer;
+  @Override
+  public TopicPartition getTopicPartition() {
+    return topicPartition;
   }
 
   @Override
-  public long getOffset() {
-    return offset;
+  public ByteBuffer getPayload() {
+    return payload;
   }
 
   @Override
-  public ByteBuffer getBuffer() {
-    return buffer;
+  public long getNextOffset() {
+    return nextOffset;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
deleted file mode 100644
index c1fb4f2..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-/**
- * A class to help buffering data of format [len][payload-of-len].
- */
-final class Bufferer {
-
-  private ChannelBuffer currentBuffer = null;
-  private int currentSize = -1;
-
-  void apply(ChannelBuffer buffer) {
-    currentBuffer = concatBuffer(currentBuffer, buffer);
-  }
-
-  /**
-   * Returns the buffer if the buffer data is ready to be consumed,
-   * otherwise return {@link ChannelBuffers#EMPTY_BUFFER}.
-   */
-  ChannelBuffer getNext() {
-    if (currentSize < 0) {
-      if (currentBuffer.readableBytes() < 4) {
-        return ChannelBuffers.EMPTY_BUFFER;
-      }
-      currentSize = currentBuffer.readInt();
-    }
-
-    // Keep buffering if less then required number of bytes
-    if (currentBuffer.readableBytes() < currentSize) {
-      return ChannelBuffers.EMPTY_BUFFER;
-    }
-
-    ChannelBuffer result = currentBuffer.readSlice(currentSize);
-    currentSize = -1;
-
-    return result;
-  }
-
-  private ChannelBuffer concatBuffer(ChannelBuffer current, ChannelBuffer buffer) {
-    return current == null ? buffer : ChannelBuffers.wrappedBuffer(current, buffer);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java
new file mode 100644
index 0000000..9211d92
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A kafka {@link kafka.serializer.Encoder} for encoding byte buffer into byte array.
+ */
+public final class ByteBufferEncoder implements Encoder<ByteBuffer> {
+
+  public ByteBufferEncoder(VerifiableProperties properties) {
+  }
+
+  public byte[] toBytes(ByteBuffer buffer) {
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.get(bytes);
+    return bytes;
+  }
+}
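
This encoder, together with the IntegerEncoder and IntegerPartitioner added below, is the kind of class the Kafka 0.8 producer instantiates reflectively from its configuration, which is why each exposes a constructor taking VerifiableProperties. A minimal sketch of wiring these classes into a plain Kafka 0.8 producer follows; the broker address and topic are placeholders, and this wiring is an illustration rather than a description of how SimpleKafkaPublisher configures its producer.

import com.google.common.base.Charsets;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.twill.internal.kafka.client.ByteBufferEncoder;
import org.apache.twill.internal.kafka.client.IntegerEncoder;
import org.apache.twill.internal.kafka.client.IntegerPartitioner;

import java.nio.ByteBuffer;
import java.util.Properties;

public final class ProducerWiringSketch {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("metadata.broker.list", "localhost:9092");   // placeholder broker
    props.setProperty("serializer.class", ByteBufferEncoder.class.getName());
    props.setProperty("key.serializer.class", IntegerEncoder.class.getName());
    props.setProperty("partitioner.class", IntegerPartitioner.class.getName());

    Producer<Integer, ByteBuffer> producer =
      new Producer<Integer, ByteBuffer>(new ProducerConfig(props));

    ByteBuffer payload = Charsets.UTF_8.encode("hello");
    // The Integer key is routed through IntegerPartitioner to pick the partition.
    producer.send(new KeyedMessage<Integer, ByteBuffer>("log", 0, payload));
    producer.close();
  }
}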

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
deleted file mode 100644
index 3355b9f..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- * Enum for indicating compression method.
- */
-public enum Compression {
-  NONE(0),
-  GZIP(1),
-  SNAPPY(2);
-
-  private final int code;
-
-  Compression(int code) {
-    this.code = code;
-  }
-
-  public int getCode() {
-    return code;
-  }
-
-  public static Compression fromCode(int code) {
-    switch (code) {
-      case 0:
-        return NONE;
-      case 1:
-        return GZIP;
-      case 2:
-        return SNAPPY;
-    }
-    throw new IllegalArgumentException("Unknown compression code.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
deleted file mode 100644
index c2865ba..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.collect.Maps;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.ChannelGroupFutureListener;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-
-import java.net.InetSocketAddress;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Provides netty socket connection reuse.
- */
-final class ConnectionPool {
-
-  private final ClientBootstrap bootstrap;
-  private final ChannelGroup channelGroup;
-  private final ConcurrentMap<InetSocketAddress, Queue<ChannelFuture>> connections;
-
-  /**
-   * For releasing a connection back to the pool.
-   */
-  interface ConnectionReleaser {
-    void release();
-  }
-
-  /**
-   * Result of a connect request.
-   */
-  interface ConnectResult extends ConnectionReleaser {
-    ChannelFuture getChannelFuture();
-  }
-
-  ConnectionPool(ClientBootstrap bootstrap) {
-    this.bootstrap = bootstrap;
-    this.channelGroup = new DefaultChannelGroup();
-    this.connections = Maps.newConcurrentMap();
-  }
-
-  ConnectResult connect(InetSocketAddress address) {
-    Queue<ChannelFuture> channelFutures = connections.get(address);
-    if (channelFutures == null) {
-      channelFutures = new ConcurrentLinkedQueue<ChannelFuture>();
-      Queue<ChannelFuture> result = connections.putIfAbsent(address, channelFutures);
-      channelFutures = result == null ? channelFutures : result;
-    }
-
-    ChannelFuture channelFuture = channelFutures.poll();
-    while (channelFuture != null) {
-      if (channelFuture.isSuccess() && channelFuture.getChannel().isConnected()) {
-        return new SimpleConnectResult(address, channelFuture);
-      }
-      channelFuture = channelFutures.poll();
-    }
-
-    channelFuture = bootstrap.connect(address);
-    channelFuture.addListener(new ChannelFutureListener() {
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        if (future.isSuccess()) {
-          channelGroup.add(future.getChannel());
-        }
-      }
-    });
-    return new SimpleConnectResult(address, channelFuture);
-  }
-
-  ChannelGroupFuture close() {
-    ChannelGroupFuture result = channelGroup.close();
-    result.addListener(new ChannelGroupFutureListener() {
-      @Override
-      public void operationComplete(ChannelGroupFuture future) throws Exception {
-        bootstrap.releaseExternalResources();
-      }
-    });
-    return result;
-  }
-
-  private final class SimpleConnectResult implements ConnectResult {
-
-    private final InetSocketAddress address;
-    private final ChannelFuture future;
-
-
-    private SimpleConnectResult(InetSocketAddress address, ChannelFuture future) {
-      this.address = address;
-      this.future = future;
-    }
-
-    @Override
-    public ChannelFuture getChannelFuture() {
-      return future;
-    }
-
-    @Override
-    public void release() {
-      if (future.isSuccess()) {
-        connections.get(address).offer(future);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
deleted file mode 100644
index daa0c2c..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.zip.GZIPOutputStream;
-
-/**
- * A {@link MessageSetEncoder} that compress message set using GZIP.
- */
-final class GZipMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
-
-  GZipMessageSetEncoder() {
-    super(Compression.GZIP);
-  }
-
-  @Override
-  protected OutputStream createCompressedStream(OutputStream os) throws IOException {
-    return new GZIPOutputStream(os);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
deleted file mode 100644
index 51dc746..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-/**
- * A pass-through {@link MessageSetEncoder}.
- */
-final class IdentityMessageSetEncoder extends AbstractMessageSetEncoder {
-
-  private ChannelBuffer messageSets = ChannelBuffers.EMPTY_BUFFER;
-
-  @Override
-  public MessageSetEncoder add(ChannelBuffer payload) {
-    messageSets = ChannelBuffers.wrappedBuffer(messageSets, encodePayload(payload));
-    return this;
-  }
-
-  @Override
-  public ChannelBuffer finish() {
-    ChannelBuffer buf = prefixLength(messageSets);
-    messageSets = ChannelBuffers.EMPTY_BUFFER;
-    return buf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java
new file mode 100644
index 0000000..cbe7eaa
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.primitives.Ints;
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * A kafka {@link kafka.serializer.Encoder} for encoding integer into bytes.
+ */
+public final class IntegerEncoder implements Encoder<Integer> {
+
+  public IntegerEncoder(VerifiableProperties properties) {
+  }
+
+  public byte[] toBytes(Integer buffer) {
+    return Ints.toByteArray(buffer.intValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
new file mode 100644
index 0000000..4aa7940
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * A kafka {@link kafka.producer.Partitioner} using integer key to compute partition id.
+ */
+public final class IntegerPartitioner implements Partitioner<Integer> {
+
+  public IntegerPartitioner(VerifiableProperties properties) {
+  }
+
+  public int partition(Integer key, int numPartitions) {
+    return key % numPartitions;
+  }
+}
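
The partitioner simply maps an integer key onto a partition id with a modulo, so the same key always lands on the same partition. A tiny illustration of that behaviour is sketched below; the no-argument VerifiableProperties is used only to satisfy the constructor that Kafka would normally call reflectively.

import kafka.utils.VerifiableProperties;
import org.apache.twill.internal.kafka.client.IntegerPartitioner;

public final class PartitionerSketch {

  public static void main(String[] args) {
    IntegerPartitioner partitioner = new IntegerPartitioner(new VerifiableProperties());
    int numPartitions = 4;

    // key % numPartitions: keys 1 and 5 share partition 1, key 2 gets partition 2.
    System.out.println(partitioner.partition(1, numPartitions));  // 1
    System.out.println(partitioner.partition(5, numPartitions));  // 1
    System.out.println(partitioner.partition(2, numPartitions));  // 2
  }
}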

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
deleted file mode 100644
index f2bb815..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-
-/**
- * A Service to cache kafka broker information by subscribing to ZooKeeper.
- */
-final class KafkaBrokerCache extends AbstractIdleService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerCache.class);
-
-  private static final String BROKERS_PATH = "/brokers";
-
-  private final ZKClient zkClient;
-  private final Map<String, InetSocketAddress> brokers;
-  // topicBrokers is from topic->partition size->brokerId
-  private final Map<String, SortedMap<Integer, Set<String>>> topicBrokers;
-  private final Runnable invokeGetBrokers = new Runnable() {
-    @Override
-    public void run() {
-      getBrokers();
-    }
-  };
-  private final Runnable invokeGetTopics = new Runnable() {
-    @Override
-    public void run() {
-      getTopics();
-    }
-  };
-
-  KafkaBrokerCache(ZKClient zkClient) {
-    this.zkClient = zkClient;
-    this.brokers = Maps.newConcurrentMap();
-    this.topicBrokers = Maps.newConcurrentMap();
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    getBrokers();
-    getTopics();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    // No-op
-  }
-
-  public int getPartitionSize(String topic) {
-    SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
-    if (partitionBrokers == null || partitionBrokers.isEmpty()) {
-      return 1;
-    }
-    return partitionBrokers.lastKey();
-  }
-
-  public TopicBroker getBrokerAddress(String topic, int partition) {
-    SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
-    if (partitionBrokers == null || partitionBrokers.isEmpty()) {
-      return pickRandomBroker(topic);
-    }
-
-    // If the requested partition is greater than supported partition size, randomly pick one
-    if (partition >= partitionBrokers.lastKey()) {
-      return pickRandomBroker(topic);
-    }
-
-    // Randomly pick a partition size and randomly pick a broker from it
-    Random random = new Random();
-    partitionBrokers = partitionBrokers.tailMap(partition + 1);
-    List<Integer> sizes = Lists.newArrayList(partitionBrokers.keySet());
-    Integer partitionSize = pickRandomItem(sizes, random);
-    List<String> ids = Lists.newArrayList(partitionBrokers.get(partitionSize));
-    InetSocketAddress address = brokers.get(ids.get(new Random().nextInt(ids.size())));
-    return address == null ? pickRandomBroker(topic) : new TopicBroker(topic, address, partitionSize);
-  }
-
-  private TopicBroker pickRandomBroker(String topic) {
-    Map.Entry<String, InetSocketAddress> entry = Iterables.getFirst(brokers.entrySet(), null);
-    if (entry == null) {
-      return null;
-    }
-    InetSocketAddress address = entry.getValue();
-    return new TopicBroker(topic, address, 0);
-  }
-
-  private <T> T pickRandomItem(List<T> list, Random random) {
-    return list.get(random.nextInt(list.size()));
-  }
-
-  private void getBrokers() {
-    final String idsPath = BROKERS_PATH + "/ids";
-
-    Futures.addCallback(zkClient.getChildren(idsPath, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        getBrokers();
-      }
-    }), new ExistsOnFailureFutureCallback<NodeChildren>(idsPath, invokeGetBrokers) {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        Set<String> children = ImmutableSet.copyOf(result.getChildren());
-        for (String child : children) {
-          getBrokenData(idsPath + "/" + child, child);
-        }
-        // Remove all removed brokers
-        removeDiff(children, brokers);
-      }
-    });
-  }
-
-  private void getTopics() {
-    final String topicsPath = BROKERS_PATH + "/topics";
-    Futures.addCallback(zkClient.getChildren(topicsPath, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        getTopics();
-      }
-    }), new ExistsOnFailureFutureCallback<NodeChildren>(topicsPath, invokeGetTopics) {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        Set<String> children = ImmutableSet.copyOf(result.getChildren());
-
-        // Process new children
-        for (String topic : ImmutableSet.copyOf(Sets.difference(children, topicBrokers.keySet()))) {
-          getTopic(topicsPath + "/" + topic, topic);
-        }
-
-        // Remove old children
-        removeDiff(children, topicBrokers);
-      }
-    });
-  }
-
-  private void getBrokenData(String path, final String brokerId) {
-    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        String data = new String(result.getData(), Charsets.UTF_8);
-        String hostPort = data.substring(data.indexOf(':') + 1);
-        int idx = hostPort.indexOf(':');
-        brokers.put(brokerId, new InetSocketAddress(hostPort.substring(0, idx),
-                                                    Integer.parseInt(hostPort.substring(idx + 1))));
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // No-op, the watch on the parent node will handle it.
-      }
-    });
-  }
-
-  private void getTopic(final String path, final String topic) {
-    Futures.addCallback(zkClient.getChildren(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        // Other event type changes are either could be ignored or handled by parent watcher
-        if (event.getType() == Event.EventType.NodeChildrenChanged) {
-          getTopic(path, topic);
-        }
-      }
-    }), new FutureCallback<NodeChildren>() {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        List<String> children = result.getChildren();
-        final List<ListenableFuture<BrokerPartition>> futures = Lists.newArrayListWithCapacity(children.size());
-
-        // Fetch data from each broken node
-        for (final String brokerId : children) {
-          Futures.transform(zkClient.getData(path + "/" + brokerId), new Function<NodeData, BrokerPartition>() {
-            @Override
-            public BrokerPartition apply(NodeData input) {
-              return new BrokerPartition(brokerId, Integer.parseInt(new String(input.getData(), Charsets.UTF_8)));
-            }
-          });
-        }
-
-        // When all fetching is done, build the partition size->broker map for this topic
-        Futures.successfulAsList(futures).addListener(new Runnable() {
-          @Override
-          public void run() {
-            Map<Integer, Set<String>> partitionBrokers = Maps.newHashMap();
-            for (ListenableFuture<BrokerPartition> future : futures) {
-              try {
-                BrokerPartition info = future.get();
-                Set<String> brokerSet = partitionBrokers.get(info.getPartitionSize());
-                if (brokerSet == null) {
-                  brokerSet = Sets.newHashSet();
-                  partitionBrokers.put(info.getPartitionSize(), brokerSet);
-                }
-                brokerSet.add(info.getBrokerId());
-              } catch (Exception e) {
-                // Exception is ignored, as it will be handled by parent watcher
-              }
-            }
-            topicBrokers.put(topic, ImmutableSortedMap.copyOf(partitionBrokers));
-          }
-        }, Threads.SAME_THREAD_EXECUTOR);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // No-op. Failures are already handled by the parent watcher (e.g. node removal shows up as a children change)
-      }
-    });
-  }
-
-  private <K, V> void removeDiff(Set<K> keys, Map<K, V> map) {
-    for (K key : ImmutableSet.copyOf(Sets.difference(map.keySet(), keys))) {
-      map.remove(key);
-    }
-  }
-
-  private abstract class ExistsOnFailureFutureCallback<V> implements FutureCallback<V> {
-
-    private final String path;
-    private final Runnable action;
-
-    protected ExistsOnFailureFutureCallback(String path, Runnable action) {
-      this.path = path;
-      this.action = action;
-    }
-
-    @Override
-    public final void onFailure(Throwable t) {
-      if (!isNotExists(t)) {
-        LOG.error("Failed to watch for Kafka brokers: " + path, t);
-        return;
-      }
-
-      waitExists(path);
-    }
-
-    private boolean isNotExists(Throwable t) {
-      return ((t instanceof KeeperException) && ((KeeperException) t).code() == KeeperException.Code.NONODE);
-    }
-
-    private void waitExists(String path) {
-      LOG.info("Path " + path + " does not exist. Watching for creation.");
-
-      // If the node doesn't exist, use the "exists" call to watch for node creation.
-      Futures.addCallback(zkClient.exists(path, new Watcher() {
-        @Override
-        public void process(WatchedEvent event) {
-          if (event.getType() == Event.EventType.NodeCreated || event.getType() == Event.EventType.NodeDeleted) {
-            action.run();
-          }
-        }
-      }), new FutureCallback<Stat>() {
-        @Override
-        public void onSuccess(Stat result) {
-          // If the path exists, get the children again; otherwise wait for the watch to be triggered
-          if (result != null) {
-            action.run();
-          }
-        }
-        @Override
-        public void onFailure(Throwable t) {
-          action.run();
-        }
-      });
-    }
-  }
-
-  private static final class BrokerPartition {
-    private final String brokerId;
-    private final int partitionSize;
-
-    private BrokerPartition(String brokerId, int partitionSize) {
-      this.brokerId = brokerId;
-      this.partitionSize = partitionSize;
-    }
-
-    public String getBrokerId() {
-      return brokerId;
-    }
-
-    public int getPartitionSize() {
-      return partitionSize;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
deleted file mode 100644
index 7b43f8a..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- * Represents a single request to a Kafka broker, carrying the request type, topic, partition,
- * encoded body and the handler for its response.
- */
-final class KafkaRequest {
-
-  public enum Type {
-    PRODUCE(0),
-    FETCH(1),
-    MULTI_FETCH(2),
-    MULTI_PRODUCE(3),
-    OFFSETS(4);
-
-    private final short id;
-
-    private Type(int id) {
-      this.id = (short) id;
-    }
-
-    public short getId() {
-      return id;
-    }
-  }
-
-  private final Type type;
-  private final String topic;
-  private final int partition;
-  private final ChannelBuffer body;
-  private final ResponseHandler responseHandler;
-
-
-  public static KafkaRequest createProduce(String topic, int partition, ChannelBuffer body) {
-    return new KafkaRequest(Type.PRODUCE, topic, partition, body, ResponseHandler.NO_OP);
-  }
-
-  public static KafkaRequest createFetch(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
-    return new KafkaRequest(Type.FETCH, topic, partition, body, handler);
-  }
-
-  public static KafkaRequest createOffsets(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
-    return new KafkaRequest(Type.OFFSETS, topic, partition, body, handler);
-  }
-
-  private KafkaRequest(Type type, String topic, int partition, ChannelBuffer body, ResponseHandler responseHandler) {
-    this.type = type;
-    this.topic = topic;
-    this.partition = partition;
-    this.body = body;
-    this.responseHandler = responseHandler;
-  }
-
-  Type getType() {
-    return type;
-  }
-
-  String getTopic() {
-    return topic;
-  }
-
-  int getPartition() {
-    return partition;
-  }
-
-  ChannelBuffer getBody() {
-    return body;
-  }
-
-  ResponseHandler getResponseHandler() {
-    return responseHandler;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
deleted file mode 100644
index ef78c76..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.base.Charsets;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-import java.nio.ByteBuffer;
-
-/**
- * Netty encoder that serializes a {@link KafkaRequest} into the Kafka wire format, prefixed with
- * the total request length.
- */
-final class KafkaRequestEncoder extends OneToOneEncoder {
-
-  @Override
-  protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
-    if (!(msg instanceof KafkaRequest)) {
-      return msg;
-    }
-    KafkaRequest req = (KafkaRequest) msg;
-    ByteBuffer topic = Charsets.UTF_8.encode(req.getTopic());
-
-    ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(16 + topic.remaining() + req.getBody().readableBytes());
-    int writerIdx = buffer.writerIndex();
-    buffer.writerIndex(writerIdx + 4);    // Reserves 4 bytes for message length
-
-    // Write out <REQUEST_TYPE>, <TOPIC_LENGTH>, <TOPIC>, <PARTITION>
-    buffer.writeShort(req.getType().getId());
-    buffer.writeShort(topic.remaining());
-    buffer.writeBytes(topic);
-    buffer.writeInt(req.getPartition());
-
-    // Write out the size of the whole buffer (excluding the size field) at the beginning
-    buffer.setInt(writerIdx, buffer.readableBytes() - 4 + req.getBody().readableBytes());
-
-    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(buffer, req.getBody());
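-    // Copy into a single contiguous buffer so the composite wrapper is not passed downstream.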
-    buf = buf.readBytes(buf.readableBytes());
-
-    return buf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
deleted file mode 100644
index fbc552c..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- * Represents a sender of {@link KafkaRequest} to a Kafka broker.
- */
-interface KafkaRequestSender {
-
-  void send(KafkaRequest request);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
deleted file mode 100644
index 68c1bd8..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.kafka.client.FetchException;
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- * Represents a response received from a Kafka broker, carrying the error code, body and total size in bytes.
- */
-final class KafkaResponse {
-
-  private final FetchException.ErrorCode errorCode;
-  private final ChannelBuffer body;
-  private final int size;
-
-  KafkaResponse(FetchException.ErrorCode errorCode, ChannelBuffer body, int size) {
-    this.errorCode = errorCode;
-    this.body = body;
-    this.size = size;
-  }
-
-  public int getSize() {
-    return size;
-  }
-
-  public FetchException.ErrorCode getErrorCode() {
-    return errorCode;
-  }
-
-  public ChannelBuffer getBody() {
-    return body;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
deleted file mode 100644
index 47f70ce..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.SocketException;
-import java.nio.channels.ClosedChannelException;
-
-/**
- * Netty handler that dispatches each decoded {@link KafkaResponse} to the {@link ResponseHandler}
- * of the request most recently written on the channel.
- */
-final class KafkaResponseDispatcher extends SimpleChannelHandler {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaResponseDispatcher.class);
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    Object attachment = ctx.getAttachment();
-    if (e.getMessage() instanceof KafkaResponse && attachment instanceof ResponseHandler) {
-      ((ResponseHandler) attachment).received((KafkaResponse) e.getMessage());
-    } else {
-      super.messageReceived(ctx, e);
-    }
-  }
-
-  @Override
-  public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
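-    // Remember the request's response handler on the channel context so messageReceived can dispatch to it.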
-    if (e.getMessage() instanceof KafkaRequest) {
-      ctx.setAttachment(((KafkaRequest) e.getMessage()).getResponseHandler());
-    }
-    super.writeRequested(ctx, e);
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-    if (e.getCause() instanceof ClosedChannelException || e.getCause() instanceof SocketException) {
-      // No need to log socket exceptions, as the client has retry logic.
-      return;
-    }
-    LOG.warn("Exception caught in kafka client connection.", e.getCause());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
deleted file mode 100644
index 5251e65..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.kafka.client.FetchException;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- * Netty handler that reassembles incoming bytes into {@link KafkaResponse} objects and fires them upstream.
- */
-final class KafkaResponseHandler extends SimpleChannelHandler {
-
-  private final Bufferer bufferer = new Bufferer();
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    Object msg = e.getMessage();
-    if (!(msg instanceof ChannelBuffer)) {
-      super.messageReceived(ctx, e);
-      return;
-    }
-
-    bufferer.apply((ChannelBuffer) msg);
-    ChannelBuffer buffer = bufferer.getNext();
-    while (buffer.readable()) {
-      // Send the response object upstream
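-      // The reported size adds back the 2-byte error code and the 4-byte length prefix consumed during framing.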
-      Channels.fireMessageReceived(ctx, new KafkaResponse(FetchException.ErrorCode.fromCode(buffer.readShort()),
-                                                          buffer, buffer.readableBytes() + 6));
-      buffer = bufferer.getNext();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
deleted file mode 100644
index 0814917..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.kafka.client.FetchException;
-import org.apache.twill.kafka.client.FetchedMessage;
-import com.google.common.base.Throwables;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.io.ByteStreams;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.xerial.snappy.SnappyInputStream;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.GZIPInputStream;
-
-/**
- * Consumes messages from a partition of a Kafka topic and exposes them as an iterator of {@link FetchedMessage}.
- */
-final class MessageFetcher extends AbstractIterator<FetchedMessage> implements ResponseHandler {
-
-  private static final long BACKOFF_INTERVAL_MS = 100;
-
-  private final KafkaRequestSender sender;
-  private final String topic;
-  private final int partition;
-  private final int maxSize;
-  private final AtomicLong offset;
-  private final BlockingQueue<FetchResult> messages;
-  private final ScheduledExecutorService scheduler;
-  private volatile long backoffMillis;
-  private final Runnable sendFetchRequest = new Runnable() {
-    @Override
-    public void run() {
-      sendFetchRequest();
-    }
-  };
-
-  MessageFetcher(String topic, int partition, long offset, int maxSize, KafkaRequestSender sender) {
-    this.topic = topic;
-    this.partition = partition;
-    this.sender = sender;
-    this.offset = new AtomicLong(offset);
-    this.maxSize = maxSize;
-    this.messages = new LinkedBlockingQueue<FetchResult>();
-    this.scheduler = Executors.newSingleThreadScheduledExecutor(
-                        Threads.createDaemonThreadFactory("kafka-" + topic + "-consumer"));
-  }
-
-  @Override
-  public void received(KafkaResponse response) {
-    if (response.getErrorCode() != FetchException.ErrorCode.OK) {
-      messages.add(FetchResult.failure(new FetchException("Error in fetching: " + response.getErrorCode(),
-                                                          response.getErrorCode())));
-      return;
-    }
-
-    try {
-      if (decodeResponse(response.getBody(), -1)) {
-        backoffMillis = 0;
-      } else {
-        backoffMillis = Math.min(backoffMillis + BACKOFF_INTERVAL_MS, 1000);   // cap the backoff at one second
-        scheduler.schedule(sendFetchRequest, backoffMillis, TimeUnit.MILLISECONDS);
-      }
-    } catch (Throwable t) {
-      messages.add(FetchResult.failure(t));
-    }
-  }
-
-  private boolean decodeResponse(ChannelBuffer buffer, long nextOffset) {
-    boolean hasMessage = false;
-    boolean computeOffset = nextOffset < 0;
-    while (buffer.readableBytes() >= 4) {
-      int size = buffer.readInt();
-      if (buffer.readableBytes() < size) {
-        if (!hasMessage) {
-          throw new IllegalStateException("Size too small");
-        }
-        break;
-      }
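-      // Offsets are byte positions in the partition log, so advance by the message size plus its 4-byte length field.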
-      nextOffset = computeOffset ? offset.addAndGet(size + 4) : nextOffset;
-      decodeMessage(size, buffer, nextOffset);
-      hasMessage = true;
-    }
-    return hasMessage;
-
-  }
-
-  private void decodeMessage(int size, ChannelBuffer buffer, long nextOffset) {
-    int readerIdx = buffer.readerIndex();
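-    // Magic byte 0 means no attributes byte follows; magic byte 1 is followed by a one-byte compression codec.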
-    int magic = buffer.readByte();
-    Compression compression = magic == 0 ? Compression.NONE : Compression.fromCode(buffer.readByte());
-    int crc = buffer.readInt();
-
-    ChannelBuffer payload = buffer.readSlice(size - (buffer.readerIndex() - readerIdx));
-
-    // Verify CRC?
-    enqueueMessage(compression, payload, nextOffset);
-  }
-
-  private void enqueueMessage(Compression compression, ChannelBuffer payload, long nextOffset) {
-    switch (compression) {
-      case NONE:
-        messages.add(FetchResult.success(new BasicFetchedMessage(nextOffset, payload.toByteBuffer())));
-        break;
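-      // Compressed payloads wrap a nested message set; decompress and decode it recursively at the same offset.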
-      case GZIP:
-        decodeResponse(gunzip(payload), nextOffset);
-        break;
-      case SNAPPY:
-        decodeResponse(unsnappy(payload), nextOffset);
-        break;
-    }
-  }
-
-  private ChannelBuffer gunzip(ChannelBuffer source) {
-    ChannelBufferOutputStream output = new ChannelBufferOutputStream(
-                                              ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
-    try {
-      try {
-        GZIPInputStream gzipInput = new GZIPInputStream(new ChannelBufferInputStream(source));
-        try {
-          ByteStreams.copy(gzipInput, output);
-          return output.buffer();
-        } finally {
-          gzipInput.close();
-        }
-      } finally {
-        output.close();
-      }
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private ChannelBuffer unsnappy(ChannelBuffer source) {
-    ChannelBufferOutputStream output = new ChannelBufferOutputStream(
-                                              ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
-    try {
-      try {
-        SnappyInputStream snappyInput = new SnappyInputStream(new ChannelBufferInputStream(source));
-        try {
-          ByteStreams.copy(snappyInput, output);
-          return output.buffer();
-        } finally {
-          snappyInput.close();
-        }
-      } finally {
-        output.close();
-      }
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private void sendFetchRequest() {
-    ChannelBuffer fetchBody = ChannelBuffers.buffer(12);
-    fetchBody.writeLong(offset.get());
-    fetchBody.writeInt(maxSize);
-    sender.send(KafkaRequest.createFetch(topic, partition, fetchBody, MessageFetcher.this));
-  }
-
-  @Override
-  protected FetchedMessage computeNext() {
-    FetchResult result = messages.poll();
-    if (result != null) {
-      return getMessage(result);
-    }
-
-    try {
-      sendFetchRequest();
-      return getMessage(messages.take());
-    } catch (InterruptedException e) {
-      scheduler.shutdownNow();
-      return endOfData();
-    }
-  }
-
-  private FetchedMessage getMessage(FetchResult result) {
-    try {
-      if (result.isSuccess()) {
-        return result.getMessage();
-      } else {
-        throw result.getErrorCause();
-      }
-    } catch (Throwable t) {
-      throw Throwables.propagate(t);
-    }
-  }
-
-  private static final class FetchResult {
-    private final FetchedMessage message;
-    private final Throwable errorCause;
-
-    static FetchResult success(FetchedMessage message) {
-      return new FetchResult(message, null);
-    }
-
-    static FetchResult failure(Throwable cause) {
-      return new FetchResult(null, cause);
-    }
-
-    private FetchResult(FetchedMessage message, Throwable errorCause) {
-      this.message = message;
-      this.errorCause = errorCause;
-    }
-
-    public FetchedMessage getMessage() {
-      return message;
-    }
-
-    public Throwable getErrorCause() {
-      return errorCause;
-    }
-
-    public boolean isSuccess() {
-      return message != null;
-    }
-  }
-}

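For context, the removed MessageFetcher above was driven as a plain Iterator: each call to next() either
drains an already-fetched message or issues a blocking fetch request. A minimal usage sketch, written as a
hypothetical helper method (the topic name, partition, offset and fetch size are illustrative only, and the
KafkaRequestSender is assumed to be wired up elsewhere):

    void consumeAll(KafkaRequestSender sender) {
      // Fetch from partition 0 of topic "logs", starting at offset 0, up to 1 MB per fetch.
      MessageFetcher fetcher = new MessageFetcher("logs", 0, 0L, 1024 * 1024, sender);
      while (fetcher.hasNext()) {
        FetchedMessage message = fetcher.next();
        // ... handle the fetched message ...
      }
    }
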
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
deleted file mode 100644
index 49008cc..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- * Represents a set of messages that are grouped together and encoded as a single
- * Kafka message set.
- */
-interface MessageSetEncoder {
-
-  MessageSetEncoder add(ChannelBuffer payload);
-
-  ChannelBuffer finish();
-}

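The encoder interface above was used in a fluent style: payloads are accumulated with add() and the whole
batch is serialized once by finish(). A minimal sketch of that pattern, written as a hypothetical helper
method (the encoder instance and byte[] payloads are assumed to come from elsewhere):

    ChannelBuffer encodeBatch(MessageSetEncoder encoder, byte[] first, byte[] second) {
      // Accumulate the payloads, then produce a single encoded Kafka message set.
      return encoder.add(ChannelBuffers.wrappedBuffer(first))
                    .add(ChannelBuffers.wrappedBuffer(second))
                    .finish();
    }
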
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
deleted file mode 100644
index f681b85..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- * Represents a handler for Kafka responses.
- */
-interface ResponseHandler {
-
-  ResponseHandler NO_OP = new ResponseHandler() {
-    @Override
-    public void received(KafkaResponse response) {
-      // No-op
-    }
-  };
-
-  void received(KafkaResponse response);
-}