Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:33 UTC

[17/28] [TWILL-14] Bootstrapping for the site generation. Reorganization of the source tree:

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
new file mode 100644
index 0000000..9955d6a
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.zip.CRC32;
+
+/**
+ * A base implementation of {@link MessageSetEncoder}.
+ */
+abstract class AbstractMessageSetEncoder implements MessageSetEncoder {
+
+  private static final ThreadLocal<CRC32> CRC32_LOCAL = new ThreadLocal<CRC32>() {
+    @Override
+    protected CRC32 initialValue() {
+      return new CRC32();
+    }
+  };
+
+  protected final int computeCRC32(ChannelBuffer buffer) {
+    CRC32 crc32 = CRC32_LOCAL.get();
+    crc32.reset();
+
+    if (buffer.hasArray()) {
+      crc32.update(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+    } else {
+      byte[] bytes = new byte[buffer.readableBytes()];
+      buffer.getBytes(buffer.readerIndex(), bytes);
+      crc32.update(bytes);
+    }
+    return (int) crc32.getValue();
+  }
+
+  protected final ChannelBuffer encodePayload(ChannelBuffer payload) {
+    return encodePayload(payload, Compression.NONE);
+  }
+
+  protected final ChannelBuffer encodePayload(ChannelBuffer payload, Compression compression) {
+    ChannelBuffer header = ChannelBuffers.buffer(10);
+
+    int crc = computeCRC32(payload);
+
+    int magic = ((compression == Compression.NONE) ? 0 : 1);
+
+    // Message length = 1 byte magic + (optional 1 compression byte) + 4 bytes crc + payload length
+    header.writeInt(5 + magic + payload.readableBytes());
+    // Magic number = 0 for non-compressed data
+    header.writeByte(magic);
+    if (magic > 0) {
+      header.writeByte(compression.getCode());
+    }
+    header.writeInt(crc);
+
+    return ChannelBuffers.wrappedBuffer(header, payload);
+  }
+
+  protected final ChannelBuffer prefixLength(ChannelBuffer buffer) {
+    ChannelBuffer sizeBuf = ChannelBuffers.buffer(4);
+    sizeBuf.writeInt(buffer.readableBytes());
+    return ChannelBuffers.wrappedBuffer(sizeBuf, buffer);
+  }
+}
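
For reference, the framing produced by encodePayload() can be reproduced with plain java.nio. The sketch below is illustrative only (not from the patch) and builds the same [length][magic][crc][payload] layout for an uncompressed message:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public final class MessageFramingSketch {
      public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        CRC32 crc32 = new CRC32();
        crc32.update(payload);

        // [4 bytes length][1 byte magic = 0][4 bytes crc][payload]
        ByteBuffer buf = ByteBuffer.allocate(4 + 1 + 4 + payload.length);
        buf.putInt(5 + payload.length);      // magic + crc + payload, excluding this field
        buf.put((byte) 0);                   // magic 0: no compression byte follows
        buf.putInt((int) crc32.getValue());
        buf.put(payload);

        System.out.println("Encoded size: " + buf.position());  // 14 for "hello"
      }
    }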

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
new file mode 100644
index 0000000..286bf82
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchedMessage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A basic implementation of {@link FetchedMessage} carrying a message offset and payload.
+ */
+final class BasicFetchedMessage implements FetchedMessage {
+
+  private final long offset;
+  private final ByteBuffer buffer;
+
+  BasicFetchedMessage(long offset, ByteBuffer buffer) {
+    this.offset = offset;
+    this.buffer = buffer;
+  }
+
+  @Override
+  public long getOffset() {
+    return offset;
+  }
+
+  @Override
+  public ByteBuffer getBuffer() {
+    return buffer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
new file mode 100644
index 0000000..c1fb4f2
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A helper for buffering data arriving in the format [len][payload-of-len].
+ */
+final class Bufferer {
+
+  private ChannelBuffer currentBuffer = null;
+  private int currentSize = -1;
+
+  void apply(ChannelBuffer buffer) {
+    currentBuffer = concatBuffer(currentBuffer, buffer);
+  }
+
+  /**
+   * Returns a buffer containing the next complete payload if one is ready to be
+   * consumed, otherwise returns {@link ChannelBuffers#EMPTY_BUFFER}.
+   */
+  ChannelBuffer getNext() {
+    if (currentSize < 0) {
+      if (currentBuffer == null || currentBuffer.readableBytes() < 4) {
+        return ChannelBuffers.EMPTY_BUFFER;
+      }
+      currentSize = currentBuffer.readInt();
+    }
+
+    // Keep buffering if fewer than the required number of bytes are available
+    if (currentBuffer.readableBytes() < currentSize) {
+      return ChannelBuffers.EMPTY_BUFFER;
+    }
+
+    ChannelBuffer result = currentBuffer.readSlice(currentSize);
+    currentSize = -1;
+
+    return result;
+  }
+
+  private ChannelBuffer concatBuffer(ChannelBuffer current, ChannelBuffer buffer) {
+    return current == null ? buffer : ChannelBuffers.wrappedBuffer(current, buffer);
+  }
+}
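
To see how the class reassembles a frame that arrives split across two network reads, here is an illustrative sketch (it assumes same-package access, since Bufferer is package-private):

    package org.apache.twill.internal.kafka.client;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;

    public final class BuffererSketch {
      public static void main(String[] args) {
        // One frame: a 4-byte length followed by a 3-byte payload.
        ChannelBuffer frame = ChannelBuffers.buffer(7);
        frame.writeInt(3);
        frame.writeBytes(new byte[] { 1, 2, 3 });

        Bufferer bufferer = new Bufferer();
        bufferer.apply(frame.readSlice(5));   // first read: length field + one payload byte
        System.out.println(bufferer.getNext().readable());       // false: frame incomplete
        bufferer.apply(frame);                // second read: remaining two payload bytes
        System.out.println(bufferer.getNext().readableBytes());  // 3: full payload ready
      }
    }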

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
new file mode 100644
index 0000000..3355b9f
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Enum for indicating compression method.
+ */
+public enum Compression {
+  NONE(0),
+  GZIP(1),
+  SNAPPY(2);
+
+  private final int code;
+
+  Compression(int code) {
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
+
+  public static Compression fromCode(int code) {
+    switch (code) {
+      case 0:
+        return NONE;
+      case 1:
+        return GZIP;
+      case 2:
+        return SNAPPY;
+    }
+    throw new IllegalArgumentException("Unknown compression code: " + code);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
new file mode 100644
index 0000000..c2865ba
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.collect.Maps;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.ChannelGroupFutureListener;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+
+import java.net.InetSocketAddress;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Provides Netty socket connection reuse.
+ */
+final class ConnectionPool {
+
+  private final ClientBootstrap bootstrap;
+  private final ChannelGroup channelGroup;
+  private final ConcurrentMap<InetSocketAddress, Queue<ChannelFuture>> connections;
+
+  /**
+   * For releasing a connection back to the pool.
+   */
+  interface ConnectionReleaser {
+    void release();
+  }
+
+  /**
+   * Result of a connect request.
+   */
+  interface ConnectResult extends ConnectionReleaser {
+    ChannelFuture getChannelFuture();
+  }
+
+  ConnectionPool(ClientBootstrap bootstrap) {
+    this.bootstrap = bootstrap;
+    this.channelGroup = new DefaultChannelGroup();
+    this.connections = Maps.newConcurrentMap();
+  }
+
+  ConnectResult connect(InetSocketAddress address) {
+    Queue<ChannelFuture> channelFutures = connections.get(address);
+    if (channelFutures == null) {
+      channelFutures = new ConcurrentLinkedQueue<ChannelFuture>();
+      Queue<ChannelFuture> result = connections.putIfAbsent(address, channelFutures);
+      channelFutures = result == null ? channelFutures : result;
+    }
+
+    ChannelFuture channelFuture = channelFutures.poll();
+    while (channelFuture != null) {
+      if (channelFuture.isSuccess() && channelFuture.getChannel().isConnected()) {
+        return new SimpleConnectResult(address, channelFuture);
+      }
+      channelFuture = channelFutures.poll();
+    }
+
+    channelFuture = bootstrap.connect(address);
+    channelFuture.addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        if (future.isSuccess()) {
+          channelGroup.add(future.getChannel());
+        }
+      }
+    });
+    return new SimpleConnectResult(address, channelFuture);
+  }
+
+  ChannelGroupFuture close() {
+    ChannelGroupFuture result = channelGroup.close();
+    result.addListener(new ChannelGroupFutureListener() {
+      @Override
+      public void operationComplete(ChannelGroupFuture future) throws Exception {
+        bootstrap.releaseExternalResources();
+      }
+    });
+    return result;
+  }
+
+  private final class SimpleConnectResult implements ConnectResult {
+
+    private final InetSocketAddress address;
+    private final ChannelFuture future;
+
+    private SimpleConnectResult(InetSocketAddress address, ChannelFuture future) {
+      this.address = address;
+      this.future = future;
+    }
+
+    @Override
+    public ChannelFuture getChannelFuture() {
+      return future;
+    }
+
+    @Override
+    public void release() {
+      if (future.isSuccess()) {
+        connections.get(address).offer(future);
+      }
+    }
+  }
+}
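
A hypothetical usage sketch of the pool follows; the bootstrap setup and the broker address are assumptions, and same-package access is presumed since ConnectionPool is package-private:

    package org.apache.twill.internal.kafka.client;

    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;

    public final class ConnectionPoolSketch {
      public static void main(String[] args) {
        ClientBootstrap bootstrap = new ClientBootstrap(
            new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                              Executors.newCachedThreadPool()));
        ConnectionPool pool = new ConnectionPool(bootstrap);

        // Hands back an idle healthy connection if one exists, else connects anew.
        ConnectionPool.ConnectResult result = pool.connect(new InetSocketAddress("localhost", 9092));
        result.getChannelFuture().addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              // ... write requests to future.getChannel() ...
            }
          }
        });

        // Return the connection to the pool for reuse once done with it.
        result.release();
      }
    }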

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
new file mode 100644
index 0000000..daa0c2c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * A {@link MessageSetEncoder} that compresses the message set using GZIP.
+ */
+final class GZipMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
+
+  GZipMessageSetEncoder() {
+    super(Compression.GZIP);
+  }
+
+  @Override
+  protected OutputStream createCompressedStream(OutputStream os) throws IOException {
+    return new GZIPOutputStream(os);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
new file mode 100644
index 0000000..51dc746
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A pass-through {@link MessageSetEncoder}.
+ */
+final class IdentityMessageSetEncoder extends AbstractMessageSetEncoder {
+
+  private ChannelBuffer messageSets = ChannelBuffers.EMPTY_BUFFER;
+
+  @Override
+  public MessageSetEncoder add(ChannelBuffer payload) {
+    messageSets = ChannelBuffers.wrappedBuffer(messageSets, encodePayload(payload));
+    return this;
+  }
+
+  @Override
+  public ChannelBuffer finish() {
+    ChannelBuffer buf = prefixLength(messageSets);
+    messageSets = ChannelBuffers.EMPTY_BUFFER;
+    return buf;
+  }
+}
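
As a usage sketch (Netty and Guava imports omitted; same-package access assumed), batching two payloads through this encoder yields a length-prefixed message set suitable as a PRODUCE request body:

    ChannelBuffer msg1 = ChannelBuffers.copiedBuffer("msg-1", Charsets.UTF_8);
    ChannelBuffer msg2 = ChannelBuffers.copiedBuffer("msg-2", Charsets.UTF_8);

    MessageSetEncoder encoder = new IdentityMessageSetEncoder();
    ChannelBuffer messageSet = encoder.add(msg1).add(msg2).finish();
    // Layout: [total-len][len][magic][crc]["msg-1"][len][magic][crc]["msg-2"]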

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
new file mode 100644
index 0000000..f2bb815
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+/**
+ * A Service that caches Kafka broker information by subscribing to ZooKeeper.
+ */
+final class KafkaBrokerCache extends AbstractIdleService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerCache.class);
+
+  private static final String BROKERS_PATH = "/brokers";
+
+  private final ZKClient zkClient;
+  private final Map<String, InetSocketAddress> brokers;
+  // topicBrokers maps topic -> partition size -> set of broker ids
+  private final Map<String, SortedMap<Integer, Set<String>>> topicBrokers;
+  private final Runnable invokeGetBrokers = new Runnable() {
+    @Override
+    public void run() {
+      getBrokers();
+    }
+  };
+  private final Runnable invokeGetTopics = new Runnable() {
+    @Override
+    public void run() {
+      getTopics();
+    }
+  };
+
+  KafkaBrokerCache(ZKClient zkClient) {
+    this.zkClient = zkClient;
+    this.brokers = Maps.newConcurrentMap();
+    this.topicBrokers = Maps.newConcurrentMap();
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    getBrokers();
+    getTopics();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // No-op
+  }
+
+  public int getPartitionSize(String topic) {
+    SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
+    if (partitionBrokers == null || partitionBrokers.isEmpty()) {
+      return 1;
+    }
+    return partitionBrokers.lastKey();
+  }
+
+  public TopicBroker getBrokerAddress(String topic, int partition) {
+    SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
+    if (partitionBrokers == null || partitionBrokers.isEmpty()) {
+      return pickRandomBroker(topic);
+    }
+
+    // If the requested partition is greater than supported partition size, randomly pick one
+    if (partition >= partitionBrokers.lastKey()) {
+      return pickRandomBroker(topic);
+    }
+
+    // Randomly pick a partition size and randomly pick a broker from it
+    Random random = new Random();
+    partitionBrokers = partitionBrokers.tailMap(partition + 1);
+    List<Integer> sizes = Lists.newArrayList(partitionBrokers.keySet());
+    Integer partitionSize = pickRandomItem(sizes, random);
+    List<String> ids = Lists.newArrayList(partitionBrokers.get(partitionSize));
+    InetSocketAddress address = brokers.get(ids.get(random.nextInt(ids.size())));
+    return address == null ? pickRandomBroker(topic) : new TopicBroker(topic, address, partitionSize);
+  }
+
+  private TopicBroker pickRandomBroker(String topic) {
+    Map.Entry<String, InetSocketAddress> entry = Iterables.getFirst(brokers.entrySet(), null);
+    if (entry == null) {
+      return null;
+    }
+    InetSocketAddress address = entry.getValue();
+    return new TopicBroker(topic, address, 0);
+  }
+
+  private <T> T pickRandomItem(List<T> list, Random random) {
+    return list.get(random.nextInt(list.size()));
+  }
+
+  private void getBrokers() {
+    final String idsPath = BROKERS_PATH + "/ids";
+
+    Futures.addCallback(zkClient.getChildren(idsPath, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        getBrokers();
+      }
+    }), new ExistsOnFailureFutureCallback<NodeChildren>(idsPath, invokeGetBrokers) {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        Set<String> children = ImmutableSet.copyOf(result.getChildren());
+        for (String child : children) {
+          getBrokerData(idsPath + "/" + child, child);
+        }
+        // Remove all removed brokers
+        removeDiff(children, brokers);
+      }
+    });
+  }
+
+  private void getTopics() {
+    final String topicsPath = BROKERS_PATH + "/topics";
+    Futures.addCallback(zkClient.getChildren(topicsPath, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        getTopics();
+      }
+    }), new ExistsOnFailureFutureCallback<NodeChildren>(topicsPath, invokeGetTopics) {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        Set<String> children = ImmutableSet.copyOf(result.getChildren());
+
+        // Process new children
+        for (String topic : ImmutableSet.copyOf(Sets.difference(children, topicBrokers.keySet()))) {
+          getTopic(topicsPath + "/" + topic, topic);
+        }
+
+        // Remove old children
+        removeDiff(children, topicBrokers);
+      }
+    });
+  }
+
+  private void getBrokerData(String path, final String brokerId) {
+    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
+      @Override
+      public void onSuccess(NodeData result) {
+        String data = new String(result.getData(), Charsets.UTF_8);
+        String hostPort = data.substring(data.indexOf(':') + 1);
+        int idx = hostPort.indexOf(':');
+        brokers.put(brokerId, new InetSocketAddress(hostPort.substring(0, idx),
+                                                    Integer.parseInt(hostPort.substring(idx + 1))));
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // No-op, the watch on the parent node will handle it.
+      }
+    });
+  }
+
+  private void getTopic(final String path, final String topic) {
+    Futures.addCallback(zkClient.getChildren(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // Other event types can either be ignored or are handled by the parent watcher
+        if (event.getType() == Event.EventType.NodeChildrenChanged) {
+          getTopic(path, topic);
+        }
+      }
+    }), new FutureCallback<NodeChildren>() {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        List<String> children = result.getChildren();
+        final List<ListenableFuture<BrokerPartition>> futures = Lists.newArrayListWithCapacity(children.size());
+
+        // Fetch data from each broker node
+        for (final String brokerId : children) {
+          futures.add(Futures.transform(zkClient.getData(path + "/" + brokerId),
+                                        new Function<NodeData, BrokerPartition>() {
+            @Override
+            public BrokerPartition apply(NodeData input) {
+              return new BrokerPartition(brokerId, Integer.parseInt(new String(input.getData(), Charsets.UTF_8)));
+            }
+          }));
+        }
+
+        // When all fetching is done, build the partition size->broker map for this topic
+        Futures.successfulAsList(futures).addListener(new Runnable() {
+          @Override
+          public void run() {
+            Map<Integer, Set<String>> partitionBrokers = Maps.newHashMap();
+            for (ListenableFuture<BrokerPartition> future : futures) {
+              try {
+                BrokerPartition info = future.get();
+                Set<String> brokerSet = partitionBrokers.get(info.getPartitionSize());
+                if (brokerSet == null) {
+                  brokerSet = Sets.newHashSet();
+                  partitionBrokers.put(info.getPartitionSize(), brokerSet);
+                }
+                brokerSet.add(info.getBrokerId());
+              } catch (Exception e) {
+                // Exception is ignored, as it will be handled by parent watcher
+              }
+            }
+            topicBrokers.put(topic, ImmutableSortedMap.copyOf(partitionBrokers));
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // No-op. Failures are already handled by the parent watcher (e.g. a missing node shows up as a children change in the parent).
+      }
+    });
+  }
+
+  private <K, V> void removeDiff(Set<K> keys, Map<K, V> map) {
+    for (K key : ImmutableSet.copyOf(Sets.difference(map.keySet(), keys))) {
+      map.remove(key);
+    }
+  }
+
+  private abstract class ExistsOnFailureFutureCallback<V> implements FutureCallback<V> {
+
+    private final String path;
+    private final Runnable action;
+
+    protected ExistsOnFailureFutureCallback(String path, Runnable action) {
+      this.path = path;
+      this.action = action;
+    }
+
+    @Override
+    public final void onFailure(Throwable t) {
+      if (!isNotExists(t)) {
+        LOG.error("Fail to watch for kafka brokers: " + path, t);
+        return;
+      }
+
+      waitExists(path);
+    }
+
+    private boolean isNotExists(Throwable t) {
+      return ((t instanceof KeeperException) && ((KeeperException) t).code() == KeeperException.Code.NONODE);
+    }
+
+    private void waitExists(String path) {
+      LOG.info("Path " + path + " not exists. Watch for creation.");
+
+      // If the node doesn't exist, use the "exists" call to watch for node creation.
+      Futures.addCallback(zkClient.exists(path, new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+          if (event.getType() == Event.EventType.NodeCreated || event.getType() == Event.EventType.NodeDeleted) {
+            action.run();
+          }
+        }
+      }), new FutureCallback<Stat>() {
+        @Override
+        public void onSuccess(Stat result) {
+          // If path exists, get children again, otherwise wait for watch to get triggered
+          if (result != null) {
+            action.run();
+          }
+        }
+        @Override
+        public void onFailure(Throwable t) {
+          action.run();
+        }
+      });
+    }
+  }
+
+  private static final class BrokerPartition {
+    private final String brokerId;
+    private final int partitionSize;
+
+    private BrokerPartition(String brokerId, int partitionSize) {
+      this.brokerId = brokerId;
+      this.partitionSize = partitionSize;
+    }
+
+    public String getBrokerId() {
+      return brokerId;
+    }
+
+    public int getPartitionSize() {
+      return partitionSize;
+    }
+  }
+}
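
A worked example of the broker-node parsing in getBrokerData(), assuming the Kafka 0.7 znode layout in which /brokers/ids/<id> holds data of the form "<creatorId>:<host>:<port>":

    String data = "192.168.1.10-1386901234567:192.168.1.10:9092";  // hypothetical znode data
    String hostPort = data.substring(data.indexOf(':') + 1);       // "192.168.1.10:9092"
    int idx = hostPort.indexOf(':');
    String host = hostPort.substring(0, idx);                      // "192.168.1.10"
    int port = Integer.parseInt(hostPort.substring(idx + 1));      // 9092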

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
new file mode 100644
index 0000000..7b43f8a
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Represents a request to a Kafka broker.
+ */
+final class KafkaRequest {
+
+  public enum Type {
+    PRODUCE(0),
+    FETCH(1),
+    MULTI_FETCH(2),
+    MULTI_PRODUCE(3),
+    OFFSETS(4);
+
+    private final short id;
+
+    private Type(int id) {
+      this.id = (short) id;
+    }
+
+    public short getId() {
+      return id;
+    }
+  }
+
+  private final Type type;
+  private final String topic;
+  private final int partition;
+  private final ChannelBuffer body;
+  private final ResponseHandler responseHandler;
+
+  public static KafkaRequest createProduce(String topic, int partition, ChannelBuffer body) {
+    return new KafkaRequest(Type.PRODUCE, topic, partition, body, ResponseHandler.NO_OP);
+  }
+
+  public static KafkaRequest createFetch(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
+    return new KafkaRequest(Type.FETCH, topic, partition, body, handler);
+  }
+
+  public static KafkaRequest createOffsets(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
+    return new KafkaRequest(Type.OFFSETS, topic, partition, body, handler);
+  }
+
+  private KafkaRequest(Type type, String topic, int partition, ChannelBuffer body, ResponseHandler responseHandler) {
+    this.type = type;
+    this.topic = topic;
+    this.partition = partition;
+    this.body = body;
+    this.responseHandler = responseHandler;
+  }
+
+  Type getType() {
+    return type;
+  }
+
+  String getTopic() {
+    return topic;
+  }
+
+  int getPartition() {
+    return partition;
+  }
+
+  ChannelBuffer getBody() {
+    return body;
+  }
+
+  ResponseHandler getResponseHandler() {
+    return responseHandler;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
new file mode 100644
index 0000000..ef78c76
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Encodes a {@link KafkaRequest} into the Kafka wire format.
+ */
+final class KafkaRequestEncoder extends OneToOneEncoder {
+
+  @Override
+  protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+    if (!(msg instanceof KafkaRequest)) {
+      return msg;
+    }
+    KafkaRequest req = (KafkaRequest) msg;
+    ByteBuffer topic = Charsets.UTF_8.encode(req.getTopic());
+
+    ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(16 + topic.remaining() + req.getBody().readableBytes());
+    int writerIdx = buffer.writerIndex();
+    buffer.writerIndex(writerIdx + 4);    // Reserves 4 bytes for message length
+
+    // Write out <REQUEST_TYPE>, <TOPIC_LENGTH>, <TOPIC>, <PARTITION>
+    buffer.writeShort(req.getType().getId());
+    buffer.writeShort(topic.remaining());
+    buffer.writeBytes(topic);
+    buffer.writeInt(req.getPartition());
+
+    // Write out the size of the whole buffer (excluding the size field) at the beginning
+    buffer.setInt(writerIdx, buffer.readableBytes() - 4 + req.getBody().readableBytes());
+
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(buffer, req.getBody());
+    buf = buf.readBytes(buf.readableBytes());
+
+    return buf;
+  }
+}
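
To make the frame layout concrete, this illustrative sketch (plain java.nio, not from the patch) builds the same bytes for a hypothetical FETCH of topic "logs" with the 12-byte body that MessageFetcher sends (an 8-byte offset plus a 4-byte max size):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public final class RequestFrameSketch {
      public static void main(String[] args) {
        byte[] topic = "logs".getBytes(StandardCharsets.UTF_8);
        byte[] body = new byte[12];  // offset (long) + max fetch size (int)

        // Length excludes the 4-byte length field itself.
        int len = 2 + 2 + topic.length + 4 + body.length;  // type + topic-len + topic + partition + body
        ByteBuffer frame = ByteBuffer.allocate(4 + len);
        frame.putInt(len);
        frame.putShort((short) 1);             // KafkaRequest.Type.FETCH id
        frame.putShort((short) topic.length);
        frame.put(topic);
        frame.putInt(0);                       // partition
        frame.put(body);

        System.out.println("Frame size: " + frame.position());  // 28 bytes
      }
    }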

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
new file mode 100644
index 0000000..fbc552c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Interface for sending a {@link KafkaRequest} to a broker.
+ */
+interface KafkaRequestSender {
+
+  void send(KafkaRequest request);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
new file mode 100644
index 0000000..68c1bd8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Represents a response received from a Kafka broker.
+ */
+final class KafkaResponse {
+
+  private final FetchException.ErrorCode errorCode;
+  private final ChannelBuffer body;
+  private final int size;
+
+  KafkaResponse(FetchException.ErrorCode errorCode, ChannelBuffer body, int size) {
+    this.errorCode = errorCode;
+    this.body = body;
+    this.size = size;
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public FetchException.ErrorCode getErrorCode() {
+    return errorCode;
+  }
+
+  public ChannelBuffer getBody() {
+    return body;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
new file mode 100644
index 0000000..47f70ce
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * Dispatches a {@link KafkaResponse} to the {@link ResponseHandler} attached to the channel context.
+ */
+final class KafkaResponseDispatcher extends SimpleChannelHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaResponseDispatcher.class);
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    Object attachment = ctx.getAttachment();
+    if (e.getMessage() instanceof KafkaResponse && attachment instanceof ResponseHandler) {
+      ((ResponseHandler) attachment).received((KafkaResponse) e.getMessage());
+    } else {
+      super.messageReceived(ctx, e);
+    }
+  }
+
+  @Override
+  public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    if (e.getMessage() instanceof KafkaRequest) {
+      ctx.setAttachment(((KafkaRequest) e.getMessage()).getResponseHandler());
+    }
+    super.writeRequested(ctx, e);
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+    if (e.getCause() instanceof ClosedChannelException || e.getCause() instanceof SocketException) {
+      // No need to log for socket exception as the client has logic to retry.
+      return;
+    }
+    LOG.warn("Exception caught in kafka client connection.", e.getCause());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
new file mode 100644
index 0000000..5251e65
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Assembles incoming bytes into {@link KafkaResponse} objects and fires them upstream.
+ */
+final class KafkaResponseHandler extends SimpleChannelHandler {
+
+  private final Bufferer bufferer = new Bufferer();
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    Object msg = e.getMessage();
+    if (!(msg instanceof ChannelBuffer)) {
+      super.messageReceived(ctx, e);
+      return;
+    }
+
+    bufferer.apply((ChannelBuffer) msg);
+    ChannelBuffer buffer = bufferer.getNext();
+    while (buffer.readable()) {
+      // Send the response object upstream
+      Channels.fireMessageReceived(ctx, new KafkaResponse(FetchException.ErrorCode.fromCode(buffer.readShort()),
+                                                          buffer, buffer.readableBytes() + 6));
+      buffer = bufferer.getNext();
+    }
+  }
+}
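
A sketch of the size arithmetic above: Bufferer strips the 4-byte length prefix and readShort() consumes the 2-byte error code, so readableBytes() + 6 restores the full on-wire frame size. The snippet assumes same-package access and that error code 0 maps to OK:

    ChannelBuffer frame = ChannelBuffers.dynamicBuffer();
    frame.writeInt(2 + 5);          // frame length: error code + 5-byte payload
    frame.writeShort(0);            // error code, assumed OK
    frame.writeBytes(new byte[5]);  // payload

    Bufferer bufferer = new Bufferer();
    bufferer.apply(frame);
    ChannelBuffer buffer = bufferer.getNext();
    short errorCode = buffer.readShort();   // 0
    int size = buffer.readableBytes() + 6;  // 5 + 6 = 11, the full frame size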

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
new file mode 100644
index 0000000..0814917
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.kafka.client.FetchException;
+import org.apache.twill.kafka.client.FetchedMessage;
+import com.google.common.base.Throwables;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.ByteStreams;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.xerial.snappy.SnappyInputStream;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * This class is for consuming messages from a Kafka topic.
+ */
+final class MessageFetcher extends AbstractIterator<FetchedMessage> implements ResponseHandler {
+
+  private static final long BACKOFF_INTERVAL_MS = 100;
+
+  private final KafkaRequestSender sender;
+  private final String topic;
+  private final int partition;
+  private final int maxSize;
+  private final AtomicLong offset;
+  private final BlockingQueue<FetchResult> messages;
+  private final ScheduledExecutorService scheduler;
+  private volatile long backoffMillis;
+  private final Runnable sendFetchRequest = new Runnable() {
+    @Override
+    public void run() {
+      sendFetchRequest();
+    }
+  };
+
+  MessageFetcher(String topic, int partition, long offset, int maxSize, KafkaRequestSender sender) {
+    this.topic = topic;
+    this.partition = partition;
+    this.sender = sender;
+    this.offset = new AtomicLong(offset);
+    this.maxSize = maxSize;
+    this.messages = new LinkedBlockingQueue<FetchResult>();
+    this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                        Threads.createDaemonThreadFactory("kafka-" + topic + "-consumer"));
+  }
+
+  @Override
+  public void received(KafkaResponse response) {
+    if (response.getErrorCode() != FetchException.ErrorCode.OK) {
+      messages.add(FetchResult.failure(new FetchException("Error in fetching: " + response.getErrorCode(),
+                                                          response.getErrorCode())));
+      return;
+    }
+
+    try {
+      if (decodeResponse(response.getBody(), -1)) {
+        backoffMillis = 0;
+      } else {
+        backoffMillis = Math.min(backoffMillis + BACKOFF_INTERVAL_MS, 1000);   // grow backoff, capped at 1s
+        scheduler.schedule(sendFetchRequest, backoffMillis, TimeUnit.MILLISECONDS);
+      }
+    } catch (Throwable t) {
+      messages.add(FetchResult.failure(t));
+    }
+  }
+
+  private boolean decodeResponse(ChannelBuffer buffer, long nextOffset) {
+    boolean hasMessage = false;
+    boolean computeOffset = nextOffset < 0;
+    while (buffer.readableBytes() >= 4) {
+      int size = buffer.readInt();
+      if (buffer.readableBytes() < size) {
+        if (!hasMessage) {
+          throw new IllegalStateException("Size too small");
+        }
+        break;
+      }
+      nextOffset = computeOffset ? offset.addAndGet(size + 4) : nextOffset;
+      decodeMessage(size, buffer, nextOffset);
+      hasMessage = true;
+    }
+    return hasMessage;
+  }
+
+  private void decodeMessage(int size, ChannelBuffer buffer, long nextOffset) {
+    int readerIdx = buffer.readerIndex();
+    int magic = buffer.readByte();
+    Compression compression = magic == 0 ? Compression.NONE : Compression.fromCode(buffer.readByte());
+    int crc = buffer.readInt();
+
+    ChannelBuffer payload = buffer.readSlice(size - (buffer.readerIndex() - readerIdx));
+
+    // Verify CRC?
+    enqueueMessage(compression, payload, nextOffset);
+  }
+
+  private void enqueueMessage(Compression compression, ChannelBuffer payload, long nextOffset) {
+    switch (compression) {
+      case NONE:
+        messages.add(FetchResult.success(new BasicFetchedMessage(nextOffset, payload.toByteBuffer())));
+        break;
+      case GZIP:
+        decodeResponse(gunzip(payload), nextOffset);
+        break;
+      case SNAPPY:
+        decodeResponse(unsnappy(payload), nextOffset);
+        break;
+    }
+  }
+
+  private ChannelBuffer gunzip(ChannelBuffer source) {
+    ChannelBufferOutputStream output = new ChannelBufferOutputStream(
+                                              ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
+    try {
+      try {
+        GZIPInputStream gzipInput = new GZIPInputStream(new ChannelBufferInputStream(source));
+        try {
+          ByteStreams.copy(gzipInput, output);
+          return output.buffer();
+        } finally {
+          gzipInput.close();
+        }
+      } finally {
+        output.close();
+      }
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private ChannelBuffer unsnappy(ChannelBuffer source) {
+    ChannelBufferOutputStream output = new ChannelBufferOutputStream(
+                                              ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
+    try {
+      try {
+        SnappyInputStream snappyInput = new SnappyInputStream(new ChannelBufferInputStream(source));
+        try {
+          ByteStreams.copy(snappyInput, output);
+          return output.buffer();
+        } finally {
+          snappyInput.close();
+        }
+      } finally {
+        output.close();
+      }
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private void sendFetchRequest() {
+    ChannelBuffer fetchBody = ChannelBuffers.buffer(12);
+    fetchBody.writeLong(offset.get());
+    fetchBody.writeInt(maxSize);
+    sender.send(KafkaRequest.createFetch(topic, partition, fetchBody, MessageFetcher.this));
+  }
+
+  @Override
+  protected FetchedMessage computeNext() {
+    FetchResult result = messages.poll();
+    if (result != null) {
+      return getMessage(result);
+    }
+
+    try {
+      sendFetchRequest();
+      return getMessage(messages.take());
+    } catch (InterruptedException e) {
+      scheduler.shutdownNow();
+      return endOfData();
+    }
+  }
+
+  private FetchedMessage getMessage(FetchResult result) {
+    try {
+      if (result.isSuccess()) {
+        return result.getMessage();
+      } else {
+        throw result.getErrorCause();
+      }
+    } catch (Throwable t) {
+      throw Throwables.propagate(t);
+    }
+  }
+
+  private static final class FetchResult {
+    private final FetchedMessage message;
+    private final Throwable errorCause;
+
+    static FetchResult success(FetchedMessage message) {
+      return new FetchResult(message, null);
+    }
+
+    static FetchResult failure(Throwable cause) {
+      return new FetchResult(null, cause);
+    }
+
+    private FetchResult(FetchedMessage message, Throwable errorCause) {
+      this.message = message;
+      this.errorCause = errorCause;
+    }
+
+    public FetchedMessage getMessage() {
+      return message;
+    }
+
+    public Throwable getErrorCause() {
+      return errorCause;
+    }
+
+    public boolean isSuccess() {
+      return message != null;
+    }
+  }
+}
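
A hypothetical consumption sketch: since MessageFetcher extends AbstractIterator, a caller simply iterates, and each hasNext() may block until an outstanding fetch completes. The sender below is assumed to be a KafkaRequestSender wired to a broker connection:

    // Fetch from topic "logs", partition 0, starting at offset 0, 1 MB max per fetch.
    MessageFetcher fetcher = new MessageFetcher("logs", 0, 0L, 1024 * 1024, sender);
    while (fetcher.hasNext()) {
      FetchedMessage message = fetcher.next();
      System.out.println("offset=" + message.getOffset()
                             + " size=" + message.getBuffer().remaining());
    }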

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
new file mode 100644
index 0000000..49008cc
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Represents a collection of messages that belong to the same message set and are
+ * encoded as a single Kafka message set.
+ */
+interface MessageSetEncoder {
+
+  MessageSetEncoder add(ChannelBuffer payload);
+
+  ChannelBuffer finish();
+}
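
The encoder behaves as a small builder: add() accumulates payloads and returns the
encoder for chaining, while finish() emits them as one encoded message set. A
minimal usage sketch (not part of this commit), assuming it runs inside this
package since the interface and its implementations are package-private:

  import org.jboss.netty.buffer.ChannelBuffer;
  import org.jboss.netty.buffer.ChannelBuffers;
  import com.google.common.base.Charsets;

  // Accumulate two payloads, then encode them as a single message set.
  MessageSetEncoder encoder = new IdentityMessageSetEncoder();
  encoder.add(ChannelBuffers.copiedBuffer("first", Charsets.UTF_8))
         .add(ChannelBuffers.copiedBuffer("second", Charsets.UTF_8));
  ChannelBuffer messageSet = encoder.finish();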

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
new file mode 100644
index 0000000..f681b85
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Represents a handler for Kafka responses.
+ */
+interface ResponseHandler {
+
+  ResponseHandler NO_OP = new ResponseHandler() {
+    @Override
+    public void received(KafkaResponse response) {
+      // No-op
+    }
+  };
+
+  void received(KafkaResponse response);
+}
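
NO_OP is a null-object implementation for requests that expect no response. A
typical concrete handler hands the response off to a waiting caller, as
getOffset() in SimpleKafkaClient does below; a minimal sketch (illustrative,
not part of this commit):

  import com.google.common.util.concurrent.SettableFuture;

  final SettableFuture<KafkaResponse> result = SettableFuture.create();
  ResponseHandler handler = new ResponseHandler() {
    @Override
    public void received(KafkaResponse response) {
      result.set(response);  // hand the response off to the waiting caller
    }
  };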

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
new file mode 100644
index 0000000..8ff4856
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.kafka.client.FetchException;
+import org.apache.twill.kafka.client.FetchedMessage;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.PreparePublish;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientBossPool;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Basic implementation of {@link KafkaClient}.
+ */
+public final class SimpleKafkaClient extends AbstractIdleService implements KafkaClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaClient.class);
+  private static final int BROKER_POLL_INTERVAL = 100;
+
+  private final KafkaBrokerCache brokerCache;
+  private ClientBootstrap bootstrap;
+  private ConnectionPool connectionPool;
+
+  public SimpleKafkaClient(ZKClient zkClient) {
+    this.brokerCache = new KafkaBrokerCache(zkClient);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    brokerCache.startAndWait();
+    ThreadFactory threadFactory = Threads.createDaemonThreadFactory("kafka-client-netty-%d");
+    NioClientBossPool bossPool = new NioClientBossPool(Executors.newSingleThreadExecutor(threadFactory), 1,
+                                                       new HashedWheelTimer(threadFactory), null);
+    NioWorkerPool workerPool = new NioWorkerPool(Executors.newFixedThreadPool(4, threadFactory), 4);
+
+    bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossPool, workerPool));
+    bootstrap.setPipelineFactory(new KafkaChannelPipelineFactory());
+    connectionPool = new ConnectionPool(bootstrap);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    connectionPool.close();
+    bootstrap.releaseExternalResources();
+    brokerCache.stopAndWait();
+  }
+
+  @Override
+  public PreparePublish preparePublish(final String topic, final Compression compression) {
+    final Map<Integer, MessageSetEncoder> encoders = Maps.newHashMap();
+
+    return new PreparePublish() {
+      @Override
+      public PreparePublish add(byte[] payload, Object partitionKey) {
+        return add(ByteBuffer.wrap(payload), partitionKey);
+      }
+
+      @Override
+      public PreparePublish add(ByteBuffer payload, Object partitionKey) {
+        // TODO: Partition
+        int partition = 0;
+
+        MessageSetEncoder encoder = encoders.get(partition);
+        if (encoder == null) {
+          encoder = getEncoder(compression);
+          encoders.put(partition, encoder);
+        }
+        encoder.add(ChannelBuffers.wrappedBuffer(payload));
+
+        return this;
+      }
+
+      @Override
+      public ListenableFuture<?> publish() {
+        List<ListenableFuture<?>> futures = Lists.newArrayListWithCapacity(encoders.size());
+        for (Map.Entry<Integer, MessageSetEncoder> entry : encoders.entrySet()) {
+          futures.add(doPublish(topic, entry.getKey(), entry.getValue().finish()));
+        }
+        encoders.clear();
+        return Futures.allAsList(futures);
+      }
+
+      private ListenableFuture<?> doPublish(String topic, int partition, ChannelBuffer messageSet) {
+        final KafkaRequest request = KafkaRequest.createProduce(topic, partition, messageSet);
+        final SettableFuture<?> result = SettableFuture.create();
+        final ConnectionPool.ConnectResult connection =
+              connectionPool.connect(getTopicBroker(topic, partition).getAddress());
+
+        connection.getChannelFuture().addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            try {
+              future.getChannel().write(request).addListener(getPublishChannelFutureListener(result, null, connection));
+            } catch (Exception e) {
+              result.setException(e);
+            }
+          }
+        });
+
+        return result;
+      }
+    };
+  }
+
+  @Override
+  public Iterator<FetchedMessage> consume(final String topic, final int partition, long offset, int maxSize) {
+    Preconditions.checkArgument(maxSize >= 10, "Maximum fetch size cannot be smaller than 10.");
+
+    // Connect to broker. Consumer connections are long-lived, so there is no need to worry about reuse.
+    final AtomicReference<ChannelFuture> channelFutureRef = new AtomicReference<ChannelFuture>(
+          connectionPool.connect(getTopicBroker(topic, partition).getAddress()).getChannelFuture());
+
+    return new MessageFetcher(topic, partition, offset, maxSize, new KafkaRequestSender() {
+
+      @Override
+      public void send(final KafkaRequest request) {
+        if (!isRunning()) {
+          return;
+        }
+        try {
+          // Try to send the request
+          Channel channel = channelFutureRef.get().getChannel();
+          if (!channel.write(request).await().isSuccess()) {
+            // If failed, retry
+            channel.close();
+            ChannelFuture channelFuture = connectionPool.connect(
+                                              getTopicBroker(topic, partition).getAddress()).getChannelFuture();
+            channelFutureRef.set(channelFuture);
+            channelFuture.addListener(new ChannelFutureListener() {
+              @Override
+              public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                send(request);
+              }
+            });
+          }
+        } catch (InterruptedException e) {
+          // Not much can be done here; log it and give up on this request.
+          LOG.info("Interrupted when sending consume request", e);
+        }
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<long[]> getOffset(final String topic, final int partition, long time, int maxOffsets) {
+    final SettableFuture<long[]> resultFuture = SettableFuture.create();
+    final ChannelBuffer body = ChannelBuffers.buffer(Longs.BYTES + Ints.BYTES);
+    body.writeLong(time);
+    body.writeInt(maxOffsets);
+
+    connectionPool.connect(getTopicBroker(topic, partition).getAddress())
+                  .getChannelFuture().addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        if (checkFailure(future)) {
+          return;
+        }
+
+        future.getChannel().write(KafkaRequest.createOffsets(topic, partition, body, new ResponseHandler() {
+          @Override
+          public void received(KafkaResponse response) {
+            if (response.getErrorCode() != FetchException.ErrorCode.OK) {
+              resultFuture.setException(new FetchException("Failed to fetch offset.", response.getErrorCode()));
+            } else {
+              // Decode the offset response, which contains a 4-byte count of offsets,
+              // followed by that many offsets, each 8 bytes in size.
+              ChannelBuffer resultBuffer = response.getBody();
+              int size = resultBuffer.readInt();
+              long[] result = new long[size];
+              for (int i = 0; i < size; i++) {
+                result[i] = resultBuffer.readLong();
+              }
+              resultFuture.set(result);
+            }
+          }
+        })).addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            checkFailure(future);
+          }
+        });
+      }
+
+      private boolean checkFailure(ChannelFuture future) {
+        if (!future.isSuccess()) {
+          if (future.isCancelled()) {
+            resultFuture.cancel(true);
+          } else {
+            resultFuture.setException(future.getCause());
+          }
+          return true;
+        }
+        return false;
+      }
+    });
+
+    return resultFuture;
+  }
+
+  private TopicBroker getTopicBroker(String topic, int partition) {
+    TopicBroker topicBroker = brokerCache.getBrokerAddress(topic, partition);
+    while (topicBroker == null) {
+      try {
+        TimeUnit.MILLISECONDS.sleep(BROKER_POLL_INTERVAL);
+      } catch (InterruptedException e) {
+        return null;
+      }
+      topicBroker = brokerCache.getBrokerAddress(topic, partition);
+    }
+    return topicBroker;
+  }
+
+  private MessageSetEncoder getEncoder(Compression compression) {
+    switch (compression) {
+      case GZIP:
+        return new GZipMessageSetEncoder();
+      case SNAPPY:
+        return new SnappyMessageSetEncoder();
+      default:
+        return new IdentityMessageSetEncoder();
+    }
+  }
+
+  private <V> ChannelFutureListener getPublishChannelFutureListener(final SettableFuture<V> result, final V resultObj,
+                                                                    final ConnectionPool.ConnectionReleaser releaser) {
+    return new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        try {
+          if (future.isSuccess()) {
+            result.set(resultObj);
+          } else if (future.isCancelled()) {
+            result.cancel(true);
+          } else {
+            result.setException(future.getCause());
+          }
+        } finally {
+          releaser.release();
+        }
+      }
+    };
+  }
+
+  private static final class KafkaChannelPipelineFactory implements ChannelPipelineFactory {
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+
+      pipeline.addLast("encoder", new KafkaRequestEncoder());
+      pipeline.addLast("decoder", new KafkaResponseHandler());
+      pipeline.addLast("dispatcher", new KafkaResponseDispatcher());
+      return pipeline;
+    }
+  }
+}
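
A minimal end-to-end sketch of the client API above (not part of this commit):
start the services, publish a couple of messages, then consume them back. The
ZooKeeper connection string, topic, and fetch size are placeholders, and the
payload accessors of FetchedMessage are omitted since that interface is not
shown in this diff:

  import org.apache.twill.internal.kafka.client.Compression;
  import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
  import org.apache.twill.kafka.client.FetchedMessage;
  import org.apache.twill.zookeeper.ZKClientService;
  import com.google.common.base.Charsets;
  import java.util.Iterator;

  public final class SimpleKafkaClientExample {
    public static void main(String[] args) throws Exception {
      ZKClientService zkClient = ZKClientService.Builder.of("localhost:2181").build();
      zkClient.startAndWait();

      SimpleKafkaClient client = new SimpleKafkaClient(zkClient);
      client.startAndWait();

      // Publish two messages; partition 0 is the only partition used so far.
      client.preparePublish("logs", Compression.SNAPPY)
            .add("hello".getBytes(Charsets.UTF_8), 0)
            .add("world".getBytes(Charsets.UTF_8), 0)
            .publish()
            .get();

      // Consume from offset 0 with a 1 MiB maximum fetch size; the iterator
      // issues fetch requests and blocks until messages arrive.
      Iterator<FetchedMessage> messages = client.consume("logs", 0, 0L, 1048576);
      if (messages.hasNext()) {
        FetchedMessage message = messages.next();
        // Payload accessors live on FetchedMessage, which is not part of this diff.
      }

      client.stopAndWait();
      zkClient.stopAndWait();
    }
  }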

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
new file mode 100644
index 0000000..bf18c08
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.xerial.snappy.SnappyOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A {@link MessageSetEncoder} that compresses messages using Snappy.
+ */
+final class SnappyMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
+
+  SnappyMessageSetEncoder() {
+    super(Compression.SNAPPY);
+  }
+
+  @Override
+  protected OutputStream createCompressedStream(OutputStream os) throws IOException {
+    return new SnappyOutputStream(os);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
new file mode 100644
index 0000000..fd4bf03
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Represents broker information for a given topic.
+ */
+final class TopicBroker {
+
+  private final String topic;
+  private final InetSocketAddress address;
+  private final int partitionSize;
+
+  TopicBroker(String topic, InetSocketAddress address, int partitionSize) {
+    this.topic = topic;
+    this.address = address;
+    this.partitionSize = partitionSize;
+  }
+
+  String getTopic() {
+    return topic;
+  }
+
+  InetSocketAddress getAddress() {
+    return address;
+  }
+
+  int getPartitionSize() {
+    return partitionSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
new file mode 100644
index 0000000..f3f615c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package provides a pure Java Kafka client implementation.
+ */
+package org.apache.twill.internal.kafka.client;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
new file mode 100644
index 0000000..12818ef
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.kafka.client.Compression;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.PreparePublish;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import ch.qos.logback.classic.pattern.ClassOfCallerConverter;
+import ch.qos.logback.classic.pattern.FileOfCallerConverter;
+import ch.qos.logback.classic.pattern.LineOfCallerConverter;
+import ch.qos.logback.classic.pattern.MethodOfCallerConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.classic.spi.StackTraceElementProxy;
+import ch.qos.logback.core.AppenderBase;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.stream.JsonWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A logback {@link AppenderBase} that converts {@link ILoggingEvent}s into JSON and
+ * publishes them to Kafka in batches.
+ */
+public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
+
+  private final LogEventConverter eventConverter;
+  private final AtomicReference<PreparePublish> publisher;
+  private final Runnable flushTask;
+  /**
+   * Approximate count of buffered log entries; it does not need to be exact.
+   */
+  private final AtomicInteger bufferedSize;
+
+  private ZKClientService zkClientService;
+  private KafkaClient kafkaClient;
+  private String zkConnectStr;
+  private String hostname;
+  private String topic;
+  private Queue<String> buffer;
+  private int flushLimit = 20;
+  private int flushPeriod = 100;
+  private ScheduledExecutorService scheduler;
+
+  public KafkaAppender() {
+    eventConverter = new LogEventConverter();
+    publisher = new AtomicReference<PreparePublish>();
+    flushTask = createFlushTask();
+    bufferedSize = new AtomicInteger();
+    buffer = new ConcurrentLinkedQueue<String>();
+  }
+
+  /**
+   * Sets the ZooKeeper connection string. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setZookeeper(String zkConnectStr) {
+    this.zkConnectStr = zkConnectStr;
+  }
+
+  /**
+   * Sets the hostname. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+  /**
+   * Sets the topic name for publishing logs. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setTopic(String topic) {
+    this.topic = topic;
+  }
+
+  /**
+   * Sets the maximum number of buffered log entries before a forced flush is performed.
+   * Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setFlushLimit(int flushLimit) {
+    this.flushLimit = flushLimit;
+  }
+
+  /**
+   * Sets the periodic flush time in milliseconds. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setFlushPeriod(int flushPeriod) {
+    this.flushPeriod = flushPeriod;
+  }
+
+  @Override
+  public void start() {
+    Preconditions.checkNotNull(zkConnectStr);
+
+    scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
+
+    zkClientService = ZKClientServices.delegate(
+      ZKClients.reWatchOnExpire(
+        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+                                 RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+    kafkaClient = new SimpleKafkaClient(zkClientService);
+    Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), new FutureCallback<Object>() {
+      @Override
+      public void onSuccess(Object result) {
+        LOG.info("Kafka client started: " + zkConnectStr);
+        publisher.set(kafkaClient.preparePublish(topic, Compression.SNAPPY));
+        scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // Failed to talk to Kafka. Other than logging, there is not much that can be done.
+        LOG.error("Failed to start kafka client.", t);
+      }
+    });
+
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+    scheduler.shutdownNow();
+    Futures.getUnchecked(Services.chainStop(kafkaClient, zkClientService));
+  }
+
+  public void forceFlush() {
+    try {
+      publishLogs().get(2, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to publish last batch of log.", e);
+    }
+  }
+
+  @Override
+  protected void append(ILoggingEvent eventObject) {
+    buffer.offer(eventConverter.convert(eventObject));
+    if (bufferedSize.incrementAndGet() >= flushLimit && publisher.get() != null) {
+      // Try to do an extra flush
+      scheduler.submit(flushTask);
+    }
+  }
+
+  private ListenableFuture<Integer> publishLogs() {
+    // If the publisher is not available, simply return a completed future.
+    PreparePublish publisher = KafkaAppender.this.publisher.get();
+    if (publisher == null) {
+      return Futures.immediateFuture(0);
+    }
+
+    int count = 0;
+    for (String json : Iterables.consumingIterable(buffer)) {
+      publisher.add(Charsets.UTF_8.encode(json), 0);
+      count++;
+    }
+    // Nothing to publish; simply return a completed future.
+    if (count == 0) {
+      return Futures.immediateFuture(0);
+    }
+
+    bufferedSize.set(0);
+    final int finalCount = count;
+    return Futures.transform(publisher.publish(), new Function<Object, Integer>() {
+      @Override
+      public Integer apply(Object input) {
+        return finalCount;
+      }
+    });
+  }
+
+  /**
+   * Creates a {@link Runnable} that writes all logs in the buffer to Kafka.
+   * @return The Runnable task
+   */
+  private Runnable createFlushTask() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        Futures.addCallback(publishLogs(), new FutureCallback<Integer>() {
+          @Override
+          public void onSuccess(Integer result) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Log entries published, size=" + result);
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            LOG.error("Failed to push logs to kafka. Log entries dropped.", t);
+          }
+        });
+      }
+    };
+  }
+
+  /**
+   * Helper class to convert an {@link ILoggingEvent} into a JSON string.
+   */
+  private final class LogEventConverter {
+
+    private final ClassOfCallerConverter classNameConverter = new ClassOfCallerConverter();
+    private final MethodOfCallerConverter methodConverter = new MethodOfCallerConverter();
+    private final FileOfCallerConverter fileConverter = new FileOfCallerConverter();
+    private final LineOfCallerConverter lineConverter = new LineOfCallerConverter();
+
+    private String convert(ILoggingEvent event) {
+      StringWriter result = new StringWriter();
+      JsonWriter writer = new JsonWriter(result);
+
+      try {
+        try {
+          writer.beginObject();
+          writer.name("name").value(event.getLoggerName());
+          writer.name("host").value(hostname);
+          writer.name("timestamp").value(Long.toString(event.getTimeStamp()));
+          writer.name("level").value(event.getLevel().toString());
+          writer.name("className").value(classNameConverter.convert(event));
+          writer.name("method").value(methodConverter.convert(event));
+          writer.name("file").value(fileConverter.convert(event));
+          writer.name("line").value(lineConverter.convert(event));
+          writer.name("thread").value(event.getThreadName());
+          writer.name("message").value(event.getFormattedMessage());
+          writer.name("stackTraces");
+          encodeStackTraces(event.getThrowableProxy(), writer);
+
+          writer.endObject();
+        } finally {
+          writer.close();
+        }
+      } catch (IOException e) {
+        throw Throwables.propagate(e);
+      }
+
+      return result.toString();
+    }
+
+    private void encodeStackTraces(IThrowableProxy throwable, JsonWriter writer) throws IOException {
+      writer.beginArray();
+      try {
+        if (throwable == null) {
+          return;
+        }
+
+        for (StackTraceElementProxy stackTrace : throwable.getStackTraceElementProxyArray()) {
+          writer.beginObject();
+
+          StackTraceElement element = stackTrace.getStackTraceElement();
+          writer.name("className").value(element.getClassName());
+          writer.name("method").value(element.getMethodName());
+          writer.name("file").value(element.getFileName());
+          writer.name("line").value(element.getLineNumber());
+
+          writer.endObject();
+        }
+      } finally {
+        writer.endArray();
+      }
+    }
+  }
+}
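
The appender is normally wired up through a logback configuration file, whose
configurator invokes the setters above. A minimal programmatic sketch of the same
wiring (not part of this commit; the connection string, topic, and hostname are
placeholders):

  import ch.qos.logback.classic.Logger;
  import ch.qos.logback.classic.LoggerContext;
  import org.apache.twill.internal.logging.KafkaAppender;
  import org.slf4j.LoggerFactory;

  public final class KafkaAppenderExample {
    public static void main(String[] args) {
      LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();

      KafkaAppender appender = new KafkaAppender();
      appender.setContext(context);
      appender.setName("KAFKA");
      appender.setZookeeper("localhost:2181");  // placeholder ZooKeeper quorum
      appender.setTopic("logs");                // placeholder topic name
      appender.setHostname("example-host");     // identifies this host in log entries
      appender.start();                         // starts the ZooKeeper and Kafka clients

      // Attach to the root logger so every event is published to Kafka.
      Logger rootLogger = context.getLogger(Logger.ROOT_LOGGER_NAME);
      rootLogger.addAppender(appender);

      rootLogger.info("Hello from KafkaAppender");
      appender.forceFlush();  // publish the last batch before shutting down
      appender.stop();
    }
  }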