You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:59 UTC
[17/28] Making maven site works.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
new file mode 100644
index 0000000..daa0c2c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * A {@link MessageSetEncoder} that compress message set using GZIP.
+ */
+final class GZipMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
+
+ GZipMessageSetEncoder() {
+ super(Compression.GZIP);
+ }
+
+ @Override
+ protected OutputStream createCompressedStream(OutputStream os) throws IOException {
+ return new GZIPOutputStream(os);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
new file mode 100644
index 0000000..51dc746
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A pass-through {@link MessageSetEncoder}.
+ */
+final class IdentityMessageSetEncoder extends AbstractMessageSetEncoder {
+
+ private ChannelBuffer messageSets = ChannelBuffers.EMPTY_BUFFER;
+
+ @Override
+ public MessageSetEncoder add(ChannelBuffer payload) {
+ messageSets = ChannelBuffers.wrappedBuffer(messageSets, encodePayload(payload));
+ return this;
+ }
+
+ @Override
+ public ChannelBuffer finish() {
+ ChannelBuffer buf = prefixLength(messageSets);
+ messageSets = ChannelBuffers.EMPTY_BUFFER;
+ return buf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
new file mode 100644
index 0000000..f2bb815
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+/**
+ * A Service to cache kafka broker information by subscribing to ZooKeeper.
+ */
+final class KafkaBrokerCache extends AbstractIdleService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerCache.class);
+
+ private static final String BROKERS_PATH = "/brokers";
+
+ private final ZKClient zkClient;
+ private final Map<String, InetSocketAddress> brokers;
+ // topicBrokers is from topic->partition size->brokerId
+ private final Map<String, SortedMap<Integer, Set<String>>> topicBrokers;
+ private final Runnable invokeGetBrokers = new Runnable() {
+ @Override
+ public void run() {
+ getBrokers();
+ }
+ };
+ private final Runnable invokeGetTopics = new Runnable() {
+ @Override
+ public void run() {
+ getTopics();
+ }
+ };
+
+ KafkaBrokerCache(ZKClient zkClient) {
+ this.zkClient = zkClient;
+ this.brokers = Maps.newConcurrentMap();
+ this.topicBrokers = Maps.newConcurrentMap();
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ getBrokers();
+ getTopics();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // No-op
+ }
+
+ public int getPartitionSize(String topic) {
+ SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
+ if (partitionBrokers == null || partitionBrokers.isEmpty()) {
+ return 1;
+ }
+ return partitionBrokers.lastKey();
+ }
+
+ public TopicBroker getBrokerAddress(String topic, int partition) {
+ SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
+ if (partitionBrokers == null || partitionBrokers.isEmpty()) {
+ return pickRandomBroker(topic);
+ }
+
+ // If the requested partition is greater than supported partition size, randomly pick one
+ if (partition >= partitionBrokers.lastKey()) {
+ return pickRandomBroker(topic);
+ }
+
+ // Randomly pick a partition size and randomly pick a broker from it
+ Random random = new Random();
+ partitionBrokers = partitionBrokers.tailMap(partition + 1);
+ List<Integer> sizes = Lists.newArrayList(partitionBrokers.keySet());
+ Integer partitionSize = pickRandomItem(sizes, random);
+ List<String> ids = Lists.newArrayList(partitionBrokers.get(partitionSize));
+ InetSocketAddress address = brokers.get(ids.get(new Random().nextInt(ids.size())));
+ return address == null ? pickRandomBroker(topic) : new TopicBroker(topic, address, partitionSize);
+ }
+
+ private TopicBroker pickRandomBroker(String topic) {
+ Map.Entry<String, InetSocketAddress> entry = Iterables.getFirst(brokers.entrySet(), null);
+ if (entry == null) {
+ return null;
+ }
+ InetSocketAddress address = entry.getValue();
+ return new TopicBroker(topic, address, 0);
+ }
+
+ private <T> T pickRandomItem(List<T> list, Random random) {
+ return list.get(random.nextInt(list.size()));
+ }
+
+ private void getBrokers() {
+ final String idsPath = BROKERS_PATH + "/ids";
+
+ Futures.addCallback(zkClient.getChildren(idsPath, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ getBrokers();
+ }
+ }), new ExistsOnFailureFutureCallback<NodeChildren>(idsPath, invokeGetBrokers) {
+ @Override
+ public void onSuccess(NodeChildren result) {
+ Set<String> children = ImmutableSet.copyOf(result.getChildren());
+ for (String child : children) {
+ getBrokenData(idsPath + "/" + child, child);
+ }
+ // Remove all removed brokers
+ removeDiff(children, brokers);
+ }
+ });
+ }
+
+ private void getTopics() {
+ final String topicsPath = BROKERS_PATH + "/topics";
+ Futures.addCallback(zkClient.getChildren(topicsPath, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ getTopics();
+ }
+ }), new ExistsOnFailureFutureCallback<NodeChildren>(topicsPath, invokeGetTopics) {
+ @Override
+ public void onSuccess(NodeChildren result) {
+ Set<String> children = ImmutableSet.copyOf(result.getChildren());
+
+ // Process new children
+ for (String topic : ImmutableSet.copyOf(Sets.difference(children, topicBrokers.keySet()))) {
+ getTopic(topicsPath + "/" + topic, topic);
+ }
+
+ // Remove old children
+ removeDiff(children, topicBrokers);
+ }
+ });
+ }
+
+ private void getBrokenData(String path, final String brokerId) {
+ Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
+ @Override
+ public void onSuccess(NodeData result) {
+ String data = new String(result.getData(), Charsets.UTF_8);
+ String hostPort = data.substring(data.indexOf(':') + 1);
+ int idx = hostPort.indexOf(':');
+ brokers.put(brokerId, new InetSocketAddress(hostPort.substring(0, idx),
+ Integer.parseInt(hostPort.substring(idx + 1))));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // No-op, the watch on the parent node will handle it.
+ }
+ });
+ }
+
+ private void getTopic(final String path, final String topic) {
+ Futures.addCallback(zkClient.getChildren(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // Other event type changes are either could be ignored or handled by parent watcher
+ if (event.getType() == Event.EventType.NodeChildrenChanged) {
+ getTopic(path, topic);
+ }
+ }
+ }), new FutureCallback<NodeChildren>() {
+ @Override
+ public void onSuccess(NodeChildren result) {
+ List<String> children = result.getChildren();
+ final List<ListenableFuture<BrokerPartition>> futures = Lists.newArrayListWithCapacity(children.size());
+
+ // Fetch data from each broken node
+ for (final String brokerId : children) {
+ Futures.transform(zkClient.getData(path + "/" + brokerId), new Function<NodeData, BrokerPartition>() {
+ @Override
+ public BrokerPartition apply(NodeData input) {
+ return new BrokerPartition(brokerId, Integer.parseInt(new String(input.getData(), Charsets.UTF_8)));
+ }
+ });
+ }
+
+ // When all fetching is done, build the partition size->broker map for this topic
+ Futures.successfulAsList(futures).addListener(new Runnable() {
+ @Override
+ public void run() {
+ Map<Integer, Set<String>> partitionBrokers = Maps.newHashMap();
+ for (ListenableFuture<BrokerPartition> future : futures) {
+ try {
+ BrokerPartition info = future.get();
+ Set<String> brokerSet = partitionBrokers.get(info.getPartitionSize());
+ if (brokerSet == null) {
+ brokerSet = Sets.newHashSet();
+ partitionBrokers.put(info.getPartitionSize(), brokerSet);
+ }
+ brokerSet.add(info.getBrokerId());
+ } catch (Exception e) {
+ // Exception is ignored, as it will be handled by parent watcher
+ }
+ }
+ topicBrokers.put(topic, ImmutableSortedMap.copyOf(partitionBrokers));
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // No-op. Failure would be handled by parent watcher already (e.g. node not exists -> children change in parent)
+ }
+ });
+ }
+
+ private <K, V> void removeDiff(Set<K> keys, Map<K, V> map) {
+ for (K key : ImmutableSet.copyOf(Sets.difference(map.keySet(), keys))) {
+ map.remove(key);
+ }
+ }
+
+ private abstract class ExistsOnFailureFutureCallback<V> implements FutureCallback<V> {
+
+ private final String path;
+ private final Runnable action;
+
+ protected ExistsOnFailureFutureCallback(String path, Runnable action) {
+ this.path = path;
+ this.action = action;
+ }
+
+ @Override
+ public final void onFailure(Throwable t) {
+ if (!isNotExists(t)) {
+ LOG.error("Fail to watch for kafka brokers: " + path, t);
+ return;
+ }
+
+ waitExists(path);
+ }
+
+ private boolean isNotExists(Throwable t) {
+ return ((t instanceof KeeperException) && ((KeeperException) t).code() == KeeperException.Code.NONODE);
+ }
+
+ private void waitExists(String path) {
+ LOG.info("Path " + path + " not exists. Watch for creation.");
+
+ // If the node doesn't exists, use the "exists" call to watch for node creation.
+ Futures.addCallback(zkClient.exists(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getType() == Event.EventType.NodeCreated || event.getType() == Event.EventType.NodeDeleted) {
+ action.run();
+ }
+ }
+ }), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ // If path exists, get children again, otherwise wait for watch to get triggered
+ if (result != null) {
+ action.run();
+ }
+ }
+ @Override
+ public void onFailure(Throwable t) {
+ action.run();
+ }
+ });
+ }
+ }
+
+ private static final class BrokerPartition {
+ private final String brokerId;
+ private final int partitionSize;
+
+ private BrokerPartition(String brokerId, int partitionSize) {
+ this.brokerId = brokerId;
+ this.partitionSize = partitionSize;
+ }
+
+ public String getBrokerId() {
+ return brokerId;
+ }
+
+ public int getPartitionSize() {
+ return partitionSize;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
new file mode 100644
index 0000000..7b43f8a
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ *
+ */
+final class KafkaRequest {
+
+ public enum Type {
+ PRODUCE(0),
+ FETCH(1),
+ MULTI_FETCH(2),
+ MULTI_PRODUCE(3),
+ OFFSETS(4);
+
+ private final short id;
+
+ private Type(int id) {
+ this.id = (short) id;
+ }
+
+ public short getId() {
+ return id;
+ }
+ }
+
+ private final Type type;
+ private final String topic;
+ private final int partition;
+ private final ChannelBuffer body;
+ private final ResponseHandler responseHandler;
+
+
+ public static KafkaRequest createProduce(String topic, int partition, ChannelBuffer body) {
+ return new KafkaRequest(Type.PRODUCE, topic, partition, body, ResponseHandler.NO_OP);
+ }
+
+ public static KafkaRequest createFetch(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
+ return new KafkaRequest(Type.FETCH, topic, partition, body, handler);
+ }
+
+ public static KafkaRequest createOffsets(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
+ return new KafkaRequest(Type.OFFSETS, topic, partition, body, handler);
+ }
+
+ private KafkaRequest(Type type, String topic, int partition, ChannelBuffer body, ResponseHandler responseHandler) {
+ this.type = type;
+ this.topic = topic;
+ this.partition = partition;
+ this.body = body;
+ this.responseHandler = responseHandler;
+ }
+
+ Type getType() {
+ return type;
+ }
+
+ String getTopic() {
+ return topic;
+ }
+
+ int getPartition() {
+ return partition;
+ }
+
+ ChannelBuffer getBody() {
+ return body;
+ }
+
+ ResponseHandler getResponseHandler() {
+ return responseHandler;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
new file mode 100644
index 0000000..ef78c76
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+final class KafkaRequestEncoder extends OneToOneEncoder {
+
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+ if (!(msg instanceof KafkaRequest)) {
+ return msg;
+ }
+ KafkaRequest req = (KafkaRequest) msg;
+ ByteBuffer topic = Charsets.UTF_8.encode(req.getTopic());
+
+ ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(16 + topic.remaining() + req.getBody().readableBytes());
+ int writerIdx = buffer.writerIndex();
+ buffer.writerIndex(writerIdx + 4); // Reserves 4 bytes for message length
+
+ // Write out <REQUEST_TYPE>, <TOPIC_LENGTH>, <TOPIC>, <PARTITION>
+ buffer.writeShort(req.getType().getId());
+ buffer.writeShort(topic.remaining());
+ buffer.writeBytes(topic);
+ buffer.writeInt(req.getPartition());
+
+ // Write out the size of the whole buffer (excluding the size field) at the beginning
+ buffer.setInt(writerIdx, buffer.readableBytes() - 4 + req.getBody().readableBytes());
+
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(buffer, req.getBody());
+ buf = buf.readBytes(buf.readableBytes());
+
+ return buf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
new file mode 100644
index 0000000..fbc552c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ *
+ */
+interface KafkaRequestSender {
+
+ void send(KafkaRequest request);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
new file mode 100644
index 0000000..68c1bd8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ *
+ */
+final class KafkaResponse {
+
+ private final FetchException.ErrorCode errorCode;
+ private final ChannelBuffer body;
+ private final int size;
+
+ KafkaResponse(FetchException.ErrorCode errorCode, ChannelBuffer body, int size) {
+ this.errorCode = errorCode;
+ this.body = body;
+ this.size = size;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public FetchException.ErrorCode getErrorCode() {
+ return errorCode;
+ }
+
+ public ChannelBuffer getBody() {
+ return body;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
new file mode 100644
index 0000000..47f70ce
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ *
+ */
+final class KafkaResponseDispatcher extends SimpleChannelHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaResponseDispatcher.class);
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object attachment = ctx.getAttachment();
+ if (e.getMessage() instanceof KafkaResponse && attachment instanceof ResponseHandler) {
+ ((ResponseHandler) attachment).received((KafkaResponse) e.getMessage());
+ } else {
+ super.messageReceived(ctx, e);
+ }
+ }
+
+ @Override
+ public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ if (e.getMessage() instanceof KafkaRequest) {
+ ctx.setAttachment(((KafkaRequest) e.getMessage()).getResponseHandler());
+ }
+ super.writeRequested(ctx, e);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ if (e.getCause() instanceof ClosedChannelException || e.getCause() instanceof SocketException) {
+ // No need to log for socket exception as the client has logic to retry.
+ return;
+ }
+ LOG.warn("Exception caught in kafka client connection.", e.getCause());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
new file mode 100644
index 0000000..5251e65
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ *
+ */
+final class KafkaResponseHandler extends SimpleChannelHandler {
+
+ private final Bufferer bufferer = new Bufferer();
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object msg = e.getMessage();
+ if (!(msg instanceof ChannelBuffer)) {
+ super.messageReceived(ctx, e);
+ return;
+ }
+
+ bufferer.apply((ChannelBuffer) msg);
+ ChannelBuffer buffer = bufferer.getNext();
+ while (buffer.readable()) {
+ // Send the response object upstream
+ Channels.fireMessageReceived(ctx, new KafkaResponse(FetchException.ErrorCode.fromCode(buffer.readShort()),
+ buffer, buffer.readableBytes() + 6));
+ buffer = bufferer.getNext();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
new file mode 100644
index 0000000..0814917
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.kafka.client.FetchException;
+import org.apache.twill.kafka.client.FetchedMessage;
+import com.google.common.base.Throwables;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.ByteStreams;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.xerial.snappy.SnappyInputStream;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * This class is for consuming messages from a kafka topic.
+ */
+final class MessageFetcher extends AbstractIterator<FetchedMessage> implements ResponseHandler {
+
+ private static final long BACKOFF_INTERVAL_MS = 100;
+
+ private final KafkaRequestSender sender;
+ private final String topic;
+ private final int partition;
+ private final int maxSize;
+ private final AtomicLong offset;
+ private final BlockingQueue<FetchResult> messages;
+ private final ScheduledExecutorService scheduler;
+ private volatile long backoffMillis;
+ private final Runnable sendFetchRequest = new Runnable() {
+ @Override
+ public void run() {
+ sendFetchRequest();
+ }
+ };
+
+ MessageFetcher(String topic, int partition, long offset, int maxSize, KafkaRequestSender sender) {
+ this.topic = topic;
+ this.partition = partition;
+ this.sender = sender;
+ this.offset = new AtomicLong(offset);
+ this.maxSize = maxSize;
+ this.messages = new LinkedBlockingQueue<FetchResult>();
+ this.scheduler = Executors.newSingleThreadScheduledExecutor(
+ Threads.createDaemonThreadFactory("kafka-" + topic + "-consumer"));
+ }
+
+ @Override
+ public void received(KafkaResponse response) {
+ if (response.getErrorCode() != FetchException.ErrorCode.OK) {
+ messages.add(FetchResult.failure(new FetchException("Error in fetching: " + response.getErrorCode(),
+ response.getErrorCode())));
+ return;
+ }
+
+ try {
+ if (decodeResponse(response.getBody(), -1)) {
+ backoffMillis = 0;
+ } else {
+ backoffMillis = Math.max(backoffMillis + BACKOFF_INTERVAL_MS, 1000);
+ scheduler.schedule(sendFetchRequest, backoffMillis, TimeUnit.MILLISECONDS);
+ }
+ } catch (Throwable t) {
+ messages.add(FetchResult.failure(t));
+ }
+ }
+
+ private boolean decodeResponse(ChannelBuffer buffer, long nextOffset) {
+ boolean hasMessage = false;
+ boolean computeOffset = nextOffset < 0;
+ while (buffer.readableBytes() >= 4) {
+ int size = buffer.readInt();
+ if (buffer.readableBytes() < size) {
+ if (!hasMessage) {
+ throw new IllegalStateException("Size too small");
+ }
+ break;
+ }
+ nextOffset = computeOffset ? offset.addAndGet(size + 4) : nextOffset;
+ decodeMessage(size, buffer, nextOffset);
+ hasMessage = true;
+ }
+ return hasMessage;
+
+ }
+
+ private void decodeMessage(int size, ChannelBuffer buffer, long nextOffset) {
+ int readerIdx = buffer.readerIndex();
+ int magic = buffer.readByte();
+ Compression compression = magic == 0 ? Compression.NONE : Compression.fromCode(buffer.readByte());
+ int crc = buffer.readInt();
+
+ ChannelBuffer payload = buffer.readSlice(size - (buffer.readerIndex() - readerIdx));
+
+ // Verify CRC?
+ enqueueMessage(compression, payload, nextOffset);
+ }
+
+ private void enqueueMessage(Compression compression, ChannelBuffer payload, long nextOffset) {
+ switch (compression) {
+ case NONE:
+ messages.add(FetchResult.success(new BasicFetchedMessage(nextOffset, payload.toByteBuffer())));
+ break;
+ case GZIP:
+ decodeResponse(gunzip(payload), nextOffset);
+ break;
+ case SNAPPY:
+ decodeResponse(unsnappy(payload), nextOffset);
+ break;
+ }
+ }
+
+ private ChannelBuffer gunzip(ChannelBuffer source) {
+ ChannelBufferOutputStream output = new ChannelBufferOutputStream(
+ ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
+ try {
+ try {
+ GZIPInputStream gzipInput = new GZIPInputStream(new ChannelBufferInputStream(source));
+ try {
+ ByteStreams.copy(gzipInput, output);
+ return output.buffer();
+ } finally {
+ gzipInput.close();
+ }
+ } finally {
+ output.close();
+ }
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private ChannelBuffer unsnappy(ChannelBuffer source) {
+ ChannelBufferOutputStream output = new ChannelBufferOutputStream(
+ ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
+ try {
+ try {
+ SnappyInputStream snappyInput = new SnappyInputStream(new ChannelBufferInputStream(source));
+ try {
+ ByteStreams.copy(snappyInput, output);
+ return output.buffer();
+ } finally {
+ snappyInput.close();
+ }
+ } finally {
+ output.close();
+ }
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private void sendFetchRequest() {
+ ChannelBuffer fetchBody = ChannelBuffers.buffer(12);
+ fetchBody.writeLong(offset.get());
+ fetchBody.writeInt(maxSize);
+ sender.send(KafkaRequest.createFetch(topic, partition, fetchBody, MessageFetcher.this));
+ }
+
+ @Override
+ protected FetchedMessage computeNext() {
+ FetchResult result = messages.poll();
+ if (result != null) {
+ return getMessage(result);
+ }
+
+ try {
+ sendFetchRequest();
+ return getMessage(messages.take());
+ } catch (InterruptedException e) {
+ scheduler.shutdownNow();
+ return endOfData();
+ }
+ }
+
+ private FetchedMessage getMessage(FetchResult result) {
+ try {
+ if (result.isSuccess()) {
+ return result.getMessage();
+ } else {
+ throw result.getErrorCause();
+ }
+ } catch (Throwable t) {
+ throw Throwables.propagate(t);
+ }
+ }
+
+ private static final class FetchResult {
+ private final FetchedMessage message;
+ private final Throwable errorCause;
+
+ static FetchResult success(FetchedMessage message) {
+ return new FetchResult(message, null);
+ }
+
+ static FetchResult failure(Throwable cause) {
+ return new FetchResult(null, cause);
+ }
+
+ private FetchResult(FetchedMessage message, Throwable errorCause) {
+ this.message = message;
+ this.errorCause = errorCause;
+ }
+
+ public FetchedMessage getMessage() {
+ return message;
+ }
+
+ public Throwable getErrorCause() {
+ return errorCause;
+ }
+
+ public boolean isSuccess() {
+ return message != null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
new file mode 100644
index 0000000..49008cc
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * This represents a set of messages that goes into the same message set and get encoded as
+ * single kafka message set.
+ */
+interface MessageSetEncoder {
+
+ MessageSetEncoder add(ChannelBuffer payload);
+
+ ChannelBuffer finish();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
new file mode 100644
index 0000000..f681b85
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Represents handler for kafka response.
+ */
+interface ResponseHandler {
+
+ ResponseHandler NO_OP = new ResponseHandler() {
+ @Override
+ public void received(KafkaResponse response) {
+ // No-op
+ }
+ };
+
+ void received(KafkaResponse response);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
new file mode 100644
index 0000000..8ff4856
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.kafka.client.FetchException;
+import org.apache.twill.kafka.client.FetchedMessage;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.PreparePublish;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientBossPool;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Basic implementation of {@link KafkaClient}.
+ */
+public final class SimpleKafkaClient extends AbstractIdleService implements KafkaClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaClient.class);
+ private static final int BROKER_POLL_INTERVAL = 100;
+
+ private final KafkaBrokerCache brokerCache;
+ private ClientBootstrap bootstrap;
+ private ConnectionPool connectionPool;
+
+ public SimpleKafkaClient(ZKClient zkClient) {
+ this.brokerCache = new KafkaBrokerCache(zkClient);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ brokerCache.startAndWait();
+ ThreadFactory threadFactory = Threads.createDaemonThreadFactory("kafka-client-netty-%d");
+ NioClientBossPool bossPool = new NioClientBossPool(Executors.newSingleThreadExecutor(threadFactory), 1,
+ new HashedWheelTimer(threadFactory), null);
+ NioWorkerPool workerPool = new NioWorkerPool(Executors.newFixedThreadPool(4, threadFactory), 4);
+
+ bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossPool, workerPool));
+ bootstrap.setPipelineFactory(new KafkaChannelPipelineFactory());
+ connectionPool = new ConnectionPool(bootstrap);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ connectionPool.close();
+ bootstrap.releaseExternalResources();
+ brokerCache.stopAndWait();
+ }
+
+ @Override
+ public PreparePublish preparePublish(final String topic, final Compression compression) {
+ final Map<Integer, MessageSetEncoder> encoders = Maps.newHashMap();
+
+ return new PreparePublish() {
+ @Override
+ public PreparePublish add(byte[] payload, Object partitionKey) {
+ return add(ByteBuffer.wrap(payload), partitionKey);
+ }
+
+ @Override
+ public PreparePublish add(ByteBuffer payload, Object partitionKey) {
+ // TODO: Partition
+ int partition = 0;
+
+ MessageSetEncoder encoder = encoders.get(partition);
+ if (encoder == null) {
+ encoder = getEncoder(compression);
+ encoders.put(partition, encoder);
+ }
+ encoder.add(ChannelBuffers.wrappedBuffer(payload));
+
+ return this;
+ }
+
+ @Override
+ public ListenableFuture<?> publish() {
+ List<ListenableFuture<?>> futures = Lists.newArrayListWithCapacity(encoders.size());
+ for (Map.Entry<Integer, MessageSetEncoder> entry : encoders.entrySet()) {
+ futures.add(doPublish(topic, entry.getKey(), entry.getValue().finish()));
+ }
+ encoders.clear();
+ return Futures.allAsList(futures);
+ }
+
+ private ListenableFuture<?> doPublish(String topic, int partition, ChannelBuffer messageSet) {
+ final KafkaRequest request = KafkaRequest.createProduce(topic, partition, messageSet);
+ final SettableFuture<?> result = SettableFuture.create();
+ final ConnectionPool.ConnectResult connection =
+ connectionPool.connect(getTopicBroker(topic, partition).getAddress());
+
+ connection.getChannelFuture().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ try {
+ future.getChannel().write(request).addListener(getPublishChannelFutureListener(result, null, connection));
+ } catch (Exception e) {
+ result.setException(e);
+ }
+ }
+ });
+
+ return result;
+ }
+ };
+ }
+
+ @Override
+ public Iterator<FetchedMessage> consume(final String topic, final int partition, long offset, int maxSize) {
+ Preconditions.checkArgument(maxSize >= 10, "Message size cannot be smaller than 10.");
+
+ // Connect to broker. Consumer connection are long connection. No need to worry about reuse.
+ final AtomicReference<ChannelFuture> channelFutureRef = new AtomicReference<ChannelFuture>(
+ connectionPool.connect(getTopicBroker(topic, partition).getAddress()).getChannelFuture());
+
+ return new MessageFetcher(topic, partition, offset, maxSize, new KafkaRequestSender() {
+
+ @Override
+ public void send(final KafkaRequest request) {
+ if (!isRunning()) {
+ return;
+ }
+ try {
+ // Try to send the request
+ Channel channel = channelFutureRef.get().getChannel();
+ if (!channel.write(request).await().isSuccess()) {
+ // If failed, retry
+ channel.close();
+ ChannelFuture channelFuture = connectionPool.connect(
+ getTopicBroker(topic, partition).getAddress()).getChannelFuture();
+ channelFutureRef.set(channelFuture);
+ channelFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ send(request);
+ }
+ });
+ }
+ } catch (InterruptedException e) {
+ // Ignore it
+ LOG.info("Interrupted when sending consume request", e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<long[]> getOffset(final String topic, final int partition, long time, int maxOffsets) {
+ final SettableFuture<long[]> resultFuture = SettableFuture.create();
+ final ChannelBuffer body = ChannelBuffers.buffer(Longs.BYTES + Ints.BYTES);
+ body.writeLong(time);
+ body.writeInt(maxOffsets);
+
+ connectionPool.connect(getTopicBroker(topic, partition).getAddress())
+ .getChannelFuture().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (checkFailure(future)) {
+ return;
+ }
+
+ future.getChannel().write(KafkaRequest.createOffsets(topic, partition, body, new ResponseHandler() {
+ @Override
+ public void received(KafkaResponse response) {
+ if (response.getErrorCode() != FetchException.ErrorCode.OK) {
+ resultFuture.setException(new FetchException("Failed to fetch offset.", response.getErrorCode()));
+ } else {
+ // Decode the offset response, which contains 4 bytes number of offsets, followed by number of offsets,
+ // each 8 bytes in size.
+ ChannelBuffer resultBuffer = response.getBody();
+ int size = resultBuffer.readInt();
+ long[] result = new long[size];
+ for (int i = 0; i < size; i++) {
+ result[i] = resultBuffer.readLong();
+ }
+ resultFuture.set(result);
+ }
+ }
+ })).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ checkFailure(future);
+ }
+ });
+ }
+
+ private boolean checkFailure(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ if (future.isCancelled()) {
+ resultFuture.cancel(true);
+ } else {
+ resultFuture.setException(future.getCause());
+ }
+ return true;
+ }
+ return false;
+ }
+ });
+
+ return resultFuture;
+ }
+
+ private TopicBroker getTopicBroker(String topic, int partition) {
+ TopicBroker topicBroker = brokerCache.getBrokerAddress(topic, partition);
+ while (topicBroker == null) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(BROKER_POLL_INTERVAL);
+ } catch (InterruptedException e) {
+ return null;
+ }
+ topicBroker = brokerCache.getBrokerAddress(topic, partition);
+ }
+ return topicBroker;
+ }
+
+ private MessageSetEncoder getEncoder(Compression compression) {
+ switch (compression) {
+ case GZIP:
+ return new GZipMessageSetEncoder();
+ case SNAPPY:
+ return new SnappyMessageSetEncoder();
+ default:
+ return new IdentityMessageSetEncoder();
+ }
+ }
+
+ private <V> ChannelFutureListener getPublishChannelFutureListener(final SettableFuture<V> result, final V resultObj,
+ final ConnectionPool.ConnectionReleaser releaser) {
+ return new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ try {
+ if (future.isSuccess()) {
+ result.set(resultObj);
+ } else if (future.isCancelled()) {
+ result.cancel(true);
+ } else {
+ result.setException(future.getCause());
+ }
+ } finally {
+ releaser.release();
+ }
+ }
+ };
+ }
+
+ private static final class KafkaChannelPipelineFactory implements ChannelPipelineFactory {
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ pipeline.addLast("encoder", new KafkaRequestEncoder());
+ pipeline.addLast("decoder", new KafkaResponseHandler());
+ pipeline.addLast("dispatcher", new KafkaResponseDispatcher());
+ return pipeline;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
new file mode 100644
index 0000000..bf18c08
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.xerial.snappy.SnappyOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A {@link MessageSetEncoder} that compress messages using snappy.
+ */
+final class SnappyMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
+
+ SnappyMessageSetEncoder() {
+ super(Compression.SNAPPY);
+ }
+
+ @Override
+ protected OutputStream createCompressedStream(OutputStream os) throws IOException {
+ return new SnappyOutputStream(os);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
new file mode 100644
index 0000000..fd4bf03
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Represents broker information of a given topic.
+ */
+final class TopicBroker {
+
+ private final String topic;
+ private final InetSocketAddress address;
+ private final int partitionSize;
+
+ TopicBroker(String topic, InetSocketAddress address, int partitionSize) {
+ this.topic = topic;
+ this.address = address;
+ this.partitionSize = partitionSize;
+ }
+
+ String getTopic() {
+ return topic;
+ }
+
+ InetSocketAddress getAddress() {
+ return address;
+ }
+
+ int getPartitionSize() {
+ return partitionSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
new file mode 100644
index 0000000..f3f615c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package provides pure java kafka client implementation.
+ */
+package org.apache.twill.internal.kafka.client;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
new file mode 100644
index 0000000..12818ef
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.kafka.client.Compression;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.PreparePublish;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import ch.qos.logback.classic.pattern.ClassOfCallerConverter;
+import ch.qos.logback.classic.pattern.FileOfCallerConverter;
+import ch.qos.logback.classic.pattern.LineOfCallerConverter;
+import ch.qos.logback.classic.pattern.MethodOfCallerConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.classic.spi.StackTraceElementProxy;
+import ch.qos.logback.core.AppenderBase;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.stream.JsonWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ *
+ */
+public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
+
+ private final LogEventConverter eventConverter;
+ private final AtomicReference<PreparePublish> publisher;
+ private final Runnable flushTask;
+ /**
+ * Rough count of how many entries are being buffered. It's just approximate, not exact.
+ */
+ private final AtomicInteger bufferedSize;
+
+ private ZKClientService zkClientService;
+ private KafkaClient kafkaClient;
+ private String zkConnectStr;
+ private String hostname;
+ private String topic;
+ private Queue<String> buffer;
+ private int flushLimit = 20;
+ private int flushPeriod = 100;
+ private ScheduledExecutorService scheduler;
+
+ public KafkaAppender() {
+ eventConverter = new LogEventConverter();
+ publisher = new AtomicReference<PreparePublish>();
+ flushTask = createFlushTask();
+ bufferedSize = new AtomicInteger();
+ buffer = new ConcurrentLinkedQueue<String>();
+ }
+
+ /**
+ * Sets the zookeeper connection string. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setZookeeper(String zkConnectStr) {
+ this.zkConnectStr = zkConnectStr;
+ }
+
+ /**
+ * Sets the hostname. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+ /**
+ * Sets the topic name for publishing logs. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * Sets the maximum number of cached log entries before performing an force flush. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setFlushLimit(int flushLimit) {
+ this.flushLimit = flushLimit;
+ }
+
+ /**
+ * Sets the periodic flush time in milliseconds. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setFlushPeriod(int flushPeriod) {
+ this.flushPeriod = flushPeriod;
+ }
+
+ @Override
+ public void start() {
+ Preconditions.checkNotNull(zkConnectStr);
+
+ scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
+
+ zkClientService = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+ kafkaClient = new SimpleKafkaClient(zkClientService);
+ Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object result) {
+ LOG.info("Kafka client started: " + zkConnectStr);
+ publisher.set(kafkaClient.preparePublish(topic, Compression.SNAPPY));
+ scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // Fail to talk to kafka. Other than logging, what can be done?
+ LOG.error("Failed to start kafka client.", t);
+ }
+ });
+
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ scheduler.shutdownNow();
+ Futures.getUnchecked(Services.chainStop(kafkaClient, zkClientService));
+ }
+
+ public void forceFlush() {
+ try {
+ publishLogs().get(2, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to publish last batch of log.", e);
+ }
+ }
+
+ @Override
+ protected void append(ILoggingEvent eventObject) {
+ buffer.offer(eventConverter.convert(eventObject));
+ if (bufferedSize.incrementAndGet() >= flushLimit && publisher.get() != null) {
+ // Try to do a extra flush
+ scheduler.submit(flushTask);
+ }
+ }
+
+ private ListenableFuture<Integer> publishLogs() {
+ // If the publisher is not available, simply returns a completed future.
+ PreparePublish publisher = KafkaAppender.this.publisher.get();
+ if (publisher == null) {
+ return Futures.immediateFuture(0);
+ }
+
+ int count = 0;
+ for (String json : Iterables.consumingIterable(buffer)) {
+ publisher.add(Charsets.UTF_8.encode(json), 0);
+ count++;
+ }
+ // Nothing to publish, simply returns a completed future.
+ if (count == 0) {
+ return Futures.immediateFuture(0);
+ }
+
+ bufferedSize.set(0);
+ final int finalCount = count;
+ return Futures.transform(publisher.publish(), new Function<Object, Integer>() {
+ @Override
+ public Integer apply(Object input) {
+ return finalCount;
+ }
+ });
+ }
+
+ /**
+ * Creates a {@link Runnable} that writes all logs in the buffer into kafka.
+ * @return The Runnable task
+ */
+ private Runnable createFlushTask() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ Futures.addCallback(publishLogs(), new FutureCallback<Integer>() {
+ @Override
+ public void onSuccess(Integer result) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Log entries published, size=" + result);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Failed to push logs to kafka. Log entries dropped.", t);
+ }
+ });
+ }
+ };
+ }
+
+ /**
+ * Helper class to convert {@link ILoggingEvent} into json string.
+ */
+ private final class LogEventConverter {
+
+ private final ClassOfCallerConverter classNameConverter = new ClassOfCallerConverter();
+ private final MethodOfCallerConverter methodConverter = new MethodOfCallerConverter();
+ private final FileOfCallerConverter fileConverter = new FileOfCallerConverter();
+ private final LineOfCallerConverter lineConverter = new LineOfCallerConverter();
+
+ private String convert(ILoggingEvent event) {
+ StringWriter result = new StringWriter();
+ JsonWriter writer = new JsonWriter(result);
+
+ try {
+ try {
+ writer.beginObject();
+ writer.name("name").value(event.getLoggerName());
+ writer.name("host").value(hostname);
+ writer.name("timestamp").value(Long.toString(event.getTimeStamp()));
+ writer.name("level").value(event.getLevel().toString());
+ writer.name("className").value(classNameConverter.convert(event));
+ writer.name("method").value(methodConverter.convert(event));
+ writer.name("file").value(fileConverter.convert(event));
+ writer.name("line").value(lineConverter.convert(event));
+ writer.name("thread").value(event.getThreadName());
+ writer.name("message").value(event.getFormattedMessage());
+ writer.name("stackTraces");
+ encodeStackTraces(event.getThrowableProxy(), writer);
+
+ writer.endObject();
+ } finally {
+ writer.close();
+ }
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+
+ return result.toString();
+ }
+
+ private void encodeStackTraces(IThrowableProxy throwable, JsonWriter writer) throws IOException {
+ writer.beginArray();
+ try {
+ if (throwable == null) {
+ return;
+ }
+
+ for (StackTraceElementProxy stackTrace : throwable.getStackTraceElementProxyArray()) {
+ writer.beginObject();
+
+ StackTraceElement element = stackTrace.getStackTraceElement();
+ writer.name("className").value(element.getClassName());
+ writer.name("method").value(element.getMethodName());
+ writer.name("file").value(element.getFileName());
+ writer.name("line").value(element.getLineNumber());
+
+ writer.endObject();
+ }
+ } finally {
+ writer.endArray();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
new file mode 100644
index 0000000..c1695de
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaTwillRunnable.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.TwillContext;
+import org.apache.twill.api.TwillRunnable;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.utils.Networks;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A {@link org.apache.twill.api.TwillRunnable} for managing Kafka server.
+ */
+public final class KafkaTwillRunnable implements TwillRunnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaTwillRunnable.class);
+
+ private final String kafkaDir;
+ private EmbeddedKafkaServer server;
+ private CountDownLatch stopLatch;
+
+ public KafkaTwillRunnable(String kafkaDir) {
+ this.kafkaDir = kafkaDir;
+ }
+
+ @Override
+ public TwillRunnableSpecification configure() {
+ return TwillRunnableSpecification.Builder.with()
+ .setName("kafka")
+ .withConfigs(ImmutableMap.of("kafkaDir", kafkaDir))
+ .build();
+ }
+
+ @Override
+ public void initialize(TwillContext context) {
+ Map<String, String> args = context.getSpecification().getConfigs();
+ String zkConnectStr = System.getenv(EnvKeys.TWILL_LOG_KAFKA_ZK);
+ stopLatch = new CountDownLatch(1);
+
+ try {
+ server = new EmbeddedKafkaServer(new File(args.get("kafkaDir")), generateKafkaConfig(zkConnectStr));
+ server.startAndWait();
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void handleCommand(Command command) throws Exception {
+ }
+
+ @Override
+ public void stop() {
+ stopLatch.countDown();
+ }
+
+ @Override
+ public void destroy() {
+ server.stopAndWait();
+ }
+
+ @Override
+ public void run() {
+ try {
+ stopLatch.await();
+ } catch (InterruptedException e) {
+ LOG.info("Running thread interrupted, shutting down kafka server.", e);
+ }
+ }
+
+ private Properties generateKafkaConfig(String zkConnectStr) {
+ int port = Networks.getRandomPort();
+ Preconditions.checkState(port > 0, "Failed to get random port.");
+
+ Properties prop = new Properties();
+ prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
+ prop.setProperty("zk.connect", zkConnectStr);
+ prop.setProperty("num.threads", "8");
+ prop.setProperty("port", Integer.toString(port));
+ prop.setProperty("log.flush.interval", "10000");
+ prop.setProperty("max.socket.request.bytes", "104857600");
+ prop.setProperty("log.cleanup.interval.mins", "1");
+ prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
+ prop.setProperty("zk.connectiontimeout.ms", "1000000");
+ prop.setProperty("socket.receive.buffer", "1048576");
+ prop.setProperty("enable.zookeeper", "true");
+ prop.setProperty("log.retention.hours", "168");
+ prop.setProperty("brokerid", "0");
+ prop.setProperty("socket.send.buffer", "1048576");
+ prop.setProperty("num.partitions", "1");
+ prop.setProperty("log.file.size", "536870912");
+ prop.setProperty("log.default.flush.interval.ms", "1000");
+ return prop;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java b/twill-core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
new file mode 100644
index 0000000..dc11666
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/LogEntryDecoder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.internal.json.JsonUtils;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+
+import java.lang.reflect.Type;
+
+/**
+ * A {@link com.google.gson.Gson} decoder for {@link LogEntry}.
+ */
+public final class LogEntryDecoder implements JsonDeserializer<LogEntry> {
+
+ @Override
+ public LogEntry deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context) throws JsonParseException {
+ if (!json.isJsonObject()) {
+ return null;
+ }
+ JsonObject jsonObj = json.getAsJsonObject();
+
+ final String name = JsonUtils.getAsString(jsonObj, "name");
+ final String host = JsonUtils.getAsString(jsonObj, "host");
+ final long timestamp = JsonUtils.getAsLong(jsonObj, "timestamp", 0);
+ LogEntry.Level l;
+ try {
+ l = LogEntry.Level.valueOf(JsonUtils.getAsString(jsonObj, "level"));
+ } catch (Exception e) {
+ l = LogEntry.Level.FATAL;
+ }
+ final LogEntry.Level logLevel = l;
+ final String className = JsonUtils.getAsString(jsonObj, "className");
+ final String method = JsonUtils.getAsString(jsonObj, "method");
+ final String file = JsonUtils.getAsString(jsonObj, "file");
+ final String line = JsonUtils.getAsString(jsonObj, "line");
+ final String thread = JsonUtils.getAsString(jsonObj, "thread");
+ final String message = JsonUtils.getAsString(jsonObj, "message");
+
+ final StackTraceElement[] stackTraces = context.deserialize(jsonObj.get("stackTraces").getAsJsonArray(),
+ StackTraceElement[].class);
+
+ return new LogEntry() {
+ @Override
+ public String getLoggerName() {
+ return name;
+ }
+
+ @Override
+ public String getHost() {
+ return host;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public Level getLogLevel() {
+ return logLevel;
+ }
+
+ @Override
+ public String getSourceClassName() {
+ return className;
+ }
+
+ @Override
+ public String getSourceMethodName() {
+ return method;
+ }
+
+ @Override
+ public String getFileName() {
+ return file;
+ }
+
+ @Override
+ public int getLineNumber() {
+ if (line.equals("?")) {
+ return -1;
+ } else {
+ return Integer.parseInt(line);
+ }
+ }
+
+ @Override
+ public String getThreadName() {
+ return thread;
+ }
+
+ @Override
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public StackTraceElement[] getStackTraces() {
+ return stackTraces;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/logging/Loggings.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/Loggings.java b/twill-core/src/main/java/org/apache/twill/internal/logging/Loggings.java
new file mode 100644
index 0000000..9baed63
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/Loggings.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import org.slf4j.ILoggerFactory;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public final class Loggings {
+
+ public static void forceFlush() {
+ ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
+
+ if (loggerFactory instanceof LoggerContext) {
+ Appender<ILoggingEvent> appender = ((LoggerContext) loggerFactory).getLogger(Logger.ROOT_LOGGER_NAME)
+ .getAppender("KAFKA");
+ if (appender != null && appender instanceof KafkaAppender) {
+ ((KafkaAppender) appender).forceFlush();
+ }
+ }
+ }
+
+ private Loggings() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/package-info.java b/twill-core/src/main/java/org/apache/twill/internal/package-info.java
new file mode 100644
index 0000000..a8459e0
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package provides internal classes for Twill.
+ */
+package org.apache.twill.internal;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/Message.java b/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
new file mode 100644
index 0000000..6c3e719
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.state;
+
+import org.apache.twill.api.Command;
+
+/**
+ *
+ */
+public interface Message {
+
+ /**
+ * Type of message.
+ */
+ enum Type {
+ SYSTEM,
+ USER
+ }
+
+ /**
+ * Scope of the message.
+ */
+ enum Scope {
+ APPLICATION,
+ ALL_RUNNABLE,
+ RUNNABLE
+ }
+
+ Type getType();
+
+ Scope getScope();
+
+ /**
+ * @return the name of the target runnable if scope is {@link Scope#RUNNABLE} or {@code null} otherwise.
+ */
+ String getRunnableName();
+
+ Command getCommand();
+}