Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:59 UTC

[17/28] Making maven site work.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
new file mode 100644
index 0000000..daa0c2c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * A {@link MessageSetEncoder} that compresses the message set using GZIP.
+ */
+final class GZipMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
+
+  GZipMessageSetEncoder() {
+    super(Compression.GZIP);
+  }
+
+  @Override
+  protected OutputStream createCompressedStream(OutputStream os) throws IOException {
+    return new GZIPOutputStream(os);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
new file mode 100644
index 0000000..51dc746
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A pass-through {@link MessageSetEncoder}.
+ */
+final class IdentityMessageSetEncoder extends AbstractMessageSetEncoder {
+
+  private ChannelBuffer messageSets = ChannelBuffers.EMPTY_BUFFER;
+
+  @Override
+  public MessageSetEncoder add(ChannelBuffer payload) {
+    messageSets = ChannelBuffers.wrappedBuffer(messageSets, encodePayload(payload));
+    return this;
+  }
+
+  @Override
+  public ChannelBuffer finish() {
+    ChannelBuffer buf = prefixLength(messageSets);
+    messageSets = ChannelBuffers.EMPTY_BUFFER;
+    return buf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
new file mode 100644
index 0000000..f2bb815
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+/**
+ * A service that caches Kafka broker information by watching ZooKeeper.
+ */
+final class KafkaBrokerCache extends AbstractIdleService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerCache.class);
+
+  private static final String BROKERS_PATH = "/brokers";
+
+  private final ZKClient zkClient;
+  private final Map<String, InetSocketAddress> brokers;
+  // topicBrokers maps topic -> partition size -> set of broker ids
+  private final Map<String, SortedMap<Integer, Set<String>>> topicBrokers;
+  private final Runnable invokeGetBrokers = new Runnable() {
+    @Override
+    public void run() {
+      getBrokers();
+    }
+  };
+  private final Runnable invokeGetTopics = new Runnable() {
+    @Override
+    public void run() {
+      getTopics();
+    }
+  };
+
+  KafkaBrokerCache(ZKClient zkClient) {
+    this.zkClient = zkClient;
+    this.brokers = Maps.newConcurrentMap();
+    this.topicBrokers = Maps.newConcurrentMap();
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    getBrokers();
+    getTopics();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    // No-op
+  }
+
+  public int getPartitionSize(String topic) {
+    SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
+    if (partitionBrokers == null || partitionBrokers.isEmpty()) {
+      return 1;
+    }
+    return partitionBrokers.lastKey();
+  }
+
+  public TopicBroker getBrokerAddress(String topic, int partition) {
+    SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
+    if (partitionBrokers == null || partitionBrokers.isEmpty()) {
+      return pickRandomBroker(topic);
+    }
+
+    // If the requested partition is not smaller than the supported partition size, pick a random broker
+    if (partition >= partitionBrokers.lastKey()) {
+      return pickRandomBroker(topic);
+    }
+
+    // Randomly pick a partition size and randomly pick a broker from it
+    Random random = new Random();
+    partitionBrokers = partitionBrokers.tailMap(partition + 1);
+    List<Integer> sizes = Lists.newArrayList(partitionBrokers.keySet());
+    Integer partitionSize = pickRandomItem(sizes, random);
+    List<String> ids = Lists.newArrayList(partitionBrokers.get(partitionSize));
+    InetSocketAddress address = brokers.get(ids.get(random.nextInt(ids.size())));
+    return address == null ? pickRandomBroker(topic) : new TopicBroker(topic, address, partitionSize);
+  }
+
+  private TopicBroker pickRandomBroker(String topic) {
+    Map.Entry<String, InetSocketAddress> entry = Iterables.getFirst(brokers.entrySet(), null);
+    if (entry == null) {
+      return null;
+    }
+    InetSocketAddress address = entry.getValue();
+    return new TopicBroker(topic, address, 0);
+  }
+
+  private <T> T pickRandomItem(List<T> list, Random random) {
+    return list.get(random.nextInt(list.size()));
+  }
+
+  private void getBrokers() {
+    final String idsPath = BROKERS_PATH + "/ids";
+
+    Futures.addCallback(zkClient.getChildren(idsPath, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        getBrokers();
+      }
+    }), new ExistsOnFailureFutureCallback<NodeChildren>(idsPath, invokeGetBrokers) {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        Set<String> children = ImmutableSet.copyOf(result.getChildren());
+        for (String child : children) {
+          getBrokerData(idsPath + "/" + child, child);
+        }
+        // Drop brokers that are no longer registered
+        removeDiff(children, brokers);
+      }
+    });
+  }
+
+  private void getTopics() {
+    final String topicsPath = BROKERS_PATH + "/topics";
+    Futures.addCallback(zkClient.getChildren(topicsPath, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        getTopics();
+      }
+    }), new ExistsOnFailureFutureCallback<NodeChildren>(topicsPath, invokeGetTopics) {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        Set<String> children = ImmutableSet.copyOf(result.getChildren());
+
+        // Process new children
+        for (String topic : ImmutableSet.copyOf(Sets.difference(children, topicBrokers.keySet()))) {
+          getTopic(topicsPath + "/" + topic, topic);
+        }
+
+        // Remove old children
+        removeDiff(children, topicBrokers);
+      }
+    });
+  }
+
+  private void getBrokerData(String path, final String brokerId) {
+    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
+      @Override
+      public void onSuccess(NodeData result) {
+        String data = new String(result.getData(), Charsets.UTF_8);
+        String hostPort = data.substring(data.indexOf(':') + 1);
+        int idx = hostPort.indexOf(':');
+        brokers.put(brokerId, new InetSocketAddress(hostPort.substring(0, idx),
+                                                    Integer.parseInt(hostPort.substring(idx + 1))));
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // No-op, the watch on the parent node will handle it.
+      }
+    });
+  }
+
+  private void getTopic(final String path, final String topic) {
+    Futures.addCallback(zkClient.getChildren(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // Other event types can either be ignored or are handled by the parent watcher
+        if (event.getType() == Event.EventType.NodeChildrenChanged) {
+          getTopic(path, topic);
+        }
+      }
+    }), new FutureCallback<NodeChildren>() {
+      @Override
+      public void onSuccess(NodeChildren result) {
+        List<String> children = result.getChildren();
+        final List<ListenableFuture<BrokerPartition>> futures = Lists.newArrayListWithCapacity(children.size());
+
+        // Fetch data from each broker node
+        for (final String brokerId : children) {
+          futures.add(Futures.transform(zkClient.getData(path + "/" + brokerId),
+                                        new Function<NodeData, BrokerPartition>() {
+            @Override
+            public BrokerPartition apply(NodeData input) {
+              return new BrokerPartition(brokerId, Integer.parseInt(new String(input.getData(), Charsets.UTF_8)));
+            }
+          }));
+        }
+
+        // When all fetching is done, build the partition size->broker map for this topic
+        Futures.successfulAsList(futures).addListener(new Runnable() {
+          @Override
+          public void run() {
+            Map<Integer, Set<String>> partitionBrokers = Maps.newHashMap();
+            for (ListenableFuture<BrokerPartition> future : futures) {
+              try {
+                BrokerPartition info = future.get();
+                Set<String> brokerSet = partitionBrokers.get(info.getPartitionSize());
+                if (brokerSet == null) {
+                  brokerSet = Sets.newHashSet();
+                  partitionBrokers.put(info.getPartitionSize(), brokerSet);
+                }
+                brokerSet.add(info.getBrokerId());
+              } catch (Exception e) {
+                // Exception is ignored, as it will be handled by parent watcher
+              }
+            }
+            topicBrokers.put(topic, ImmutableSortedMap.copyOf(partitionBrokers));
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // No-op. Failures are already handled by the parent watcher (e.g. a missing node shows up as a children change in the parent)
+      }
+    });
+  }
+
+  private <K, V> void removeDiff(Set<K> keys, Map<K, V> map) {
+    for (K key : ImmutableSet.copyOf(Sets.difference(map.keySet(), keys))) {
+      map.remove(key);
+    }
+  }
+
+  private abstract class ExistsOnFailureFutureCallback<V> implements FutureCallback<V> {
+
+    private final String path;
+    private final Runnable action;
+
+    protected ExistsOnFailureFutureCallback(String path, Runnable action) {
+      this.path = path;
+      this.action = action;
+    }
+
+    @Override
+    public final void onFailure(Throwable t) {
+      if (!isNotExists(t)) {
+        LOG.error("Fail to watch for kafka brokers: " + path, t);
+        return;
+      }
+
+      waitExists(path);
+    }
+
+    private boolean isNotExists(Throwable t) {
+      return ((t instanceof KeeperException) && ((KeeperException) t).code() == KeeperException.Code.NONODE);
+    }
+
+    private void waitExists(String path) {
+      LOG.info("Path " + path + " not exists. Watch for creation.");
+
+      // If the node doesn't exists, use the "exists" call to watch for node creation.
+      Futures.addCallback(zkClient.exists(path, new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+          if (event.getType() == Event.EventType.NodeCreated || event.getType() == Event.EventType.NodeDeleted) {
+            action.run();
+          }
+        }
+      }), new FutureCallback<Stat>() {
+        @Override
+        public void onSuccess(Stat result) {
+          // If the path now exists, re-run the action; otherwise wait for the watch to trigger
+          if (result != null) {
+            action.run();
+          }
+        }
+        @Override
+        public void onFailure(Throwable t) {
+          action.run();
+        }
+      });
+    }
+  }
+
+  private static final class BrokerPartition {
+    private final String brokerId;
+    private final int partitionSize;
+
+    private BrokerPartition(String brokerId, int partitionSize) {
+      this.brokerId = brokerId;
+      this.partitionSize = partitionSize;
+    }
+
+    public String getBrokerId() {
+      return brokerId;
+    }
+
+    public int getPartitionSize() {
+      return partitionSize;
+    }
+  }
+}
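
For reference, getBrokerData above turns the raw ZooKeeper node data into a broker address. Below is a minimal standalone sketch of that parsing, assuming the Kafka 0.7 registration format "<creator>:<host>:<port>" that the substring logic implies; the class name and sample node data are hypothetical.

import java.net.InetSocketAddress;

public final class BrokerDataParseSketch {

  // Mirrors getBrokerData: drop everything up to the first ':' (the creator id,
  // which itself contains no colon), then split the remainder into host:port.
  static InetSocketAddress parse(String data) {
    String hostPort = data.substring(data.indexOf(':') + 1);
    int idx = hostPort.indexOf(':');
    return new InetSocketAddress(hostPort.substring(0, idx),
                                 Integer.parseInt(hostPort.substring(idx + 1)));
  }

  public static void main(String[] args) {
    // Hypothetical data a broker could write under /brokers/ids/<id>.
    System.out.println(parse("10.0.0.5-1386883199000:10.0.0.5:9092"));  // -> 10.0.0.5:9092
  }
}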

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
new file mode 100644
index 0000000..7b43f8a
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Represents a request to be sent to a Kafka broker.
+ */
+final class KafkaRequest {
+
+  public enum Type {
+    PRODUCE(0),
+    FETCH(1),
+    MULTI_FETCH(2),
+    MULTI_PRODUCE(3),
+    OFFSETS(4);
+
+    private final short id;
+
+    private Type(int id) {
+      this.id = (short) id;
+    }
+
+    public short getId() {
+      return id;
+    }
+  }
+
+  private final Type type;
+  private final String topic;
+  private final int partition;
+  private final ChannelBuffer body;
+  private final ResponseHandler responseHandler;
+
+  public static KafkaRequest createProduce(String topic, int partition, ChannelBuffer body) {
+    return new KafkaRequest(Type.PRODUCE, topic, partition, body, ResponseHandler.NO_OP);
+  }
+
+  public static KafkaRequest createFetch(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
+    return new KafkaRequest(Type.FETCH, topic, partition, body, handler);
+  }
+
+  public static KafkaRequest createOffsets(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
+    return new KafkaRequest(Type.OFFSETS, topic, partition, body, handler);
+  }
+
+  private KafkaRequest(Type type, String topic, int partition, ChannelBuffer body, ResponseHandler responseHandler) {
+    this.type = type;
+    this.topic = topic;
+    this.partition = partition;
+    this.body = body;
+    this.responseHandler = responseHandler;
+  }
+
+  Type getType() {
+    return type;
+  }
+
+  String getTopic() {
+    return topic;
+  }
+
+  int getPartition() {
+    return partition;
+  }
+
+  ChannelBuffer getBody() {
+    return body;
+  }
+
+  ResponseHandler getResponseHandler() {
+    return responseHandler;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
new file mode 100644
index 0000000..ef78c76
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Encodes a {@link KafkaRequest} into the Kafka wire format.
+ */
+final class KafkaRequestEncoder extends OneToOneEncoder {
+
+  @Override
+  protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+    if (!(msg instanceof KafkaRequest)) {
+      return msg;
+    }
+    KafkaRequest req = (KafkaRequest) msg;
+    ByteBuffer topic = Charsets.UTF_8.encode(req.getTopic());
+
+    ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(16 + topic.remaining() + req.getBody().readableBytes());
+    int writerIdx = buffer.writerIndex();
+    buffer.writerIndex(writerIdx + 4);    // Reserves 4 bytes for message length
+
+    // Write out <REQUEST_TYPE>, <TOPIC_LENGTH>, <TOPIC>, <PARTITION>
+    buffer.writeShort(req.getType().getId());
+    buffer.writeShort(topic.remaining());
+    buffer.writeBytes(topic);
+    buffer.writeInt(req.getPartition());
+
+    // Write out the size of the whole buffer (excluding the size field) at the beginning
+    buffer.setInt(writerIdx, buffer.readableBytes() - 4 + req.getBody().readableBytes());
+
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(buffer, req.getBody());
+    buf = buf.readBytes(buf.readableBytes());
+
+    return buf;
+  }
+}
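
The encoder above frames every request as a 4-byte length (which excludes itself), a 2-byte request type, a 2-byte topic length, the UTF-8 topic bytes, a 4-byte partition, and finally the body. A standalone sketch of the same framing with plain java.nio (class and method names are illustrative only):

import com.google.common.base.Charsets;

import java.nio.ByteBuffer;

public final class RequestFrameSketch {

  // Builds the same frame as KafkaRequestEncoder: the leading int counts
  // everything after itself, i.e. 2 (type) + 2 (topic length) + topic bytes
  // + 4 (partition) + body bytes.
  static ByteBuffer frame(short type, String topic, int partition, ByteBuffer body) {
    byte[] topicBytes = topic.getBytes(Charsets.UTF_8);
    int size = 2 + 2 + topicBytes.length + 4 + body.remaining();
    ByteBuffer buf = ByteBuffer.allocate(4 + size);
    buf.putInt(size);
    buf.putShort(type);                       // e.g. 1 == FETCH, per KafkaRequest.Type
    buf.putShort((short) topicBytes.length);
    buf.put(topicBytes);
    buf.putInt(partition);
    buf.put(body);
    buf.flip();
    return buf;
  }

  public static void main(String[] args) {
    // For topic "logs" and an empty body: size = 2 + 2 + 4 + 4 = 12, frame = 16 bytes.
    System.out.println(frame((short) 1, "logs", 0, ByteBuffer.allocate(0)).remaining());  // 16
  }
}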

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
new file mode 100644
index 0000000..fbc552c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Represents a sender of {@link KafkaRequest}.
+ */
+interface KafkaRequestSender {
+
+  void send(KafkaRequest request);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
new file mode 100644
index 0000000..68c1bd8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Represents a response received from a Kafka broker.
+ */
+final class KafkaResponse {
+
+  private final FetchException.ErrorCode errorCode;
+  private final ChannelBuffer body;
+  private final int size;
+
+  KafkaResponse(FetchException.ErrorCode errorCode, ChannelBuffer body, int size) {
+    this.errorCode = errorCode;
+    this.body = body;
+    this.size = size;
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public FetchException.ErrorCode getErrorCode() {
+    return errorCode;
+  }
+
+  public ChannelBuffer getBody() {
+    return body;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
new file mode 100644
index 0000000..47f70ce
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * Dispatches a {@link KafkaResponse} to the {@link ResponseHandler} attached to the channel context.
+ */
+final class KafkaResponseDispatcher extends SimpleChannelHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaResponseDispatcher.class);
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    Object attachment = ctx.getAttachment();
+    if (e.getMessage() instanceof KafkaResponse && attachment instanceof ResponseHandler) {
+      ((ResponseHandler) attachment).received((KafkaResponse) e.getMessage());
+    } else {
+      super.messageReceived(ctx, e);
+    }
+  }
+
+  @Override
+  public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    if (e.getMessage() instanceof KafkaRequest) {
+      ctx.setAttachment(((KafkaRequest) e.getMessage()).getResponseHandler());
+    }
+    super.writeRequested(ctx, e);
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+    if (e.getCause() instanceof ClosedChannelException || e.getCause() instanceof SocketException) {
+      // No need to log for socket exception as the client has logic to retry.
+      return;
+    }
+    LOG.warn("Exception caught in kafka client connection.", e.getCause());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
new file mode 100644
index 0000000..5251e65
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Decodes incoming bytes into {@link KafkaResponse} objects and fires them upstream.
+ */
+final class KafkaResponseHandler extends SimpleChannelHandler {
+
+  private final Bufferer bufferer = new Bufferer();
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    Object msg = e.getMessage();
+    if (!(msg instanceof ChannelBuffer)) {
+      super.messageReceived(ctx, e);
+      return;
+    }
+
+    bufferer.apply((ChannelBuffer) msg);
+    ChannelBuffer buffer = bufferer.getNext();
+    while (buffer.readable()) {
+      // Send the response object upstream
+      Channels.fireMessageReceived(ctx, new KafkaResponse(FetchException.ErrorCode.fromCode(buffer.readShort()),
+                                                          buffer, buffer.readableBytes() + 6));
+      buffer = bufferer.getNext();
+    }
+  }
+}
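
A note on the size arithmetic above: the Bufferer (not part of this excerpt) presumably strips the 4-byte length prefix before handing each frame over, and readShort() consumes the 2-byte error code, so readableBytes() + 6 reconstructs the original on-wire frame size. Under that assumption the response layout is:

  [4-byte size][2-byte error code][payload of (size - 2) bytes]
  on-wire total = payload bytes + 2 (error code) + 4 (size prefix) = readableBytes() + 6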

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
new file mode 100644
index 0000000..0814917
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.kafka.client.FetchException;
+import org.apache.twill.kafka.client.FetchedMessage;
+import com.google.common.base.Throwables;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.ByteStreams;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.xerial.snappy.SnappyInputStream;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * An iterator that consumes messages from a Kafka topic partition.
+ */
+final class MessageFetcher extends AbstractIterator<FetchedMessage> implements ResponseHandler {
+
+  private static final long BACKOFF_INTERVAL_MS = 100;
+
+  private final KafkaRequestSender sender;
+  private final String topic;
+  private final int partition;
+  private final int maxSize;
+  private final AtomicLong offset;
+  private final BlockingQueue<FetchResult> messages;
+  private final ScheduledExecutorService scheduler;
+  private volatile long backoffMillis;
+  private final Runnable sendFetchRequest = new Runnable() {
+    @Override
+    public void run() {
+      sendFetchRequest();
+    }
+  };
+
+  MessageFetcher(String topic, int partition, long offset, int maxSize, KafkaRequestSender sender) {
+    this.topic = topic;
+    this.partition = partition;
+    this.sender = sender;
+    this.offset = new AtomicLong(offset);
+    this.maxSize = maxSize;
+    this.messages = new LinkedBlockingQueue<FetchResult>();
+    this.scheduler = Executors.newSingleThreadScheduledExecutor(
+                        Threads.createDaemonThreadFactory("kafka-" + topic + "-consumer"));
+  }
+
+  @Override
+  public void received(KafkaResponse response) {
+    if (response.getErrorCode() != FetchException.ErrorCode.OK) {
+      messages.add(FetchResult.failure(new FetchException("Error in fetching: " + response.getErrorCode(),
+                                                          response.getErrorCode())));
+      return;
+    }
+
+    try {
+      if (decodeResponse(response.getBody(), -1)) {
+        backoffMillis = 0;
+      } else {
+        backoffMillis = Math.min(backoffMillis + BACKOFF_INTERVAL_MS, 1000);  // linear backoff, capped at 1s
+        scheduler.schedule(sendFetchRequest, backoffMillis, TimeUnit.MILLISECONDS);
+      }
+    } catch (Throwable t) {
+      messages.add(FetchResult.failure(t));
+    }
+  }
+
+  private boolean decodeResponse(ChannelBuffer buffer, long nextOffset) {
+    boolean hasMessage = false;
+    boolean computeOffset = nextOffset < 0;
+    while (buffer.readableBytes() >= 4) {
+      int size = buffer.readInt();
+      if (buffer.readableBytes() < size) {
+        if (!hasMessage) {
+          throw new IllegalStateException("Incomplete message: size exceeds remaining bytes");
+        }
+        break;
+      }
+      nextOffset = computeOffset ? offset.addAndGet(size + 4) : nextOffset;
+      decodeMessage(size, buffer, nextOffset);
+      hasMessage = true;
+    }
+    return hasMessage;
+  }
+
+  private void decodeMessage(int size, ChannelBuffer buffer, long nextOffset) {
+    int readerIdx = buffer.readerIndex();
+    int magic = buffer.readByte();
+    Compression compression = magic == 0 ? Compression.NONE : Compression.fromCode(buffer.readByte());
+    int crc = buffer.readInt();
+
+    ChannelBuffer payload = buffer.readSlice(size - (buffer.readerIndex() - readerIdx));
+
+    // TODO: the CRC is read above but not verified.
+    enqueueMessage(compression, payload, nextOffset);
+  }
+
+  private void enqueueMessage(Compression compression, ChannelBuffer payload, long nextOffset) {
+    switch (compression) {
+      case NONE:
+        messages.add(FetchResult.success(new BasicFetchedMessage(nextOffset, payload.toByteBuffer())));
+        break;
+      case GZIP:
+        decodeResponse(gunzip(payload), nextOffset);
+        break;
+      case SNAPPY:
+        decodeResponse(unsnappy(payload), nextOffset);
+        break;
+    }
+  }
+
+  private ChannelBuffer gunzip(ChannelBuffer source) {
+    ChannelBufferOutputStream output = new ChannelBufferOutputStream(
+                                              ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
+    try {
+      try {
+        GZIPInputStream gzipInput = new GZIPInputStream(new ChannelBufferInputStream(source));
+        try {
+          ByteStreams.copy(gzipInput, output);
+          return output.buffer();
+        } finally {
+          gzipInput.close();
+        }
+      } finally {
+        output.close();
+      }
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private ChannelBuffer unsnappy(ChannelBuffer source) {
+    ChannelBufferOutputStream output = new ChannelBufferOutputStream(
+                                              ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
+    try {
+      try {
+        SnappyInputStream snappyInput = new SnappyInputStream(new ChannelBufferInputStream(source));
+        try {
+          ByteStreams.copy(snappyInput, output);
+          return output.buffer();
+        } finally {
+          snappyInput.close();
+        }
+      } finally {
+        output.close();
+      }
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private void sendFetchRequest() {
+    ChannelBuffer fetchBody = ChannelBuffers.buffer(12);
+    fetchBody.writeLong(offset.get());
+    fetchBody.writeInt(maxSize);
+    sender.send(KafkaRequest.createFetch(topic, partition, fetchBody, MessageFetcher.this));
+  }
+
+  @Override
+  protected FetchedMessage computeNext() {
+    FetchResult result = messages.poll();
+    if (result != null) {
+      return getMessage(result);
+    }
+
+    try {
+      sendFetchRequest();
+      return getMessage(messages.take());
+    } catch (InterruptedException e) {
+      scheduler.shutdownNow();
+      return endOfData();
+    }
+  }
+
+  private FetchedMessage getMessage(FetchResult result) {
+    try {
+      if (result.isSuccess()) {
+        return result.getMessage();
+      } else {
+        throw result.getErrorCause();
+      }
+    } catch (Throwable t) {
+      throw Throwables.propagate(t);
+    }
+  }
+
+  private static final class FetchResult {
+    private final FetchedMessage message;
+    private final Throwable errorCause;
+
+    static FetchResult success(FetchedMessage message) {
+      return new FetchResult(message, null);
+    }
+
+    static FetchResult failure(Throwable cause) {
+      return new FetchResult(null, cause);
+    }
+
+    private FetchResult(FetchedMessage message, Throwable errorCause) {
+      this.message = message;
+      this.errorCause = errorCause;
+    }
+
+    public FetchedMessage getMessage() {
+      return message;
+    }
+
+    public Throwable getErrorCause() {
+      return errorCause;
+    }
+
+    public boolean isSuccess() {
+      return message != null;
+    }
+  }
+}
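
decodeResponse and decodeMessage above walk the Kafka 0.7 message-set layout: each message is a 4-byte size, a 1-byte magic, a 1-byte compression code when magic is non-zero, a 4-byte CRC, and then the payload, with offsets advancing by size + 4. A minimal standalone sketch of extracting one payload (names are illustrative; like the patch, it reads but does not verify the CRC):

import java.nio.ByteBuffer;

public final class MessageLayoutSketch {

  // Extracts the payload of a single message, mirroring decodeMessage above.
  static ByteBuffer payloadOf(ByteBuffer buf) {
    int size = buf.getInt();                          // message size, excluding this field
    int start = buf.position();
    byte magic = buf.get();
    byte compression = (magic == 0) ? 0 : buf.get();  // 0 = NONE, 1 = GZIP, 2 = SNAPPY
    int crc = buf.getInt();                           // read but not verified
    ByteBuffer payload = buf.slice();
    payload.limit(size - (buf.position() - start));   // size minus header bytes consumed
    return payload;
  }
}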

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
new file mode 100644
index 0000000..49008cc
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Represents a set of messages that belong to the same message set and get encoded as a
+ * single Kafka message set.
+ */
+interface MessageSetEncoder {
+
+  MessageSetEncoder add(ChannelBuffer payload);
+
+  ChannelBuffer finish();
+}
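
The contract is accumulate-then-drain: payloads are added one at a time, and finish() returns the length-prefixed message set while resetting the encoder for reuse. A usage sketch (the class name is illustrative; it would have to live in this package, since the encoders are package-private):

import com.google.common.base.Charsets;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

public final class EncoderUsageSketch {

  static ChannelBuffer encodeTwo() {
    MessageSetEncoder encoder = new IdentityMessageSetEncoder();  // or the GZip/Snappy variants
    encoder.add(ChannelBuffers.wrappedBuffer("event-1".getBytes(Charsets.UTF_8)));
    encoder.add(ChannelBuffers.wrappedBuffer("event-2".getBytes(Charsets.UTF_8)));
    return encoder.finish();  // one length-prefixed message set; the encoder is reset
  }
}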

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
new file mode 100644
index 0000000..f681b85
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Represents a handler for Kafka responses.
+ */
+interface ResponseHandler {
+
+  ResponseHandler NO_OP = new ResponseHandler() {
+    @Override
+    public void received(KafkaResponse response) {
+      // No-op
+    }
+  };
+
+  void received(KafkaResponse response);
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
new file mode 100644
index 0000000..8ff4856
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.kafka.client.FetchException;
+import org.apache.twill.kafka.client.FetchedMessage;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.PreparePublish;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientBossPool;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Basic implementation of {@link KafkaClient}.
+ */
+public final class SimpleKafkaClient extends AbstractIdleService implements KafkaClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaClient.class);
+  private static final int BROKER_POLL_INTERVAL = 100;
+
+  private final KafkaBrokerCache brokerCache;
+  private ClientBootstrap bootstrap;
+  private ConnectionPool connectionPool;
+
+  public SimpleKafkaClient(ZKClient zkClient) {
+    this.brokerCache = new KafkaBrokerCache(zkClient);
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    brokerCache.startAndWait();
+    ThreadFactory threadFactory = Threads.createDaemonThreadFactory("kafka-client-netty-%d");
+    NioClientBossPool bossPool = new NioClientBossPool(Executors.newSingleThreadExecutor(threadFactory), 1,
+                                                       new HashedWheelTimer(threadFactory), null);
+    NioWorkerPool workerPool = new NioWorkerPool(Executors.newFixedThreadPool(4, threadFactory), 4);
+
+    bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossPool, workerPool));
+    bootstrap.setPipelineFactory(new KafkaChannelPipelineFactory());
+    connectionPool = new ConnectionPool(bootstrap);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    connectionPool.close();
+    bootstrap.releaseExternalResources();
+    brokerCache.stopAndWait();
+  }
+
+  @Override
+  public PreparePublish preparePublish(final String topic, final Compression compression) {
+    final Map<Integer, MessageSetEncoder> encoders = Maps.newHashMap();
+
+    return new PreparePublish() {
+      @Override
+      public PreparePublish add(byte[] payload, Object partitionKey) {
+        return add(ByteBuffer.wrap(payload), partitionKey);
+      }
+
+      @Override
+      public PreparePublish add(ByteBuffer payload, Object partitionKey) {
+        // TODO: Partition
+        int partition = 0;
+
+        MessageSetEncoder encoder = encoders.get(partition);
+        if (encoder == null) {
+          encoder = getEncoder(compression);
+          encoders.put(partition, encoder);
+        }
+        encoder.add(ChannelBuffers.wrappedBuffer(payload));
+
+        return this;
+      }
+
+      @Override
+      public ListenableFuture<?> publish() {
+        List<ListenableFuture<?>> futures = Lists.newArrayListWithCapacity(encoders.size());
+        for (Map.Entry<Integer, MessageSetEncoder> entry : encoders.entrySet()) {
+          futures.add(doPublish(topic, entry.getKey(), entry.getValue().finish()));
+        }
+        encoders.clear();
+        return Futures.allAsList(futures);
+      }
+
+      private ListenableFuture<?> doPublish(String topic, int partition, ChannelBuffer messageSet) {
+        final KafkaRequest request = KafkaRequest.createProduce(topic, partition, messageSet);
+        final SettableFuture<?> result = SettableFuture.create();
+        final ConnectionPool.ConnectResult connection =
+              connectionPool.connect(getTopicBroker(topic, partition).getAddress());
+
+        connection.getChannelFuture().addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            try {
+              future.getChannel().write(request).addListener(getPublishChannelFutureListener(result, null, connection));
+            } catch (Exception e) {
+              result.setException(e);
+            }
+          }
+        });
+
+        return result;
+      }
+    };
+  }
+
+  @Override
+  public Iterator<FetchedMessage> consume(final String topic, final int partition, long offset, int maxSize) {
+    Preconditions.checkArgument(maxSize >= 10, "Maximum fetch size cannot be smaller than 10.");
+
+    // Connect to the broker. Consumer connections are long-lived, so connection reuse is not a concern.
+    final AtomicReference<ChannelFuture> channelFutureRef = new AtomicReference<ChannelFuture>(
+          connectionPool.connect(getTopicBroker(topic, partition).getAddress()).getChannelFuture());
+
+    return new MessageFetcher(topic, partition, offset, maxSize, new KafkaRequestSender() {
+
+      @Override
+      public void send(final KafkaRequest request) {
+        if (!isRunning()) {
+          return;
+        }
+        try {
+          // Try to send the request
+          Channel channel = channelFutureRef.get().getChannel();
+          if (!channel.write(request).await().isSuccess()) {
+            // If failed, retry
+            channel.close();
+            ChannelFuture channelFuture = connectionPool.connect(
+                                              getTopicBroker(topic, partition).getAddress()).getChannelFuture();
+            channelFutureRef.set(channelFuture);
+            channelFuture.addListener(new ChannelFutureListener() {
+              @Override
+              public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                send(request);
+              }
+            });
+          }
+        } catch (InterruptedException e) {
+          // Ignore it
+          LOG.info("Interrupted when sending consume request", e);
+        }
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<long[]> getOffset(final String topic, final int partition, long time, int maxOffsets) {
+    final SettableFuture<long[]> resultFuture = SettableFuture.create();
+    final ChannelBuffer body = ChannelBuffers.buffer(Longs.BYTES + Ints.BYTES);
+    body.writeLong(time);
+    body.writeInt(maxOffsets);
+
+    connectionPool.connect(getTopicBroker(topic, partition).getAddress())
+                  .getChannelFuture().addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        if (checkFailure(future)) {
+          return;
+        }
+
+        future.getChannel().write(KafkaRequest.createOffsets(topic, partition, body, new ResponseHandler() {
+          @Override
+          public void received(KafkaResponse response) {
+            if (response.getErrorCode() != FetchException.ErrorCode.OK) {
+              resultFuture.setException(new FetchException("Failed to fetch offset.", response.getErrorCode()));
+            } else {
+              // Decode the offset response: a 4-byte count of offsets, followed by that many
+              // offsets, each 8 bytes in size.
+              ChannelBuffer resultBuffer = response.getBody();
+              int size = resultBuffer.readInt();
+              long[] result = new long[size];
+              for (int i = 0; i < size; i++) {
+                result[i] = resultBuffer.readLong();
+              }
+              resultFuture.set(result);
+            }
+          }
+        })).addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            checkFailure(future);
+          }
+        });
+      }
+
+      private boolean checkFailure(ChannelFuture future) {
+        if (!future.isSuccess()) {
+          if (future.isCancelled()) {
+            resultFuture.cancel(true);
+          } else {
+            resultFuture.setException(future.getCause());
+          }
+          return true;
+        }
+        return false;
+      }
+    });
+
+    return resultFuture;
+  }
+
+  private TopicBroker getTopicBroker(String topic, int partition) {
+    TopicBroker topicBroker = brokerCache.getBrokerAddress(topic, partition);
+    while (topicBroker == null) {
+      try {
+        TimeUnit.MILLISECONDS.sleep(BROKER_POLL_INTERVAL);
+      } catch (InterruptedException e) {
+        return null;
+      }
+      topicBroker = brokerCache.getBrokerAddress(topic, partition);
+    }
+    return topicBroker;
+  }
+
+  private MessageSetEncoder getEncoder(Compression compression) {
+    switch (compression) {
+      case GZIP:
+        return new GZipMessageSetEncoder();
+      case SNAPPY:
+        return new SnappyMessageSetEncoder();
+      default:
+        return new IdentityMessageSetEncoder();
+    }
+  }
+
+  private <V> ChannelFutureListener getPublishChannelFutureListener(final SettableFuture<V> result, final V resultObj,
+                                                                    final ConnectionPool.ConnectionReleaser releaser) {
+    return new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        try {
+          if (future.isSuccess()) {
+            result.set(resultObj);
+          } else if (future.isCancelled()) {
+            result.cancel(true);
+          } else {
+            result.setException(future.getCause());
+          }
+        } finally {
+          releaser.release();
+        }
+      }
+    };
+  }
+
+  private static final class KafkaChannelPipelineFactory implements ChannelPipelineFactory {
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+
+      pipeline.addLast("encoder", new KafkaRequestEncoder());
+      pipeline.addLast("decoder", new KafkaResponseHandler());
+      pipeline.addLast("dispatcher", new KafkaResponseDispatcher());
+      return pipeline;
+    }
+  }
+}
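
Putting the client together, a publish/consume round trip could look like the sketch below. It assumes a ZKClient connected to the quorum that the Kafka 0.7 brokers register with, and that -1L as the time argument of getOffset means "latest" (the usual Kafka convention); the sketch class itself is hypothetical and, like the others, would live in this package.

import com.google.common.base.Charsets;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.zookeeper.ZKClient;

import java.util.Iterator;

public final class KafkaClientSketch {

  static void roundTrip(ZKClient zkClient) throws Exception {
    SimpleKafkaClient client = new SimpleKafkaClient(zkClient);
    client.startAndWait();
    try {
      // Publish one message to partition 0 (the partition key is currently
      // unused; see the TODO in preparePublish).
      client.preparePublish("logs", Compression.NONE)
            .add("hello".getBytes(Charsets.UTF_8), 0)
            .publish().get();

      // Ask for the single latest offset of the partition.
      long[] offsets = client.getOffset("logs", 0, -1L, 1).get();

      // Consume from the beginning; next() blocks until a message arrives.
      Iterator<FetchedMessage> messages = client.consume("logs", 0, 0L, 1048576);
      FetchedMessage first = messages.next();
    } finally {
      client.stopAndWait();
    }
  }
}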

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
new file mode 100644
index 0000000..bf18c08
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.xerial.snappy.SnappyOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A {@link MessageSetEncoder} that compresses messages using Snappy.
+ */
+final class SnappyMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
+
+  SnappyMessageSetEncoder() {
+    super(Compression.SNAPPY);
+  }
+
+  @Override
+  protected OutputStream createCompressedStream(OutputStream os) throws IOException {
+    return new SnappyOutputStream(os);
+  }
+}
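
To illustrate what createCompressedStream provides here, the following standalone
sketch (not part of the patch) round-trips a payload through the same
org.xerial.snappy streams:

    import org.xerial.snappy.SnappyInputStream;
    import org.xerial.snappy.SnappyOutputStream;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class SnappyRoundTrip {
      public static void main(String[] args) throws IOException {
        byte[] payload = "sample kafka message".getBytes("UTF-8");

        // Compress: the same wrapping done by createCompressedStream(os).
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        OutputStream compressed = new SnappyOutputStream(buf);
        compressed.write(payload);
        compressed.close();

        // Decompress to verify the round trip.
        SnappyInputStream in = new SnappyInputStream(new ByteArrayInputStream(buf.toByteArray()));
        byte[] restored = new byte[payload.length];
        int read = in.read(restored);
        in.close();

        System.out.println("restored: " + new String(restored, 0, read, "UTF-8"));
      }
    }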

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
new file mode 100644
index 0000000..fd4bf03
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Represents broker information for a given topic.
+ */
+final class TopicBroker {
+
+  private final String topic;
+  private final InetSocketAddress address;
+  private final int partitionSize;
+
+  TopicBroker(String topic, InetSocketAddress address, int partitionSize) {
+    this.topic = topic;
+    this.address = address;
+    this.partitionSize = partitionSize;
+  }
+
+  String getTopic() {
+    return topic;
+  }
+
+  InetSocketAddress getAddress() {
+    return address;
+  }
+
+  int getPartitionSize() {
+    return partitionSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
new file mode 100644
index 0000000..f3f615c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package provides a pure Java Kafka client implementation.
+ */
+package org.apache.twill.internal.kafka.client;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
new file mode 100644
index 0000000..12818ef
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.kafka.client.Compression;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.PreparePublish;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import ch.qos.logback.classic.pattern.ClassOfCallerConverter;
+import ch.qos.logback.classic.pattern.FileOfCallerConverter;
+import ch.qos.logback.classic.pattern.LineOfCallerConverter;
+import ch.qos.logback.classic.pattern.MethodOfCallerConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.classic.spi.StackTraceElementProxy;
+import ch.qos.logback.core.AppenderBase;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.stream.JsonWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A logback appender that buffers {@link ILoggingEvent}s and publishes them to Kafka as JSON messages.
+ */
+public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
+
+  private final LogEventConverter eventConverter;
+  private final AtomicReference<PreparePublish> publisher;
+  private final Runnable flushTask;
+  /**
+   * Approximate count of log entries currently buffered; it may lag the actual queue size.
+   */
+  private final AtomicInteger bufferedSize;
+
+  private ZKClientService zkClientService;
+  private KafkaClient kafkaClient;
+  private String zkConnectStr;
+  private String hostname;
+  private String topic;
+  private Queue<String> buffer;
+  private int flushLimit = 20;
+  private int flushPeriod = 100;
+  private ScheduledExecutorService scheduler;
+
+  public KafkaAppender() {
+    eventConverter = new LogEventConverter();
+    publisher = new AtomicReference<PreparePublish>();
+    flushTask = createFlushTask();
+    bufferedSize = new AtomicInteger();
+    buffer = new ConcurrentLinkedQueue<String>();
+  }
+
+  /**
+   * Sets the ZooKeeper connection string. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setZookeeper(String zkConnectStr) {
+    this.zkConnectStr = zkConnectStr;
+  }
+
+  /**
+   * Sets the hostname to record in published log entries. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+  /**
+   * Sets the topic name for publishing logs. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setTopic(String topic) {
+    this.topic = topic;
+  }
+
+  /**
+   * Sets the maximum number of buffered log entries before a forced flush is triggered. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setFlushLimit(int flushLimit) {
+    this.flushLimit = flushLimit;
+  }
+
+  /**
+   * Sets the periodic flush interval in milliseconds. Called by the logback configurator.
+   */
+  @SuppressWarnings("unused")
+  public void setFlushPeriod(int flushPeriod) {
+    this.flushPeriod = flushPeriod;
+  }
+
+  @Override
+  public void start() {
+    Preconditions.checkNotNull(zkConnectStr);
+
+    scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
+
+    zkClientService = ZKClientServices.delegate(
+      ZKClients.reWatchOnExpire(
+        ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+                                 RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+    kafkaClient = new SimpleKafkaClient(zkClientService);
+    Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), new FutureCallback<Object>() {
+      @Override
+      public void onSuccess(Object result) {
+        LOG.info("Kafka client started: " + zkConnectStr);
+        publisher.set(kafkaClient.preparePublish(topic, Compression.SNAPPY));
+        scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // Failed to talk to Kafka; other than logging the failure, there is nothing more to do.
+        LOG.error("Failed to start kafka client.", t);
+      }
+    });
+
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    super.stop();
+    scheduler.shutdownNow();
+    Futures.getUnchecked(Services.chainStop(kafkaClient, zkClientService));
+  }
+
+  public void forceFlush() {
+    try {
+      publishLogs().get(2, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to publish last batch of log.", e);
+    }
+  }
+
+  @Override
+  protected void append(ILoggingEvent eventObject) {
+    buffer.offer(eventConverter.convert(eventObject));
+    if (bufferedSize.incrementAndGet() >= flushLimit && publisher.get() != null) {
+      // Trigger an extra flush, as the buffer has exceeded the flush limit.
+      scheduler.submit(flushTask);
+    }
+  }
+
+  private ListenableFuture<Integer> publishLogs() {
+    // If the publisher is not yet available, simply return a completed future.
+    PreparePublish publisher = KafkaAppender.this.publisher.get();
+    if (publisher == null) {
+      return Futures.immediateFuture(0);
+    }
+
+    int count = 0;
+    for (String json : Iterables.consumingIterable(buffer)) {
+      publisher.add(Charsets.UTF_8.encode(json), 0);
+      count++;
+    }
+    // Nothing to publish; simply return a completed future.
+    if (count == 0) {
+      return Futures.immediateFuture(0);
+    }
+
+    bufferedSize.set(0);
+    final int finalCount = count;
+    return Futures.transform(publisher.publish(), new Function<Object, Integer>() {
+      @Override
+      public Integer apply(Object input) {
+        return finalCount;
+      }
+    });
+  }
+
+  /**
+   * Creates a {@link Runnable} that writes all buffered logs to Kafka.
+   * @return the flush task
+   */
+  private Runnable createFlushTask() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        Futures.addCallback(publishLogs(), new FutureCallback<Integer>() {
+          @Override
+          public void onSuccess(Integer result) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Log entries published, size=" + result);
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            LOG.error("Failed to push logs to kafka. Log entries dropped.", t);
+          }
+        });
+      }
+    };
+  }
+
+  /**
+   * Helper class to convert an {@link ILoggingEvent} into a JSON string.
+   */
+  private final class LogEventConverter {
+
+    private final ClassOfCallerConverter classNameConverter = new ClassOfCallerConverter();
+    private final MethodOfCallerConverter methodConverter = new MethodOfCallerConverter();
+    private final FileOfCallerConverter fileConverter = new FileOfCallerConverter();
+    private final LineOfCallerConverter lineConverter = new LineOfCallerConverter();
+
+    private String convert(ILoggingEvent event) {
+      StringWriter result = new StringWriter();
+      JsonWriter writer = new JsonWriter(result);
+
+      try {
+        try {
+          writer.beginObject();
+          writer.name("name").value(event.getLoggerName());
+          writer.name("host").value(hostname);
+          writer.name("timestamp").value(Long.toString(event.getTimeStamp()));
+          writer.name("level").value(event.getLevel().toString());
+          writer.name("className").value(classNameConverter.convert(event));
+          writer.name("method").value(methodConverter.convert(event));
+          writer.name("file").value(fileConverter.convert(event));
+          writer.name("line").value(lineConverter.convert(event));
+          writer.name("thread").value(event.getThreadName());
+          writer.name("message").value(event.getFormattedMessage());
+          writer.name("stackTraces");
+          encodeStackTraces(event.getThrowableProxy(), writer);
+
+          writer.endObject();
+        } finally {
+          writer.close();
+        }
+      } catch (IOException e) {
+        throw Throwables.propagate(e);
+      }
+
+      return result.toString();
+    }
+
+    private void encodeStackTraces(IThrowableProxy throwable, JsonWriter writer) throws IOException {
+      writer.beginArray();
+      try {
+        if (throwable == null) {
+          return;
+        }
+
+        for (StackTraceElementProxy stackTrace : throwable.getStackTraceElementProxyArray()) {
+          writer.beginObject();
+
+          StackTraceElement element = stackTrace.getStackTraceElement();
+          writer.name("className").value(element.getClassName());
+          writer.name("method").value(element.getMethodName());
+          writer.name("file").value(element.getFileName());
+          writer.name("line").value(element.getLineNumber());
+
+          writer.endObject();
+        }
+      } finally {
+        writer.endArray();
+      }
+    }
+  }
+}
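
For context, a hedged sketch of wiring this appender up programmatically. In
practice the setters are invoked by logback's configurator from logback.xml; the
values below are placeholders:

    import ch.qos.logback.classic.Logger;
    import ch.qos.logback.classic.LoggerContext;
    import org.slf4j.LoggerFactory;

    LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();

    KafkaAppender appender = new KafkaAppender();
    appender.setContext(context);
    appender.setName("KAFKA");
    appender.setZookeeper("localhost:2181");  // placeholder ZooKeeper quorum
    appender.setHostname("worker-1");         // placeholder hostname
    appender.setTopic("logs");                // placeholder topic
    appender.setFlushLimit(50);
    appender.setFlushPeriod(200);
    appender.start();

    // Attach to the root logger so log events flow through this appender.
    context.getLogger(Logger.ROOT_LOGGER_NAME).addAppender(appender);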

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
new file mode 100644
index 0000000..c1695de
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.utils.Networks;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A {@link org.apache.twill.api.TwillRunnable} for managing an embedded Kafka server.
+ */
+public final class KafkaTwillRunnable implements TwillRunnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaTwillRunnable.class);
+
+  private final String kafkaDir;
+  private EmbeddedKafkaServer server;
+  private CountDownLatch stopLatch;
+
+  public KafkaTwillRunnable(String kafkaDir) {
+    this.kafkaDir = kafkaDir;
+  }
+
+  @Override
+  public TwillRunnableSpecification configure() {
+    return TwillRunnableSpecification.Builder.with()
+      .setName("kafka")
+      .withConfigs(ImmutableMap.of("kafkaDir", kafkaDir))
+      .build();
+  }
+
+  @Override
+  public void initialize(TwillContext context) {
+    Map<String, String> args = context.getSpecification().getConfigs();
+    String zkConnectStr = System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
+    stopLatch = new CountDownLatch(1);
+
+    try {
+      server = new EmbeddedKafkaServer(new File(args.get("kafkaDir")), generateKafkaConfig(zkConnectStr));
+      server.startAndWait();
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void handleCommand(Command command) throws Exception {
+  }
+
+  @Override
+  public void stop() {
+    stopLatch.countDown();
+  }
+
+  @Override
+  public void destroy() {
+    server.stopAndWait();
+  }
+
+  @Override
+  public void run() {
+    try {
+      stopLatch.await();
+    } catch (InterruptedException e) {
+      LOG.info("Running thread interrupted, shutting down kafka server.", e);
+    }
+  }
+
+  private Properties generateKafkaConfig(String zkConnectStr) {
+    int port = Networks.getRandomPort();
+    Preconditions.checkState(port > 0, "Failed to get random port.");
+
+    Properties prop = new Properties();
+    prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
+    prop.setProperty("zk.connect", zkConnectStr);
+    prop.setProperty("num.threads", "8");
+    prop.setProperty("port", Integer.toString(port));
+    prop.setProperty("log.flush.interval", "10000");
+    prop.setProperty("max.socket.request.bytes", "104857600");
+    prop.setProperty("log.cleanup.interval.mins", "1");
+    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
+    prop.setProperty("zk.connectiontimeout.ms", "1000000");
+    prop.setProperty("socket.receive.buffer", "1048576");
+    prop.setProperty("enable.zookeeper", "true");
+    prop.setProperty("log.retention.hours", "168");
+    prop.setProperty("brokerid", "0");
+    prop.setProperty("socket.send.buffer", "1048576");
+    prop.setProperty("num.partitions", "1");
+    prop.setProperty("log.file.size", "536870912");
+    prop.setProperty("log.default.flush.interval.ms", "1000");
+    return prop;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java b/twill-core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
new file mode 100644
index 0000000..dc11666
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.internal.json.JsonUtils;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+
+import java.lang.reflect.Type;
+
+/**
+ * A {@link com.google.gson.Gson} decoder for {@link LogEntry}.
+ */
+public final class LogEntryDecoder implements JsonDeserializer<LogEntry> {
+
+  @Override
+  public LogEntry deserialize(JsonElement json, Type typeOfT,
+                              JsonDeserializationContext context) throws JsonParseException {
+    if (!json.isJsonObject()) {
+      return null;
+    }
+    JsonObject jsonObj = json.getAsJsonObject();
+
+    final String name = JsonUtils.getAsString(jsonObj, "name");
+    final String host = JsonUtils.getAsString(jsonObj, "host");
+    final long timestamp = JsonUtils.getAsLong(jsonObj, "timestamp", 0);
+    LogEntry.Level l;
+    try {
+      l = LogEntry.Level.valueOf(JsonUtils.getAsString(jsonObj, "level"));
+    } catch (Exception e) {
+      l = LogEntry.Level.FATAL;
+    }
+    final LogEntry.Level logLevel = l;
+    final String className = JsonUtils.getAsString(jsonObj, "className");
+    final String method = JsonUtils.getAsString(jsonObj, "method");
+    final String file = JsonUtils.getAsString(jsonObj, "file");
+    final String line = JsonUtils.getAsString(jsonObj, "line");
+    final String thread = JsonUtils.getAsString(jsonObj, "thread");
+    final String message = JsonUtils.getAsString(jsonObj, "message");
+
+    final StackTraceElement[] stackTraces = context.deserialize(jsonObj.get("stackTraces").getAsJsonArray(),
+                                                                StackTraceElement[].class);
+
+    return new LogEntry() {
+      @Override
+      public String getLoggerName() {
+        return name;
+      }
+
+      @Override
+      public String getHost() {
+        return host;
+      }
+
+      @Override
+      public long getTimestamp() {
+        return timestamp;
+      }
+
+      @Override
+      public Level getLogLevel() {
+        return logLevel;
+      }
+
+      @Override
+      public String getSourceClassName() {
+        return className;
+      }
+
+      @Override
+      public String getSourceMethodName() {
+        return method;
+      }
+
+      @Override
+      public String getFileName() {
+        return file;
+      }
+
+      @Override
+      public int getLineNumber() {
+        if ("?".equals(line)) {
+          return -1;
+        } else {
+          return Integer.parseInt(line);
+        }
+      }
+
+      @Override
+      public String getThreadName() {
+        return thread;
+      }
+
+      @Override
+      public String getMessage() {
+        return message;
+      }
+
+      @Override
+      public StackTraceElement[] getStackTraces() {
+        return stackTraces;
+      }
+    };
+  }
+}
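
A short usage sketch for the decoder: register it with Gson and deserialize a JSON
log event. Depending on how stack traces are encoded, an adapter for
StackTraceElement may also be needed; this is illustrative only:

    import org.apache.twill.api.logging.LogEntry;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;

    Gson gson = new GsonBuilder()
      .registerTypeAdapter(LogEntry.class, new LogEntryDecoder())
      .create();

    // "json" is a log event string as published by the KafkaAppender above.
    LogEntry entry = gson.fromJson(json, LogEntry.class);
    System.out.println(entry.getLogLevel() + " " + entry.getMessage());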

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/logging/Loggings.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/Loggings.java b/twill-core/src/main/java/org/apache/twill/internal/logging/Loggings.java
new file mode 100644
index 0000000..9baed63
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/Loggings.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import org.slf4j.ILoggerFactory;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for the logging system, such as forcing the Kafka appender to flush buffered logs.
+ */
+public final class Loggings {
+
+  public static void forceFlush() {
+    ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
+
+    if (loggerFactory instanceof LoggerContext) {
+      Appender<ILoggingEvent> appender = ((LoggerContext) loggerFactory).getLogger(Logger.ROOT_LOGGER_NAME)
+                                                                        .getAppender("KAFKA");
+      if (appender instanceof KafkaAppender) {
+        ((KafkaAppender) appender).forceFlush();
+      }
+    }
+  }
+
+  private Loggings() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/package-info.java b/twill-core/src/main/java/org/apache/twill/internal/package-info.java
new file mode 100644
index 0000000..a8459e0
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package provides internal classes for Twill.
+ */
+package org.apache.twill.internal;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/Message.java b/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
new file mode 100644
index 0000000..6c3e719
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+
+/**
+ * Represents a message that can be sent to a running application or to its runnables.
+ */
+public interface Message {
+
+  /**
+   * Type of message.
+   */
+  enum Type {
+    SYSTEM,
+    USER
+  }
+
+  /**
+   * Scope of the message.
+   */
+  enum Scope {
+    APPLICATION,
+    ALL_RUNNABLE,
+    RUNNABLE
+  }
+
+  Type getType();
+
+  Scope getScope();
+
+  /**
+   * @return the name of the target runnable if scope is {@link Scope#RUNNABLE} or {@code null} otherwise.
+   */
+  String getRunnableName();
+
+  Command getCommand();
+}
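
To make the contract concrete, a hedged sketch of a message targeting a single
runnable. The Command.Builder usage is assumed from the Twill API, and the
anonymous class is purely illustrative (the codebase presumably provides a
factory for building messages):

    // Illustrative only: a USER message scoped to the runnable named "worker".
    Message message = new Message() {
      @Override
      public Type getType() {
        return Type.USER;
      }

      @Override
      public Scope getScope() {
        return Scope.RUNNABLE;
      }

      @Override
      public String getRunnableName() {
        return "worker";
      }

      @Override
      public Command getCommand() {
        return Command.Builder.of("stop").build();
      }
    };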