Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 23:00:06 UTC

[24/28] Making maven site work.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
deleted file mode 100644
index 14dfc70..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractIdleService;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Properties;
-
-/**
- *
- */
-public final class EmbeddedKafkaServer extends AbstractIdleService {
-
-  private static final String KAFKA_CONFIG_CLASS = "kafka.server.KafkaConfig";
-  private static final String KAFKA_SERVER_CLASS = "kafka.server.KafkaServerStartable";
-
-  private final Object server;
-
-  public EmbeddedKafkaServer(File kafkaDir, Properties properties) {
-    this(createClassLoader(kafkaDir), properties);
-  }
-
-  public EmbeddedKafkaServer(ClassLoader classLoader, Properties properties) {
-    try {
-      Class<?> configClass = classLoader.loadClass(KAFKA_CONFIG_CLASS);
-      Object config = configClass.getConstructor(Properties.class).newInstance(properties);
-
-      Class<?> serverClass = classLoader.loadClass(KAFKA_SERVER_CLASS);
-      server = serverClass.getConstructor(configClass).newInstance(config);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    server.getClass().getMethod("startup").invoke(server);
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    server.getClass().getMethod("shutdown").invoke(server);
-    server.getClass().getMethod("awaitShutdown").invoke(server);
-  }
-
-  private static ClassLoader createClassLoader(File kafkaDir) {
-    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-    ClassLoader thisClassLoader = EmbeddedKafkaServer.class.getClassLoader();
-    ClassLoader parent = contextClassLoader != null
-                            ? contextClassLoader
-                            : thisClassLoader != null
-                                ? thisClassLoader : ClassLoader.getSystemClassLoader();
-
-    return new URLClassLoader(findJars(kafkaDir, Lists.<URL>newArrayList()).toArray(new URL[0]), parent);
-  }
-
-  private static List<URL> findJars(File dir, List<URL> urls) {
-    try {
-      for (File file : dir.listFiles()) {
-        if (file.isDirectory()) {
-          findJars(file, urls);
-        } else if (file.getName().endsWith(".jar")) {
-          urls.add(file.toURI().toURL());
-        }
-      }
-      return urls;
-    } catch (MalformedURLException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-}
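
For context, a minimal usage sketch (not part of this commit) of the class removed above; the Kafka distribution path and the property names are assumptions for illustration only:

    import java.io.File;
    import java.util.Properties;

    import org.apache.twill.internal.kafka.EmbeddedKafkaServer;

    public class EmbeddedKafkaSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("log.dir", "/tmp/kafka-logs");    // assumed Kafka 0.7-style keys
        props.setProperty("zk.connect", "localhost:2181");
        props.setProperty("brokerid", "1");
        props.setProperty("port", "9092");

        // The constructor builds an isolated URLClassLoader over all jars under the given
        // directory and instantiates kafka.server.KafkaServerStartable reflectively.
        EmbeddedKafkaServer server = new EmbeddedKafkaServer(new File("/opt/kafka"), props);
        server.startAndWait();   // AbstractIdleService: invokes startup() on the broker
        try {
          // ... publish/consume against localhost:9092 ...
        } finally {
          server.stopAndWait();  // invokes shutdown() and awaitShutdown()
        }
      }
    }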

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java b/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
deleted file mode 100644
index a9c3381..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.base.Throwables;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * A base implementation of {@link MessageSetEncoder} that does message compression.
- */
-abstract class AbstractCompressedMessageSetEncoder extends AbstractMessageSetEncoder {
-
-  private final Compression compression;
-  private ChannelBufferOutputStream os;
-  private OutputStream compressedOutput;
-
-
-  protected AbstractCompressedMessageSetEncoder(Compression compression) {
-    this.compression = compression;
-    try {
-      this.os = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer());
-      this.compressedOutput = createCompressedStream(os);
-    } catch (IOException e) {
-      // Should never happen
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public final MessageSetEncoder add(ChannelBuffer payload) {
-    try {
-      ChannelBuffer encoded = encodePayload(payload);
-      encoded.readBytes(compressedOutput, encoded.readableBytes());
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-    return this;
-
-  }
-
-  @Override
-  public final ChannelBuffer finish() {
-    try {
-      compressedOutput.close();
-      ChannelBuffer buf = prefixLength(encodePayload(os.buffer(), compression));
-      compressedOutput = createCompressedStream(os);
-      os.buffer().clear();
-
-      return buf;
-
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-
-  }
-
-  protected abstract OutputStream createCompressedStream(OutputStream os) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java b/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
deleted file mode 100644
index 9955d6a..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-import java.util.zip.CRC32;
-
-/**
- * A base implementation of {@link MessageSetEncoder}.
- */
-abstract class AbstractMessageSetEncoder implements MessageSetEncoder {
-
-  private static final ThreadLocal<CRC32> CRC32_LOCAL = new ThreadLocal<CRC32>() {
-    @Override
-    protected CRC32 initialValue() {
-      return new CRC32();
-    }
-  };
-
-  protected final int computeCRC32(ChannelBuffer buffer) {
-    CRC32 crc32 = CRC32_LOCAL.get();
-    crc32.reset();
-
-    if (buffer.hasArray()) {
-      crc32.update(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
-    } else {
-      byte[] bytes = new byte[buffer.readableBytes()];
-      buffer.getBytes(buffer.readerIndex(), bytes);
-      crc32.update(bytes);
-    }
-    return (int) crc32.getValue();
-  }
-
-  protected final ChannelBuffer encodePayload(ChannelBuffer payload) {
-    return encodePayload(payload, Compression.NONE);
-  }
-
-  protected final ChannelBuffer encodePayload(ChannelBuffer payload, Compression compression) {
-    ChannelBuffer header = ChannelBuffers.buffer(10);
-
-    int crc = computeCRC32(payload);
-
-    int magic = ((compression == Compression.NONE) ? 0 : 1);
-
-    // Message length = 1 byte magic + (optional 1 compression byte) + 4 bytes crc + payload length
-    header.writeInt(5 + magic + payload.readableBytes());
-    // Magic number = 0 for non-compressed data
-    header.writeByte(magic);
-    if (magic > 0) {
-      header.writeByte(compression.getCode());
-    }
-    header.writeInt(crc);
-
-    return ChannelBuffers.wrappedBuffer(header, payload);
-  }
-
-  protected final ChannelBuffer prefixLength(ChannelBuffer buffer) {
-    ChannelBuffer sizeBuf = ChannelBuffers.buffer(4);
-    sizeBuf.writeInt(buffer.readableBytes());
-    return ChannelBuffers.wrappedBuffer(sizeBuf, buffer);
-  }
-}
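
To make the framing in encodePayload/prefixLength above concrete, here is a self-contained sketch that lays out one uncompressed message with a plain ByteBuffer (the payload value is arbitrary):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public class MessageLayoutSketch {
      public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        CRC32 crc32 = new CRC32();
        crc32.update(payload);

        int magic = 0;                                   // 0 = uncompressed, so no compression byte
        int messageLength = 5 + magic + payload.length;  // 1 byte magic + 4 bytes crc + payload

        ByteBuffer message = ByteBuffer.allocate(4 + messageLength);
        message.putInt(messageLength);                   // per-message length, as in encodePayload()
        message.put((byte) magic);                       // magic byte
        message.putInt((int) crc32.getValue());          // CRC32 of the payload
        message.put(payload);
        message.flip();

        // prefixLength() would additionally prepend the total message-set size (here 14).
        System.out.println("encoded message bytes = " + message.remaining());  // 14
      }
    }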

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java b/core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
deleted file mode 100644
index 286bf82..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.kafka.client.FetchedMessage;
-
-import java.nio.ByteBuffer;
-
-/**
- *
- */
-final class BasicFetchedMessage implements FetchedMessage {
-
-  private final long offset;
-  private final ByteBuffer buffer;
-
-  BasicFetchedMessage(long offset, ByteBuffer buffer) {
-    this.offset = offset;
-    this.buffer = buffer;
-  }
-
-  @Override
-  public long getOffset() {
-    return offset;
-  }
-
-  @Override
-  public ByteBuffer getBuffer() {
-    return buffer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java b/core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
deleted file mode 100644
index c1fb4f2..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-/**
- * A class that helps buffer data in the format [len][payload-of-len].
- */
-final class Bufferer {
-
-  private ChannelBuffer currentBuffer = null;
-  private int currentSize = -1;
-
-  void apply(ChannelBuffer buffer) {
-    currentBuffer = concatBuffer(currentBuffer, buffer);
-  }
-
-  /**
-   * Returns the buffer if the buffer data is ready to be consumed,
-   * otherwise returns {@link ChannelBuffers#EMPTY_BUFFER}.
-   */
-  ChannelBuffer getNext() {
-    if (currentSize < 0) {
-      if (currentBuffer.readableBytes() < 4) {
-        return ChannelBuffers.EMPTY_BUFFER;
-      }
-      currentSize = currentBuffer.readInt();
-    }
-
-    // Keep buffering if less than the required number of bytes
-    if (currentBuffer.readableBytes() < currentSize) {
-      return ChannelBuffers.EMPTY_BUFFER;
-    }
-
-    ChannelBuffer result = currentBuffer.readSlice(currentSize);
-    currentSize = -1;
-
-    return result;
-  }
-
-  private ChannelBuffer concatBuffer(ChannelBuffer current, ChannelBuffer buffer) {
-    return current == null ? buffer : ChannelBuffers.wrappedBuffer(current, buffer);
-  }
-}
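
A small sketch of the intended use (same package, since Bufferer is package-private): a length-prefixed frame arriving split across two reads is only returned once it is complete.

    package org.apache.twill.internal.kafka.client;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;

    public class BuffererSketch {
      public static void main(String[] args) {
        byte[] payload = new byte[] {1, 2, 3, 4, 5, 6};

        // First read: the 4-byte length plus only half of the payload.
        ChannelBuffer first = ChannelBuffers.buffer(4 + 3);
        first.writeInt(payload.length);
        first.writeBytes(payload, 0, 3);

        // Second read: the rest of the payload.
        ChannelBuffer second = ChannelBuffers.wrappedBuffer(payload, 3, 3);

        Bufferer bufferer = new Bufferer();
        bufferer.apply(first);
        System.out.println(bufferer.getNext().readable());       // false: frame not complete yet

        bufferer.apply(second);
        System.out.println(bufferer.getNext().readableBytes());  // 6: full payload available
      }
    }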

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java b/core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
deleted file mode 100644
index 3355b9f..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- * Enum for indicating compression method.
- */
-public enum Compression {
-  NONE(0),
-  GZIP(1),
-  SNAPPY(2);
-
-  private final int code;
-
-  Compression(int code) {
-    this.code = code;
-  }
-
-  public int getCode() {
-    return code;
-  }
-
-  public static Compression fromCode(int code) {
-    switch (code) {
-      case 0:
-        return NONE;
-      case 1:
-        return GZIP;
-      case 2:
-        return SNAPPY;
-    }
-    throw new IllegalArgumentException("Unknown compression code.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java b/core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
deleted file mode 100644
index c2865ba..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.collect.Maps;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.ChannelGroupFutureListener;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-
-import java.net.InetSocketAddress;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Provides netty socket connection reuse.
- */
-final class ConnectionPool {
-
-  private final ClientBootstrap bootstrap;
-  private final ChannelGroup channelGroup;
-  private final ConcurrentMap<InetSocketAddress, Queue<ChannelFuture>> connections;
-
-  /**
-   * For releasing a connection back to the pool.
-   */
-  interface ConnectionReleaser {
-    void release();
-  }
-
-  /**
-   * Result of a connect request.
-   */
-  interface ConnectResult extends ConnectionReleaser {
-    ChannelFuture getChannelFuture();
-  }
-
-  ConnectionPool(ClientBootstrap bootstrap) {
-    this.bootstrap = bootstrap;
-    this.channelGroup = new DefaultChannelGroup();
-    this.connections = Maps.newConcurrentMap();
-  }
-
-  ConnectResult connect(InetSocketAddress address) {
-    Queue<ChannelFuture> channelFutures = connections.get(address);
-    if (channelFutures == null) {
-      channelFutures = new ConcurrentLinkedQueue<ChannelFuture>();
-      Queue<ChannelFuture> result = connections.putIfAbsent(address, channelFutures);
-      channelFutures = result == null ? channelFutures : result;
-    }
-
-    ChannelFuture channelFuture = channelFutures.poll();
-    while (channelFuture != null) {
-      if (channelFuture.isSuccess() && channelFuture.getChannel().isConnected()) {
-        return new SimpleConnectResult(address, channelFuture);
-      }
-      channelFuture = channelFutures.poll();
-    }
-
-    channelFuture = bootstrap.connect(address);
-    channelFuture.addListener(new ChannelFutureListener() {
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        if (future.isSuccess()) {
-          channelGroup.add(future.getChannel());
-        }
-      }
-    });
-    return new SimpleConnectResult(address, channelFuture);
-  }
-
-  ChannelGroupFuture close() {
-    ChannelGroupFuture result = channelGroup.close();
-    result.addListener(new ChannelGroupFutureListener() {
-      @Override
-      public void operationComplete(ChannelGroupFuture future) throws Exception {
-        bootstrap.releaseExternalResources();
-      }
-    });
-    return result;
-  }
-
-  private final class SimpleConnectResult implements ConnectResult {
-
-    private final InetSocketAddress address;
-    private final ChannelFuture future;
-
-
-    private SimpleConnectResult(InetSocketAddress address, ChannelFuture future) {
-      this.address = address;
-      this.future = future;
-    }
-
-    @Override
-    public ChannelFuture getChannelFuture() {
-      return future;
-    }
-
-    @Override
-    public void release() {
-      if (future.isSuccess()) {
-        connections.get(address).offer(future);
-      }
-    }
-  }
-}
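
A hedged usage sketch (same package; ConnectionPool and ConnectResult are package-private). The broker address and the empty pipeline are placeholders, not taken from this commit:

    package org.apache.twill.internal.kafka.client;

    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;

    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

    public class ConnectionPoolSketch {
      public static void main(String[] args) {
        ClientBootstrap bootstrap = new ClientBootstrap(
            new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                              Executors.newCachedThreadPool()));
        bootstrap.setPipelineFactory(Channels.pipelineFactory(Channels.pipeline()));

        ConnectionPool pool = new ConnectionPool(bootstrap);
        ConnectionPool.ConnectResult result = pool.connect(new InetSocketAddress("localhost", 9092));
        try {
          // Wait for the connect to finish before using the channel.
          result.getChannelFuture().awaitUninterruptibly();
          // ... write requests to result.getChannelFuture().getChannel() ...
        } finally {
          result.release();                      // hand the connection back for reuse
        }
        pool.close().awaitUninterruptibly();     // close all pooled channels, release resources
      }
    }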

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java b/core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
deleted file mode 100644
index daa0c2c..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.zip.GZIPOutputStream;
-
-/**
- * A {@link MessageSetEncoder} that compresses message sets using GZIP.
- */
-final class GZipMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
-
-  GZipMessageSetEncoder() {
-    super(Compression.GZIP);
-  }
-
-  @Override
-  protected OutputStream createCompressedStream(OutputStream os) throws IOException {
-    return new GZIPOutputStream(os);
-  }
-}
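
The SNAPPY entry in the Compression enum suggests a matching encoder. It is not shown in this part of the diff, but by analogy with the GZIP variant above a hypothetical sketch might look like this (org.xerial.snappy is already used for decoding in MessageFetcher):

    package org.apache.twill.internal.kafka.client;

    import java.io.IOException;
    import java.io.OutputStream;

    import org.xerial.snappy.SnappyOutputStream;

    // Hypothetical analog of GZipMessageSetEncoder, not taken from this commit.
    final class SnappyMessageSetEncoder extends AbstractCompressedMessageSetEncoder {

      SnappyMessageSetEncoder() {
        super(Compression.SNAPPY);
      }

      @Override
      protected OutputStream createCompressedStream(OutputStream os) throws IOException {
        return new SnappyOutputStream(os);
      }
    }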

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java b/core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
deleted file mode 100644
index 51dc746..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-/**
- * A pass-through {@link MessageSetEncoder}.
- */
-final class IdentityMessageSetEncoder extends AbstractMessageSetEncoder {
-
-  private ChannelBuffer messageSets = ChannelBuffers.EMPTY_BUFFER;
-
-  @Override
-  public MessageSetEncoder add(ChannelBuffer payload) {
-    messageSets = ChannelBuffers.wrappedBuffer(messageSets, encodePayload(payload));
-    return this;
-  }
-
-  @Override
-  public ChannelBuffer finish() {
-    ChannelBuffer buf = prefixLength(messageSets);
-    messageSets = ChannelBuffers.EMPTY_BUFFER;
-    return buf;
-  }
-}
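
A short usage sketch of the encoder contract (same package, since the encoder classes are package-private): add() appends encoded messages, finish() returns the length-prefixed message set and resets the encoder.

    package org.apache.twill.internal.kafka.client;

    import com.google.common.base.Charsets;
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;

    public class EncoderUsageSketch {
      public static void main(String[] args) {
        MessageSetEncoder encoder = new IdentityMessageSetEncoder();
        encoder.add(ChannelBuffers.wrappedBuffer("msg".getBytes(Charsets.UTF_8)));

        ChannelBuffer messageSet = encoder.finish();
        // 4 (set size) + 4 (message length) + 1 (magic) + 4 (crc) + 3 (payload) = 16
        System.out.println(messageSet.readableBytes());
      }
    }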

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java b/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
deleted file mode 100644
index f2bb815..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-
-/**
- * A Service to cache kafka broker information by subscribing to ZooKeeper.
- */
-final class KafkaBrokerCache extends AbstractIdleService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerCache.class);
-
-  private static final String BROKERS_PATH = "/brokers";
-
-  private final ZKClient zkClient;
-  private final Map<String, InetSocketAddress> brokers;
-  // topicBrokers is from topic->partition size->brokerId
-  private final Map<String, SortedMap<Integer, Set<String>>> topicBrokers;
-  private final Runnable invokeGetBrokers = new Runnable() {
-    @Override
-    public void run() {
-      getBrokers();
-    }
-  };
-  private final Runnable invokeGetTopics = new Runnable() {
-    @Override
-    public void run() {
-      getTopics();
-    }
-  };
-
-  KafkaBrokerCache(ZKClient zkClient) {
-    this.zkClient = zkClient;
-    this.brokers = Maps.newConcurrentMap();
-    this.topicBrokers = Maps.newConcurrentMap();
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    getBrokers();
-    getTopics();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    // No-op
-  }
-
-  public int getPartitionSize(String topic) {
-    SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
-    if (partitionBrokers == null || partitionBrokers.isEmpty()) {
-      return 1;
-    }
-    return partitionBrokers.lastKey();
-  }
-
-  public TopicBroker getBrokerAddress(String topic, int partition) {
-    SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
-    if (partitionBrokers == null || partitionBrokers.isEmpty()) {
-      return pickRandomBroker(topic);
-    }
-
-    // If the requested partition is greater than supported partition size, randomly pick one
-    if (partition >= partitionBrokers.lastKey()) {
-      return pickRandomBroker(topic);
-    }
-
-    // Randomly pick a partition size and randomly pick a broker from it
-    Random random = new Random();
-    partitionBrokers = partitionBrokers.tailMap(partition + 1);
-    List<Integer> sizes = Lists.newArrayList(partitionBrokers.keySet());
-    Integer partitionSize = pickRandomItem(sizes, random);
-    List<String> ids = Lists.newArrayList(partitionBrokers.get(partitionSize));
-    InetSocketAddress address = brokers.get(ids.get(new Random().nextInt(ids.size())));
-    return address == null ? pickRandomBroker(topic) : new TopicBroker(topic, address, partitionSize);
-  }
-
-  private TopicBroker pickRandomBroker(String topic) {
-    Map.Entry<String, InetSocketAddress> entry = Iterables.getFirst(brokers.entrySet(), null);
-    if (entry == null) {
-      return null;
-    }
-    InetSocketAddress address = entry.getValue();
-    return new TopicBroker(topic, address, 0);
-  }
-
-  private <T> T pickRandomItem(List<T> list, Random random) {
-    return list.get(random.nextInt(list.size()));
-  }
-
-  private void getBrokers() {
-    final String idsPath = BROKERS_PATH + "/ids";
-
-    Futures.addCallback(zkClient.getChildren(idsPath, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        getBrokers();
-      }
-    }), new ExistsOnFailureFutureCallback<NodeChildren>(idsPath, invokeGetBrokers) {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        Set<String> children = ImmutableSet.copyOf(result.getChildren());
-        for (String child : children) {
-          getBrokerData(idsPath + "/" + child, child);
-        }
-        // Remove entries for brokers that are no longer registered
-        removeDiff(children, brokers);
-      }
-    });
-  }
-
-  private void getTopics() {
-    final String topicsPath = BROKERS_PATH + "/topics";
-    Futures.addCallback(zkClient.getChildren(topicsPath, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        getTopics();
-      }
-    }), new ExistsOnFailureFutureCallback<NodeChildren>(topicsPath, invokeGetTopics) {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        Set<String> children = ImmutableSet.copyOf(result.getChildren());
-
-        // Process new children
-        for (String topic : ImmutableSet.copyOf(Sets.difference(children, topicBrokers.keySet()))) {
-          getTopic(topicsPath + "/" + topic, topic);
-        }
-
-        // Remove old children
-        removeDiff(children, topicBrokers);
-      }
-    });
-  }
-
-  private void getBrokerData(String path, final String brokerId) {
-    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
-      @Override
-      public void onSuccess(NodeData result) {
-        String data = new String(result.getData(), Charsets.UTF_8);
-        String hostPort = data.substring(data.indexOf(':') + 1);
-        int idx = hostPort.indexOf(':');
-        brokers.put(brokerId, new InetSocketAddress(hostPort.substring(0, idx),
-                                                    Integer.parseInt(hostPort.substring(idx + 1))));
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // No-op, the watch on the parent node will handle it.
-      }
-    });
-  }
-
-  private void getTopic(final String path, final String topic) {
-    Futures.addCallback(zkClient.getChildren(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        // Other event types can either be ignored or are handled by the parent watcher
-        if (event.getType() == Event.EventType.NodeChildrenChanged) {
-          getTopic(path, topic);
-        }
-      }
-    }), new FutureCallback<NodeChildren>() {
-      @Override
-      public void onSuccess(NodeChildren result) {
-        List<String> children = result.getChildren();
-        final List<ListenableFuture<BrokerPartition>> futures = Lists.newArrayListWithCapacity(children.size());
-
-        // Fetch data from each broker node
-        for (final String brokerId : children) {
-          Futures.transform(zkClient.getData(path + "/" + brokerId), new Function<NodeData, BrokerPartition>() {
-            @Override
-            public BrokerPartition apply(NodeData input) {
-              return new BrokerPartition(brokerId, Integer.parseInt(new String(input.getData(), Charsets.UTF_8)));
-            }
-          });
-        }
-
-        // When all fetching is done, build the partition size->broker map for this topic
-        Futures.successfulAsList(futures).addListener(new Runnable() {
-          @Override
-          public void run() {
-            Map<Integer, Set<String>> partitionBrokers = Maps.newHashMap();
-            for (ListenableFuture<BrokerPartition> future : futures) {
-              try {
-                BrokerPartition info = future.get();
-                Set<String> brokerSet = partitionBrokers.get(info.getPartitionSize());
-                if (brokerSet == null) {
-                  brokerSet = Sets.newHashSet();
-                  partitionBrokers.put(info.getPartitionSize(), brokerSet);
-                }
-                brokerSet.add(info.getBrokerId());
-              } catch (Exception e) {
-                // Exception is ignored, as it will be handled by parent watcher
-              }
-            }
-            topicBrokers.put(topic, ImmutableSortedMap.copyOf(partitionBrokers));
-          }
-        }, Threads.SAME_THREAD_EXECUTOR);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // No-op. Failure is already handled by the parent watcher (e.g. node does not exist -> children change in parent)
-      }
-    });
-  }
-
-  private <K, V> void removeDiff(Set<K> keys, Map<K, V> map) {
-    for (K key : ImmutableSet.copyOf(Sets.difference(map.keySet(), keys))) {
-      map.remove(key);
-    }
-  }
-
-  private abstract class ExistsOnFailureFutureCallback<V> implements FutureCallback<V> {
-
-    private final String path;
-    private final Runnable action;
-
-    protected ExistsOnFailureFutureCallback(String path, Runnable action) {
-      this.path = path;
-      this.action = action;
-    }
-
-    @Override
-    public final void onFailure(Throwable t) {
-      if (!isNotExists(t)) {
-        LOG.error("Fail to watch for kafka brokers: " + path, t);
-        return;
-      }
-
-      waitExists(path);
-    }
-
-    private boolean isNotExists(Throwable t) {
-      return ((t instanceof KeeperException) && ((KeeperException) t).code() == KeeperException.Code.NONODE);
-    }
-
-    private void waitExists(String path) {
-      LOG.info("Path " + path + " not exists. Watch for creation.");
-
-      // If the node doesn't exist, use the "exists" call to watch for node creation.
-      Futures.addCallback(zkClient.exists(path, new Watcher() {
-        @Override
-        public void process(WatchedEvent event) {
-          if (event.getType() == Event.EventType.NodeCreated || event.getType() == Event.EventType.NodeDeleted) {
-            action.run();
-          }
-        }
-      }), new FutureCallback<Stat>() {
-        @Override
-        public void onSuccess(Stat result) {
-          // If path exists, get children again, otherwise wait for watch to get triggered
-          if (result != null) {
-            action.run();
-          }
-        }
-        @Override
-        public void onFailure(Throwable t) {
-          action.run();
-        }
-      });
-    }
-  }
-
-  private static final class BrokerPartition {
-    private final String brokerId;
-    private final int partitionSize;
-
-    private BrokerPartition(String brokerId, int partitionSize) {
-      this.brokerId = brokerId;
-      this.partitionSize = partitionSize;
-    }
-
-    public String getBrokerId() {
-      return brokerId;
-    }
-
-    public int getPartitionSize() {
-      return partitionSize;
-    }
-  }
-}
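
The broker lookup above parses the ZooKeeper registration value by stripping everything up to the first colon and splitting the remainder into host and port. A small stand-alone sketch of that parsing, assuming a value of the form <creator>:<host>:<port>:

    import java.net.InetSocketAddress;

    public class BrokerValueParseSketch {
      public static void main(String[] args) {
        String data = "broker-1-1386888000000:192.168.1.10:9092";  // assumed sample value

        String hostPort = data.substring(data.indexOf(':') + 1);   // "192.168.1.10:9092"
        int idx = hostPort.indexOf(':');
        InetSocketAddress address = new InetSocketAddress(
            hostPort.substring(0, idx), Integer.parseInt(hostPort.substring(idx + 1)));

        System.out.println("host=" + hostPort.substring(0, idx) + " port=" + address.getPort());
      }
    }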

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java b/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
deleted file mode 100644
index 7b43f8a..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- *
- */
-final class KafkaRequest {
-
-  public enum Type {
-    PRODUCE(0),
-    FETCH(1),
-    MULTI_FETCH(2),
-    MULTI_PRODUCE(3),
-    OFFSETS(4);
-
-    private final short id;
-
-    private Type(int id) {
-      this.id = (short) id;
-    }
-
-    public short getId() {
-      return id;
-    }
-  }
-
-  private final Type type;
-  private final String topic;
-  private final int partition;
-  private final ChannelBuffer body;
-  private final ResponseHandler responseHandler;
-
-
-  public static KafkaRequest createProduce(String topic, int partition, ChannelBuffer body) {
-    return new KafkaRequest(Type.PRODUCE, topic, partition, body, ResponseHandler.NO_OP);
-  }
-
-  public static KafkaRequest createFetch(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
-    return new KafkaRequest(Type.FETCH, topic, partition, body, handler);
-  }
-
-  public static KafkaRequest createOffsets(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
-    return new KafkaRequest(Type.OFFSETS, topic, partition, body, handler);
-  }
-
-  private KafkaRequest(Type type, String topic, int partition, ChannelBuffer body, ResponseHandler responseHandler) {
-    this.type = type;
-    this.topic = topic;
-    this.partition = partition;
-    this.body = body;
-    this.responseHandler = responseHandler;
-  }
-
-  Type getType() {
-    return type;
-  }
-
-  String getTopic() {
-    return topic;
-  }
-
-  int getPartition() {
-    return partition;
-  }
-
-  ChannelBuffer getBody() {
-    return body;
-  }
-
-  ResponseHandler getResponseHandler() {
-    return responseHandler;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java b/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
deleted file mode 100644
index ef78c76..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.base.Charsets;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-import java.nio.ByteBuffer;
-
-/**
- *
- */
-final class KafkaRequestEncoder extends OneToOneEncoder {
-
-  @Override
-  protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
-    if (!(msg instanceof KafkaRequest)) {
-      return msg;
-    }
-    KafkaRequest req = (KafkaRequest) msg;
-    ByteBuffer topic = Charsets.UTF_8.encode(req.getTopic());
-
-    ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(16 + topic.remaining() + req.getBody().readableBytes());
-    int writerIdx = buffer.writerIndex();
-    buffer.writerIndex(writerIdx + 4);    // Reserves 4 bytes for message length
-
-    // Write out <REQUEST_TYPE>, <TOPIC_LENGTH>, <TOPIC>, <PARTITION>
-    buffer.writeShort(req.getType().getId());
-    buffer.writeShort(topic.remaining());
-    buffer.writeBytes(topic);
-    buffer.writeInt(req.getPartition());
-
-    // Write out the size of the whole buffer (excluding the size field) at the beginning
-    buffer.setInt(writerIdx, buffer.readableBytes() - 4 + req.getBody().readableBytes());
-
-    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(buffer, req.getBody());
-    buf = buf.readBytes(buf.readableBytes());
-
-    return buf;
-  }
-}
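
To make the wire layout produced above explicit: [size][type][topic length][topic][partition][message-set body], where size excludes its own 4 bytes. A stand-alone sketch with a plain ByteBuffer (the topic name and empty body are arbitrary):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class RequestLayoutSketch {
      public static void main(String[] args) {
        byte[] topic = "logs".getBytes(StandardCharsets.UTF_8);
        byte[] body = new byte[0];                     // empty message-set body for brevity
        short requestType = 0;                         // PRODUCE, per KafkaRequest.Type

        // type + topic length + topic + partition + body
        int size = 2 + 2 + topic.length + 4 + body.length;

        ByteBuffer buf = ByteBuffer.allocate(4 + size);
        buf.putInt(size);                              // length of everything after this field
        buf.putShort(requestType);
        buf.putShort((short) topic.length);
        buf.put(topic);
        buf.putInt(0);                                 // partition
        buf.put(body);
        buf.flip();

        System.out.println("frame bytes = " + buf.remaining());  // 16 for topic "logs", empty body
      }
    }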

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java b/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
deleted file mode 100644
index fbc552c..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- *
- */
-interface KafkaRequestSender {
-
-  void send(KafkaRequest request);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java b/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
deleted file mode 100644
index 68c1bd8..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.kafka.client.FetchException;
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- *
- */
-final class KafkaResponse {
-
-  private final FetchException.ErrorCode errorCode;
-  private final ChannelBuffer body;
-  private final int size;
-
-  KafkaResponse(FetchException.ErrorCode errorCode, ChannelBuffer body, int size) {
-    this.errorCode = errorCode;
-    this.body = body;
-    this.size = size;
-  }
-
-  public int getSize() {
-    return size;
-  }
-
-  public FetchException.ErrorCode getErrorCode() {
-    return errorCode;
-  }
-
-  public ChannelBuffer getBody() {
-    return body;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java b/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
deleted file mode 100644
index 47f70ce..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.SocketException;
-import java.nio.channels.ClosedChannelException;
-
-/**
- *
- */
-final class KafkaResponseDispatcher extends SimpleChannelHandler {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaResponseDispatcher.class);
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    Object attachment = ctx.getAttachment();
-    if (e.getMessage() instanceof KafkaResponse && attachment instanceof ResponseHandler) {
-      ((ResponseHandler) attachment).received((KafkaResponse) e.getMessage());
-    } else {
-      super.messageReceived(ctx, e);
-    }
-  }
-
-  @Override
-  public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    if (e.getMessage() instanceof KafkaRequest) {
-      ctx.setAttachment(((KafkaRequest) e.getMessage()).getResponseHandler());
-    }
-    super.writeRequested(ctx, e);
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-    if (e.getCause() instanceof ClosedChannelException || e.getCause() instanceof SocketException) {
-      // No need to log for socket exception as the client has logic to retry.
-      return;
-    }
-    LOG.warn("Exception caught in kafka client connection.", e.getCause());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java b/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
deleted file mode 100644
index 5251e65..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.kafka.client.FetchException;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- *
- */
-final class KafkaResponseHandler extends SimpleChannelHandler {
-
-  private final Bufferer bufferer = new Bufferer();
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    Object msg = e.getMessage();
-    if (!(msg instanceof ChannelBuffer)) {
-      super.messageReceived(ctx, e);
-      return;
-    }
-
-    bufferer.apply((ChannelBuffer) msg);
-    ChannelBuffer buffer = bufferer.getNext();
-    while (buffer.readable()) {
-      // Send the response object upstream
-      Channels.fireMessageReceived(ctx, new KafkaResponse(FetchException.ErrorCode.fromCode(buffer.readShort()),
-                                                          buffer, buffer.readableBytes() + 6));
-      buffer = bufferer.getNext();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java b/core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
deleted file mode 100644
index 0814917..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.kafka.client.FetchException;
-import org.apache.twill.kafka.client.FetchedMessage;
-import com.google.common.base.Throwables;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.io.ByteStreams;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.xerial.snappy.SnappyInputStream;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.GZIPInputStream;
-
-/**
- * This class is for consuming messages from a kafka topic.
- */
-final class MessageFetcher extends AbstractIterator<FetchedMessage> implements ResponseHandler {
-
-  private static final long BACKOFF_INTERVAL_MS = 100;
-
-  private final KafkaRequestSender sender;
-  private final String topic;
-  private final int partition;
-  private final int maxSize;
-  private final AtomicLong offset;
-  private final BlockingQueue<FetchResult> messages;
-  private final ScheduledExecutorService scheduler;
-  private volatile long backoffMillis;
-  private final Runnable sendFetchRequest = new Runnable() {
-    @Override
-    public void run() {
-      sendFetchRequest();
-    }
-  };
-
-  MessageFetcher(String topic, int partition, long offset, int maxSize, KafkaRequestSender sender) {
-    this.topic = topic;
-    this.partition = partition;
-    this.sender = sender;
-    this.offset = new AtomicLong(offset);
-    this.maxSize = maxSize;
-    this.messages = new LinkedBlockingQueue<FetchResult>();
-    this.scheduler = Executors.newSingleThreadScheduledExecutor(
-                        Threads.createDaemonThreadFactory("kafka-" + topic + "-consumer"));
-  }
-
-  @Override
-  public void received(KafkaResponse response) {
-    if (response.getErrorCode() != FetchException.ErrorCode.OK) {
-      messages.add(FetchResult.failure(new FetchException("Error in fetching: " + response.getErrorCode(),
-                                                          response.getErrorCode())));
-      return;
-    }
-
-    try {
-      if (decodeResponse(response.getBody(), -1)) {
-        backoffMillis = 0;
-      } else {
-        backoffMillis = Math.max(backoffMillis + BACKOFF_INTERVAL_MS, 1000);
-        scheduler.schedule(sendFetchRequest, backoffMillis, TimeUnit.MILLISECONDS);
-      }
-    } catch (Throwable t) {
-      messages.add(FetchResult.failure(t));
-    }
-  }
-
-  private boolean decodeResponse(ChannelBuffer buffer, long nextOffset) {
-    boolean hasMessage = false;
-    boolean computeOffset = nextOffset < 0;
-    while (buffer.readableBytes() >= 4) {
-      int size = buffer.readInt();
-      if (buffer.readableBytes() < size) {
-        if (!hasMessage) {
-          throw new IllegalStateException("Size too small");
-        }
-        break;
-      }
-      nextOffset = computeOffset ? offset.addAndGet(size + 4) : nextOffset;
-      decodeMessage(size, buffer, nextOffset);
-      hasMessage = true;
-    }
-    return hasMessage;
-
-  }
-
-  private void decodeMessage(int size, ChannelBuffer buffer, long nextOffset) {
-    int readerIdx = buffer.readerIndex();
-    int magic = buffer.readByte();
-    Compression compression = magic == 0 ? Compression.NONE : Compression.fromCode(buffer.readByte());
-    int crc = buffer.readInt();
-
-    ChannelBuffer payload = buffer.readSlice(size - (buffer.readerIndex() - readerIdx));
-
-    // Verify CRC?
-    enqueueMessage(compression, payload, nextOffset);
-  }
-
-  private void enqueueMessage(Compression compression, ChannelBuffer payload, long nextOffset) {
-    switch (compression) {
-      case NONE:
-        messages.add(FetchResult.success(new BasicFetchedMessage(nextOffset, payload.toByteBuffer())));
-        break;
-      case GZIP:
-        decodeResponse(gunzip(payload), nextOffset);
-        break;
-      case SNAPPY:
-        decodeResponse(unsnappy(payload), nextOffset);
-        break;
-    }
-  }
-
-  private ChannelBuffer gunzip(ChannelBuffer source) {
-    ChannelBufferOutputStream output = new ChannelBufferOutputStream(
-                                              ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
-    try {
-      try {
-        GZIPInputStream gzipInput = new GZIPInputStream(new ChannelBufferInputStream(source));
-        try {
-          ByteStreams.copy(gzipInput, output);
-          return output.buffer();
-        } finally {
-          gzipInput.close();
-        }
-      } finally {
-        output.close();
-      }
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private ChannelBuffer unsnappy(ChannelBuffer source) {
-    ChannelBufferOutputStream output = new ChannelBufferOutputStream(
-                                              ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
-    try {
-      try {
-        SnappyInputStream snappyInput = new SnappyInputStream(new ChannelBufferInputStream(source));
-        try {
-          ByteStreams.copy(snappyInput, output);
-          return output.buffer();
-        } finally {
-          snappyInput.close();
-        }
-      } finally {
-        output.close();
-      }
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private void sendFetchRequest() {
-    ChannelBuffer fetchBody = ChannelBuffers.buffer(12);
-    fetchBody.writeLong(offset.get());
-    fetchBody.writeInt(maxSize);
-    sender.send(KafkaRequest.createFetch(topic, partition, fetchBody, MessageFetcher.this));
-  }
-
-  @Override
-  protected FetchedMessage computeNext() {
-    FetchResult result = messages.poll();
-    if (result != null) {
-      return getMessage(result);
-    }
-
-    try {
-      sendFetchRequest();
-      return getMessage(messages.take());
-    } catch (InterruptedException e) {
-      scheduler.shutdownNow();
-      return endOfData();
-    }
-  }
-
-  private FetchedMessage getMessage(FetchResult result) {
-    try {
-      if (result.isSuccess()) {
-        return result.getMessage();
-      } else {
-        throw result.getErrorCause();
-      }
-    } catch (Throwable t) {
-      throw Throwables.propagate(t);
-    }
-  }
-
-  private static final class FetchResult {
-    private final FetchedMessage message;
-    private final Throwable errorCause;
-
-    static FetchResult success(FetchedMessage message) {
-      return new FetchResult(message, null);
-    }
-
-    static FetchResult failure(Throwable cause) {
-      return new FetchResult(null, cause);
-    }
-
-    private FetchResult(FetchedMessage message, Throwable errorCause) {
-      this.message = message;
-      this.errorCause = errorCause;
-    }
-
-    public FetchedMessage getMessage() {
-      return message;
-    }
-
-    public Throwable getErrorCause() {
-      return errorCause;
-    }
-
-    public boolean isSuccess() {
-      return message != null;
-    }
-  }
-}
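
A side note on the two decompression helpers above: gunzip() and unsnappy() close their streams with nested try/finally blocks. On Java 7 or later the same logic can be written with try-with-resources. The sketch below is illustrative only and not part of this commit; the class name GzipSupport is invented, while every API it calls comes from the Netty 3, Guava, and JDK classes already imported by the deleted MessageFetcher.

package org.apache.twill.internal.kafka.client;

import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;

import java.io.IOException;
import java.util.zip.GZIPInputStream;

final class GzipSupport {

  // Decompresses a GZIP-compressed message set into a new dynamically sized buffer.
  static ChannelBuffer gunzip(ChannelBuffer source) {
    try (ChannelBufferOutputStream output = new ChannelBufferOutputStream(
           ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
         GZIPInputStream gzipInput = new GZIPInputStream(new ChannelBufferInputStream(source))) {
      ByteStreams.copy(gzipInput, output);
      return output.buffer();
    } catch (IOException e) {
      throw Throwables.propagate(e);
    }
  }

  private GzipSupport() {
  }
}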

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java b/core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
deleted file mode 100644
index 49008cc..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- * This represents a set of messages that go into the same message set and get encoded as a
- * single Kafka message set.
- */
-interface MessageSetEncoder {
-
-  MessageSetEncoder add(ChannelBuffer payload);
-
-  ChannelBuffer finish();
-}
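
This encoder is driven by SimpleKafkaClient.preparePublish() further down in this change: payloads are appended one at a time and the finished buffer is sent as a single produce request. Below is a minimal, hypothetical usage sketch; the class name and payloads are invented, and it assumes IdentityMessageSetEncoder (the uncompressed implementation referenced below) has a no-argument constructor.

package org.apache.twill.internal.kafka.client;

import com.google.common.base.Charsets;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

final class MessageSetEncoderUsageSketch {

  // Appends two payloads to an uncompressed encoder and returns the encoded message set.
  static ChannelBuffer encodeTwoMessages() {
    MessageSetEncoder encoder = new IdentityMessageSetEncoder();
    return encoder
        .add(ChannelBuffers.wrappedBuffer("first message".getBytes(Charsets.UTF_8)))
        .add(ChannelBuffers.wrappedBuffer("second message".getBytes(Charsets.UTF_8)))
        .finish();
  }

  private MessageSetEncoderUsageSketch() {
  }
}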

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java b/core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
deleted file mode 100644
index f681b85..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- * Represents a handler for a Kafka response.
- */
-interface ResponseHandler {
-
-  ResponseHandler NO_OP = new ResponseHandler() {
-    @Override
-    public void received(KafkaResponse response) {
-      // No-op
-    }
-  };
-
-  void received(KafkaResponse response);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java b/core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
deleted file mode 100644
index 8ff4856..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.kafka.client.FetchException;
-import org.apache.twill.kafka.client.FetchedMessage;
-import org.apache.twill.kafka.client.KafkaClient;
-import org.apache.twill.kafka.client.PreparePublish;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioClientBossPool;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Basic implementation of {@link KafkaClient}.
- */
-public final class SimpleKafkaClient extends AbstractIdleService implements KafkaClient {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaClient.class);
-  private static final int BROKER_POLL_INTERVAL = 100;
-
-  private final KafkaBrokerCache brokerCache;
-  private ClientBootstrap bootstrap;
-  private ConnectionPool connectionPool;
-
-  public SimpleKafkaClient(ZKClient zkClient) {
-    this.brokerCache = new KafkaBrokerCache(zkClient);
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    brokerCache.startAndWait();
-    ThreadFactory threadFactory = Threads.createDaemonThreadFactory("kafka-client-netty-%d");
-    NioClientBossPool bossPool = new NioClientBossPool(Executors.newSingleThreadExecutor(threadFactory), 1,
-                                                       new HashedWheelTimer(threadFactory), null);
-    NioWorkerPool workerPool = new NioWorkerPool(Executors.newFixedThreadPool(4, threadFactory), 4);
-
-    bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossPool, workerPool));
-    bootstrap.setPipelineFactory(new KafkaChannelPipelineFactory());
-    connectionPool = new ConnectionPool(bootstrap);
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    connectionPool.close();
-    bootstrap.releaseExternalResources();
-    brokerCache.stopAndWait();
-  }
-
-  @Override
-  public PreparePublish preparePublish(final String topic, final Compression compression) {
-    final Map<Integer, MessageSetEncoder> encoders = Maps.newHashMap();
-
-    return new PreparePublish() {
-      @Override
-      public PreparePublish add(byte[] payload, Object partitionKey) {
-        return add(ByteBuffer.wrap(payload), partitionKey);
-      }
-
-      @Override
-      public PreparePublish add(ByteBuffer payload, Object partitionKey) {
-        // TODO: Partition
-        int partition = 0;
-
-        MessageSetEncoder encoder = encoders.get(partition);
-        if (encoder == null) {
-          encoder = getEncoder(compression);
-          encoders.put(partition, encoder);
-        }
-        encoder.add(ChannelBuffers.wrappedBuffer(payload));
-
-        return this;
-      }
-
-      @Override
-      public ListenableFuture<?> publish() {
-        List<ListenableFuture<?>> futures = Lists.newArrayListWithCapacity(encoders.size());
-        for (Map.Entry<Integer, MessageSetEncoder> entry : encoders.entrySet()) {
-          futures.add(doPublish(topic, entry.getKey(), entry.getValue().finish()));
-        }
-        encoders.clear();
-        return Futures.allAsList(futures);
-      }
-
-      private ListenableFuture<?> doPublish(String topic, int partition, ChannelBuffer messageSet) {
-        final KafkaRequest request = KafkaRequest.createProduce(topic, partition, messageSet);
-        final SettableFuture<?> result = SettableFuture.create();
-        final ConnectionPool.ConnectResult connection =
-              connectionPool.connect(getTopicBroker(topic, partition).getAddress());
-
-        connection.getChannelFuture().addListener(new ChannelFutureListener() {
-          @Override
-          public void operationComplete(ChannelFuture future) throws Exception {
-            try {
-              future.getChannel().write(request).addListener(getPublishChannelFutureListener(result, null, connection));
-            } catch (Exception e) {
-              result.setException(e);
-            }
-          }
-        });
-
-        return result;
-      }
-    };
-  }
-
-  @Override
-  public Iterator<FetchedMessage> consume(final String topic, final int partition, long offset, int maxSize) {
-    Preconditions.checkArgument(maxSize >= 10, "Message size cannot be smaller than 10.");
-
-    // Connect to the broker. Consumer connections are long-lived, so there is no need to worry about reuse.
-    final AtomicReference<ChannelFuture> channelFutureRef = new AtomicReference<ChannelFuture>(
-          connectionPool.connect(getTopicBroker(topic, partition).getAddress()).getChannelFuture());
-
-    return new MessageFetcher(topic, partition, offset, maxSize, new KafkaRequestSender() {
-
-      @Override
-      public void send(final KafkaRequest request) {
-        if (!isRunning()) {
-          return;
-        }
-        try {
-          // Try to send the request
-          Channel channel = channelFutureRef.get().getChannel();
-          if (!channel.write(request).await().isSuccess()) {
-            // If failed, retry
-            channel.close();
-            ChannelFuture channelFuture = connectionPool.connect(
-                                              getTopicBroker(topic, partition).getAddress()).getChannelFuture();
-            channelFutureRef.set(channelFuture);
-            channelFuture.addListener(new ChannelFutureListener() {
-              @Override
-              public void operationComplete(ChannelFuture channelFuture) throws Exception {
-                send(request);
-              }
-            });
-          }
-        } catch (InterruptedException e) {
-          // Ignore it
-          LOG.info("Interrupted when sending consume request", e);
-        }
-      }
-    });
-  }
-
-  @Override
-  public ListenableFuture<long[]> getOffset(final String topic, final int partition, long time, int maxOffsets) {
-    final SettableFuture<long[]> resultFuture = SettableFuture.create();
-    final ChannelBuffer body = ChannelBuffers.buffer(Longs.BYTES + Ints.BYTES);
-    body.writeLong(time);
-    body.writeInt(maxOffsets);
-
-    connectionPool.connect(getTopicBroker(topic, partition).getAddress())
-                  .getChannelFuture().addListener(new ChannelFutureListener() {
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        if (checkFailure(future)) {
-          return;
-        }
-
-        future.getChannel().write(KafkaRequest.createOffsets(topic, partition, body, new ResponseHandler() {
-          @Override
-          public void received(KafkaResponse response) {
-            if (response.getErrorCode() != FetchException.ErrorCode.OK) {
-              resultFuture.setException(new FetchException("Failed to fetch offset.", response.getErrorCode()));
-            } else {
-              // Decode the offset response, which contains a 4-byte count of offsets, followed by that many
-              // offsets, each 8 bytes in size.
-              ChannelBuffer resultBuffer = response.getBody();
-              int size = resultBuffer.readInt();
-              long[] result = new long[size];
-              for (int i = 0; i < size; i++) {
-                result[i] = resultBuffer.readLong();
-              }
-              resultFuture.set(result);
-            }
-          }
-        })).addListener(new ChannelFutureListener() {
-          @Override
-          public void operationComplete(ChannelFuture future) throws Exception {
-            checkFailure(future);
-          }
-        });
-      }
-
-      private boolean checkFailure(ChannelFuture future) {
-        if (!future.isSuccess()) {
-          if (future.isCancelled()) {
-            resultFuture.cancel(true);
-          } else {
-            resultFuture.setException(future.getCause());
-          }
-          return true;
-        }
-        return false;
-      }
-    });
-
-    return resultFuture;
-  }
-
-  private TopicBroker getTopicBroker(String topic, int partition) {
-    TopicBroker topicBroker = brokerCache.getBrokerAddress(topic, partition);
-    while (topicBroker == null) {
-      try {
-        TimeUnit.MILLISECONDS.sleep(BROKER_POLL_INTERVAL);
-      } catch (InterruptedException e) {
-        return null;
-      }
-      topicBroker = brokerCache.getBrokerAddress(topic, partition);
-    }
-    return topicBroker;
-  }
-
-  private MessageSetEncoder getEncoder(Compression compression) {
-    switch (compression) {
-      case GZIP:
-        return new GZipMessageSetEncoder();
-      case SNAPPY:
-        return new SnappyMessageSetEncoder();
-      default:
-        return new IdentityMessageSetEncoder();
-    }
-  }
-
-  private <V> ChannelFutureListener getPublishChannelFutureListener(final SettableFuture<V> result, final V resultObj,
-                                                                    final ConnectionPool.ConnectionReleaser releaser) {
-    return new ChannelFutureListener() {
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        try {
-          if (future.isSuccess()) {
-            result.set(resultObj);
-          } else if (future.isCancelled()) {
-            result.cancel(true);
-          } else {
-            result.setException(future.getCause());
-          }
-        } finally {
-          releaser.release();
-        }
-      }
-    };
-  }
-
-  private static final class KafkaChannelPipelineFactory implements ChannelPipelineFactory {
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-
-      pipeline.addLast("encoder", new KafkaRequestEncoder());
-      pipeline.addLast("decoder", new KafkaResponseHandler());
-      pipeline.addLast("dispatcher", new KafkaResponseDispatcher());
-      return pipeline;
-    }
-  }
-}
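
For orientation, the sketch below shows how the deleted client was typically driven through the public KafkaClient and PreparePublish interfaces used above. It is not part of this commit: the topic name, partition, offset, and fetch size are invented for illustration, and the class is placed in the same package only so that Compression resolves without an explicit import.

package org.apache.twill.internal.kafka.client;

import com.google.common.base.Charsets;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaClient;

import java.util.Iterator;

final class KafkaClientUsageSketch {

  // Publishes one uncompressed message, then iterates over messages fetched from the start of partition 0.
  static void publishAndConsume(KafkaClient client) throws Exception {
    // Publish a single payload; partitioning is not implemented yet (see the TODO above), so key 0 is used.
    client.preparePublish("logs", Compression.NONE)
          .add("hello".getBytes(Charsets.UTF_8), 0)
          .publish()
          .get();

    // Consume from offset 0 with a 1 MB fetch size; the returned iterator blocks until messages arrive.
    Iterator<FetchedMessage> messages = client.consume("logs", 0, 0L, 1024 * 1024);
    while (messages.hasNext()) {
      FetchedMessage message = messages.next();
      // Process the fetched message here.
    }
  }

  private KafkaClientUsageSketch() {
  }
}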

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java b/core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
deleted file mode 100644
index bf18c08..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.xerial.snappy.SnappyOutputStream;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * A {@link MessageSetEncoder} that compresses messages using Snappy.
- */
-final class SnappyMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
-
-  SnappyMessageSetEncoder() {
-    super(Compression.SNAPPY);
-  }
-
-  @Override
-  protected OutputStream createCompressedStream(OutputStream os) throws IOException {
-    return new SnappyOutputStream(os);
-  }
-}
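
SimpleKafkaClient.getEncoder() above also refers to a GZIP counterpart, GZipMessageSetEncoder, which is not shown in this hunk. Assuming it follows the same pattern as the Snappy encoder, it would look roughly like the sketch below; the Sketch suffix on the class name is there to make clear this is an illustration, not the deleted source.

package org.apache.twill.internal.kafka.client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

/**
 * A {@link MessageSetEncoder} that compresses messages using GZIP.
 */
final class GZipMessageSetEncoderSketch extends AbstractCompressedMessageSetEncoder {

  GZipMessageSetEncoderSketch() {
    super(Compression.GZIP);
  }

  @Override
  protected OutputStream createCompressedStream(OutputStream os) throws IOException {
    return new GZIPOutputStream(os);
  }
}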

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java b/core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
deleted file mode 100644
index fd4bf03..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import java.net.InetSocketAddress;
-
-/**
- * Represents broker information for a given topic.
- */
-final class TopicBroker {
-
-  private final String topic;
-  private final InetSocketAddress address;
-  private final int partitionSize;
-
-  TopicBroker(String topic, InetSocketAddress address, int partitionSize) {
-    this.topic = topic;
-    this.address = address;
-    this.partitionSize = partitionSize;
-  }
-
-  String getTopic() {
-    return topic;
-  }
-
-  InetSocketAddress getAddress() {
-    return address;
-  }
-
-  int getPartitionSize() {
-    return partitionSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java b/core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
deleted file mode 100644
index f3f615c..0000000
--- a/core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package provides a pure Java Kafka client implementation.
- */
-package org.apache.twill.internal.kafka.client;