You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/13 23:27:33 UTC
[17/28] [TWILL-14] Bootstrapping for the site generation.
Reorganization of the source tree happens:
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
new file mode 100644
index 0000000..9955d6a
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.zip.CRC32;
+
+/**
+ * A base implementation of {@link MessageSetEncoder}.
+ */
+abstract class AbstractMessageSetEncoder implements MessageSetEncoder {
+
+ private static final ThreadLocal<CRC32> CRC32_LOCAL = new ThreadLocal<CRC32>() {
+ @Override
+ protected CRC32 initialValue() {
+ return new CRC32();
+ }
+ };
+
+ protected final int computeCRC32(ChannelBuffer buffer) {
+ CRC32 crc32 = CRC32_LOCAL.get();
+ crc32.reset();
+
+ if (buffer.hasArray()) {
+ crc32.update(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+ } else {
+ byte[] bytes = new byte[buffer.readableBytes()];
+ buffer.getBytes(buffer.readerIndex(), bytes);
+ crc32.update(bytes);
+ }
+ return (int) crc32.getValue();
+ }
+
+ protected final ChannelBuffer encodePayload(ChannelBuffer payload) {
+ return encodePayload(payload, Compression.NONE);
+ }
+
+ protected final ChannelBuffer encodePayload(ChannelBuffer payload, Compression compression) {
+ ChannelBuffer header = ChannelBuffers.buffer(10);
+
+ int crc = computeCRC32(payload);
+
+ int magic = ((compression == Compression.NONE) ? 0 : 1);
+
+ // Message length = 1 byte magic + (optional 1 compression byte) + 4 bytes crc + payload length
+ header.writeInt(5 + magic + payload.readableBytes());
+ // Magic number = 0 for non-compressed data
+ header.writeByte(magic);
+ if (magic > 0) {
+ header.writeByte(compression.getCode());
+ }
+ header.writeInt(crc);
+
+ return ChannelBuffers.wrappedBuffer(header, payload);
+ }
+
+ protected final ChannelBuffer prefixLength(ChannelBuffer buffer) {
+ ChannelBuffer sizeBuf = ChannelBuffers.buffer(4);
+ sizeBuf.writeInt(buffer.readableBytes());
+ return ChannelBuffers.wrappedBuffer(sizeBuf, buffer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
new file mode 100644
index 0000000..286bf82
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchedMessage;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+final class BasicFetchedMessage implements FetchedMessage {
+
+ private final long offset;
+ private final ByteBuffer buffer;
+
+ BasicFetchedMessage(long offset, ByteBuffer buffer) {
+ this.offset = offset;
+ this.buffer = buffer;
+ }
+
+ @Override
+ public long getOffset() {
+ return offset;
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
new file mode 100644
index 0000000..c1fb4f2
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A class to help buffering data of format [len][payload-of-len].
+ */
+final class Bufferer {
+
+ private ChannelBuffer currentBuffer = null;
+ private int currentSize = -1;
+
+ void apply(ChannelBuffer buffer) {
+ currentBuffer = concatBuffer(currentBuffer, buffer);
+ }
+
+ /**
+ * Returns the buffer if the buffer data is ready to be consumed,
+ * otherwise return {@link ChannelBuffers#EMPTY_BUFFER}.
+ */
+ ChannelBuffer getNext() {
+ if (currentSize < 0) {
+ if (currentBuffer.readableBytes() < 4) {
+ return ChannelBuffers.EMPTY_BUFFER;
+ }
+ currentSize = currentBuffer.readInt();
+ }
+
+ // Keep buffering if less then required number of bytes
+ if (currentBuffer.readableBytes() < currentSize) {
+ return ChannelBuffers.EMPTY_BUFFER;
+ }
+
+ ChannelBuffer result = currentBuffer.readSlice(currentSize);
+ currentSize = -1;
+
+ return result;
+ }
+
+ private ChannelBuffer concatBuffer(ChannelBuffer current, ChannelBuffer buffer) {
+ return current == null ? buffer : ChannelBuffers.wrappedBuffer(current, buffer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
new file mode 100644
index 0000000..3355b9f
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Enum for indicating compression method.
+ */
+public enum Compression {
+ NONE(0),
+ GZIP(1),
+ SNAPPY(2);
+
+ private final int code;
+
+ Compression(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public static Compression fromCode(int code) {
+ switch (code) {
+ case 0:
+ return NONE;
+ case 1:
+ return GZIP;
+ case 2:
+ return SNAPPY;
+ }
+ throw new IllegalArgumentException("Unknown compression code.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
new file mode 100644
index 0000000..c2865ba
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.collect.Maps;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.ChannelGroupFutureListener;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+
+import java.net.InetSocketAddress;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Provides netty socket connection reuse.
+ */
+final class ConnectionPool {
+
+ private final ClientBootstrap bootstrap;
+ private final ChannelGroup channelGroup;
+ private final ConcurrentMap<InetSocketAddress, Queue<ChannelFuture>> connections;
+
+ /**
+ * For releasing a connection back to the pool.
+ */
+ interface ConnectionReleaser {
+ void release();
+ }
+
+ /**
+ * Result of a connect request.
+ */
+ interface ConnectResult extends ConnectionReleaser {
+ ChannelFuture getChannelFuture();
+ }
+
+ ConnectionPool(ClientBootstrap bootstrap) {
+ this.bootstrap = bootstrap;
+ this.channelGroup = new DefaultChannelGroup();
+ this.connections = Maps.newConcurrentMap();
+ }
+
+ ConnectResult connect(InetSocketAddress address) {
+ Queue<ChannelFuture> channelFutures = connections.get(address);
+ if (channelFutures == null) {
+ channelFutures = new ConcurrentLinkedQueue<ChannelFuture>();
+ Queue<ChannelFuture> result = connections.putIfAbsent(address, channelFutures);
+ channelFutures = result == null ? channelFutures : result;
+ }
+
+ ChannelFuture channelFuture = channelFutures.poll();
+ while (channelFuture != null) {
+ if (channelFuture.isSuccess() && channelFuture.getChannel().isConnected()) {
+ return new SimpleConnectResult(address, channelFuture);
+ }
+ channelFuture = channelFutures.poll();
+ }
+
+ channelFuture = bootstrap.connect(address);
+ channelFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ channelGroup.add(future.getChannel());
+ }
+ }
+ });
+ return new SimpleConnectResult(address, channelFuture);
+ }
+
+ ChannelGroupFuture close() {
+ ChannelGroupFuture result = channelGroup.close();
+ result.addListener(new ChannelGroupFutureListener() {
+ @Override
+ public void operationComplete(ChannelGroupFuture future) throws Exception {
+ bootstrap.releaseExternalResources();
+ }
+ });
+ return result;
+ }
+
+ private final class SimpleConnectResult implements ConnectResult {
+
+ private final InetSocketAddress address;
+ private final ChannelFuture future;
+
+
+ private SimpleConnectResult(InetSocketAddress address, ChannelFuture future) {
+ this.address = address;
+ this.future = future;
+ }
+
+ @Override
+ public ChannelFuture getChannelFuture() {
+ return future;
+ }
+
+ @Override
+ public void release() {
+ if (future.isSuccess()) {
+ connections.get(address).offer(future);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
new file mode 100644
index 0000000..daa0c2c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * A {@link MessageSetEncoder} that compress message set using GZIP.
+ */
+final class GZipMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
+
+ GZipMessageSetEncoder() {
+ super(Compression.GZIP);
+ }
+
+ @Override
+ protected OutputStream createCompressedStream(OutputStream os) throws IOException {
+ return new GZIPOutputStream(os);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
new file mode 100644
index 0000000..51dc746
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A pass-through {@link MessageSetEncoder}.
+ */
+final class IdentityMessageSetEncoder extends AbstractMessageSetEncoder {
+
+ private ChannelBuffer messageSets = ChannelBuffers.EMPTY_BUFFER;
+
+ @Override
+ public MessageSetEncoder add(ChannelBuffer payload) {
+ messageSets = ChannelBuffers.wrappedBuffer(messageSets, encodePayload(payload));
+ return this;
+ }
+
+ @Override
+ public ChannelBuffer finish() {
+ ChannelBuffer buf = prefixLength(messageSets);
+ messageSets = ChannelBuffers.EMPTY_BUFFER;
+ return buf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
new file mode 100644
index 0000000..f2bb815
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+/**
+ * A Service to cache kafka broker information by subscribing to ZooKeeper.
+ */
+final class KafkaBrokerCache extends AbstractIdleService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerCache.class);
+
+ private static final String BROKERS_PATH = "/brokers";
+
+ private final ZKClient zkClient;
+ private final Map<String, InetSocketAddress> brokers;
+ // topicBrokers is from topic->partition size->brokerId
+ private final Map<String, SortedMap<Integer, Set<String>>> topicBrokers;
+ private final Runnable invokeGetBrokers = new Runnable() {
+ @Override
+ public void run() {
+ getBrokers();
+ }
+ };
+ private final Runnable invokeGetTopics = new Runnable() {
+ @Override
+ public void run() {
+ getTopics();
+ }
+ };
+
+ KafkaBrokerCache(ZKClient zkClient) {
+ this.zkClient = zkClient;
+ this.brokers = Maps.newConcurrentMap();
+ this.topicBrokers = Maps.newConcurrentMap();
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ getBrokers();
+ getTopics();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ // No-op
+ }
+
+ public int getPartitionSize(String topic) {
+ SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
+ if (partitionBrokers == null || partitionBrokers.isEmpty()) {
+ return 1;
+ }
+ return partitionBrokers.lastKey();
+ }
+
+ public TopicBroker getBrokerAddress(String topic, int partition) {
+ SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
+ if (partitionBrokers == null || partitionBrokers.isEmpty()) {
+ return pickRandomBroker(topic);
+ }
+
+ // If the requested partition is greater than supported partition size, randomly pick one
+ if (partition >= partitionBrokers.lastKey()) {
+ return pickRandomBroker(topic);
+ }
+
+ // Randomly pick a partition size and randomly pick a broker from it
+ Random random = new Random();
+ partitionBrokers = partitionBrokers.tailMap(partition + 1);
+ List<Integer> sizes = Lists.newArrayList(partitionBrokers.keySet());
+ Integer partitionSize = pickRandomItem(sizes, random);
+ List<String> ids = Lists.newArrayList(partitionBrokers.get(partitionSize));
+ InetSocketAddress address = brokers.get(ids.get(new Random().nextInt(ids.size())));
+ return address == null ? pickRandomBroker(topic) : new TopicBroker(topic, address, partitionSize);
+ }
+
+ private TopicBroker pickRandomBroker(String topic) {
+ Map.Entry<String, InetSocketAddress> entry = Iterables.getFirst(brokers.entrySet(), null);
+ if (entry == null) {
+ return null;
+ }
+ InetSocketAddress address = entry.getValue();
+ return new TopicBroker(topic, address, 0);
+ }
+
+ private <T> T pickRandomItem(List<T> list, Random random) {
+ return list.get(random.nextInt(list.size()));
+ }
+
+ private void getBrokers() {
+ final String idsPath = BROKERS_PATH + "/ids";
+
+ Futures.addCallback(zkClient.getChildren(idsPath, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ getBrokers();
+ }
+ }), new ExistsOnFailureFutureCallback<NodeChildren>(idsPath, invokeGetBrokers) {
+ @Override
+ public void onSuccess(NodeChildren result) {
+ Set<String> children = ImmutableSet.copyOf(result.getChildren());
+ for (String child : children) {
+ getBrokenData(idsPath + "/" + child, child);
+ }
+ // Remove all removed brokers
+ removeDiff(children, brokers);
+ }
+ });
+ }
+
+ private void getTopics() {
+ final String topicsPath = BROKERS_PATH + "/topics";
+ Futures.addCallback(zkClient.getChildren(topicsPath, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ getTopics();
+ }
+ }), new ExistsOnFailureFutureCallback<NodeChildren>(topicsPath, invokeGetTopics) {
+ @Override
+ public void onSuccess(NodeChildren result) {
+ Set<String> children = ImmutableSet.copyOf(result.getChildren());
+
+ // Process new children
+ for (String topic : ImmutableSet.copyOf(Sets.difference(children, topicBrokers.keySet()))) {
+ getTopic(topicsPath + "/" + topic, topic);
+ }
+
+ // Remove old children
+ removeDiff(children, topicBrokers);
+ }
+ });
+ }
+
+ private void getBrokenData(String path, final String brokerId) {
+ Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
+ @Override
+ public void onSuccess(NodeData result) {
+ String data = new String(result.getData(), Charsets.UTF_8);
+ String hostPort = data.substring(data.indexOf(':') + 1);
+ int idx = hostPort.indexOf(':');
+ brokers.put(brokerId, new InetSocketAddress(hostPort.substring(0, idx),
+ Integer.parseInt(hostPort.substring(idx + 1))));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // No-op, the watch on the parent node will handle it.
+ }
+ });
+ }
+
+ private void getTopic(final String path, final String topic) {
+ Futures.addCallback(zkClient.getChildren(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // Other event type changes are either could be ignored or handled by parent watcher
+ if (event.getType() == Event.EventType.NodeChildrenChanged) {
+ getTopic(path, topic);
+ }
+ }
+ }), new FutureCallback<NodeChildren>() {
+ @Override
+ public void onSuccess(NodeChildren result) {
+ List<String> children = result.getChildren();
+ final List<ListenableFuture<BrokerPartition>> futures = Lists.newArrayListWithCapacity(children.size());
+
+ // Fetch data from each broken node
+ for (final String brokerId : children) {
+ Futures.transform(zkClient.getData(path + "/" + brokerId), new Function<NodeData, BrokerPartition>() {
+ @Override
+ public BrokerPartition apply(NodeData input) {
+ return new BrokerPartition(brokerId, Integer.parseInt(new String(input.getData(), Charsets.UTF_8)));
+ }
+ });
+ }
+
+ // When all fetching is done, build the partition size->broker map for this topic
+ Futures.successfulAsList(futures).addListener(new Runnable() {
+ @Override
+ public void run() {
+ Map<Integer, Set<String>> partitionBrokers = Maps.newHashMap();
+ for (ListenableFuture<BrokerPartition> future : futures) {
+ try {
+ BrokerPartition info = future.get();
+ Set<String> brokerSet = partitionBrokers.get(info.getPartitionSize());
+ if (brokerSet == null) {
+ brokerSet = Sets.newHashSet();
+ partitionBrokers.put(info.getPartitionSize(), brokerSet);
+ }
+ brokerSet.add(info.getBrokerId());
+ } catch (Exception e) {
+ // Exception is ignored, as it will be handled by parent watcher
+ }
+ }
+ topicBrokers.put(topic, ImmutableSortedMap.copyOf(partitionBrokers));
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // No-op. Failure would be handled by parent watcher already (e.g. node not exists -> children change in parent)
+ }
+ });
+ }
+
+ private <K, V> void removeDiff(Set<K> keys, Map<K, V> map) {
+ for (K key : ImmutableSet.copyOf(Sets.difference(map.keySet(), keys))) {
+ map.remove(key);
+ }
+ }
+
+ private abstract class ExistsOnFailureFutureCallback<V> implements FutureCallback<V> {
+
+ private final String path;
+ private final Runnable action;
+
+ protected ExistsOnFailureFutureCallback(String path, Runnable action) {
+ this.path = path;
+ this.action = action;
+ }
+
+ @Override
+ public final void onFailure(Throwable t) {
+ if (!isNotExists(t)) {
+ LOG.error("Fail to watch for kafka brokers: " + path, t);
+ return;
+ }
+
+ waitExists(path);
+ }
+
+ private boolean isNotExists(Throwable t) {
+ return ((t instanceof KeeperException) && ((KeeperException) t).code() == KeeperException.Code.NONODE);
+ }
+
+ private void waitExists(String path) {
+ LOG.info("Path " + path + " not exists. Watch for creation.");
+
+ // If the node doesn't exists, use the "exists" call to watch for node creation.
+ Futures.addCallback(zkClient.exists(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getType() == Event.EventType.NodeCreated || event.getType() == Event.EventType.NodeDeleted) {
+ action.run();
+ }
+ }
+ }), new FutureCallback<Stat>() {
+ @Override
+ public void onSuccess(Stat result) {
+ // If path exists, get children again, otherwise wait for watch to get triggered
+ if (result != null) {
+ action.run();
+ }
+ }
+ @Override
+ public void onFailure(Throwable t) {
+ action.run();
+ }
+ });
+ }
+ }
+
+ private static final class BrokerPartition {
+ private final String brokerId;
+ private final int partitionSize;
+
+ private BrokerPartition(String brokerId, int partitionSize) {
+ this.brokerId = brokerId;
+ this.partitionSize = partitionSize;
+ }
+
+ public String getBrokerId() {
+ return brokerId;
+ }
+
+ public int getPartitionSize() {
+ return partitionSize;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
new file mode 100644
index 0000000..7b43f8a
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ *
+ */
+final class KafkaRequest {
+
+ public enum Type {
+ PRODUCE(0),
+ FETCH(1),
+ MULTI_FETCH(2),
+ MULTI_PRODUCE(3),
+ OFFSETS(4);
+
+ private final short id;
+
+ private Type(int id) {
+ this.id = (short) id;
+ }
+
+ public short getId() {
+ return id;
+ }
+ }
+
+ private final Type type;
+ private final String topic;
+ private final int partition;
+ private final ChannelBuffer body;
+ private final ResponseHandler responseHandler;
+
+
+ public static KafkaRequest createProduce(String topic, int partition, ChannelBuffer body) {
+ return new KafkaRequest(Type.PRODUCE, topic, partition, body, ResponseHandler.NO_OP);
+ }
+
+ public static KafkaRequest createFetch(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
+ return new KafkaRequest(Type.FETCH, topic, partition, body, handler);
+ }
+
+ public static KafkaRequest createOffsets(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
+ return new KafkaRequest(Type.OFFSETS, topic, partition, body, handler);
+ }
+
+ private KafkaRequest(Type type, String topic, int partition, ChannelBuffer body, ResponseHandler responseHandler) {
+ this.type = type;
+ this.topic = topic;
+ this.partition = partition;
+ this.body = body;
+ this.responseHandler = responseHandler;
+ }
+
+ Type getType() {
+ return type;
+ }
+
+ String getTopic() {
+ return topic;
+ }
+
+ int getPartition() {
+ return partition;
+ }
+
+ ChannelBuffer getBody() {
+ return body;
+ }
+
+ ResponseHandler getResponseHandler() {
+ return responseHandler;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
new file mode 100644
index 0000000..ef78c76
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+final class KafkaRequestEncoder extends OneToOneEncoder {
+
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+ if (!(msg instanceof KafkaRequest)) {
+ return msg;
+ }
+ KafkaRequest req = (KafkaRequest) msg;
+ ByteBuffer topic = Charsets.UTF_8.encode(req.getTopic());
+
+ ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(16 + topic.remaining() + req.getBody().readableBytes());
+ int writerIdx = buffer.writerIndex();
+ buffer.writerIndex(writerIdx + 4); // Reserves 4 bytes for message length
+
+ // Write out <REQUEST_TYPE>, <TOPIC_LENGTH>, <TOPIC>, <PARTITION>
+ buffer.writeShort(req.getType().getId());
+ buffer.writeShort(topic.remaining());
+ buffer.writeBytes(topic);
+ buffer.writeInt(req.getPartition());
+
+ // Write out the size of the whole buffer (excluding the size field) at the beginning
+ buffer.setInt(writerIdx, buffer.readableBytes() - 4 + req.getBody().readableBytes());
+
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(buffer, req.getBody());
+ buf = buf.readBytes(buf.readableBytes());
+
+ return buf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
new file mode 100644
index 0000000..fbc552c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ *
+ */
+interface KafkaRequestSender {
+
+ void send(KafkaRequest request);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
new file mode 100644
index 0000000..68c1bd8
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ *
+ */
+final class KafkaResponse {
+
+ private final FetchException.ErrorCode errorCode;
+ private final ChannelBuffer body;
+ private final int size;
+
+ KafkaResponse(FetchException.ErrorCode errorCode, ChannelBuffer body, int size) {
+ this.errorCode = errorCode;
+ this.body = body;
+ this.size = size;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public FetchException.ErrorCode getErrorCode() {
+ return errorCode;
+ }
+
+ public ChannelBuffer getBody() {
+ return body;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
new file mode 100644
index 0000000..47f70ce
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ *
+ */
+final class KafkaResponseDispatcher extends SimpleChannelHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaResponseDispatcher.class);
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object attachment = ctx.getAttachment();
+ if (e.getMessage() instanceof KafkaResponse && attachment instanceof ResponseHandler) {
+ ((ResponseHandler) attachment).received((KafkaResponse) e.getMessage());
+ } else {
+ super.messageReceived(ctx, e);
+ }
+ }
+
+ @Override
+ public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ if (e.getMessage() instanceof KafkaRequest) {
+ ctx.setAttachment(((KafkaRequest) e.getMessage()).getResponseHandler());
+ }
+ super.writeRequested(ctx, e);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ if (e.getCause() instanceof ClosedChannelException || e.getCause() instanceof SocketException) {
+ // No need to log for socket exception as the client has logic to retry.
+ return;
+ }
+ LOG.warn("Exception caught in kafka client connection.", e.getCause());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
new file mode 100644
index 0000000..5251e65
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.kafka.client.FetchException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ *
+ */
+final class KafkaResponseHandler extends SimpleChannelHandler {
+
+ private final Bufferer bufferer = new Bufferer();
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object msg = e.getMessage();
+ if (!(msg instanceof ChannelBuffer)) {
+ super.messageReceived(ctx, e);
+ return;
+ }
+
+ bufferer.apply((ChannelBuffer) msg);
+ ChannelBuffer buffer = bufferer.getNext();
+ while (buffer.readable()) {
+ // Send the response object upstream
+ Channels.fireMessageReceived(ctx, new KafkaResponse(FetchException.ErrorCode.fromCode(buffer.readShort()),
+ buffer, buffer.readableBytes() + 6));
+ buffer = bufferer.getNext();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
new file mode 100644
index 0000000..0814917
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.kafka.client.FetchException;
+import org.apache.twill.kafka.client.FetchedMessage;
+import com.google.common.base.Throwables;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.ByteStreams;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.xerial.snappy.SnappyInputStream;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * This class is for consuming messages from a kafka topic.
+ */
+final class MessageFetcher extends AbstractIterator<FetchedMessage> implements ResponseHandler {
+
+ private static final long BACKOFF_INTERVAL_MS = 100;
+
+ private final KafkaRequestSender sender;
+ private final String topic;
+ private final int partition;
+ private final int maxSize;
+ private final AtomicLong offset;
+ private final BlockingQueue<FetchResult> messages;
+ private final ScheduledExecutorService scheduler;
+ private volatile long backoffMillis;
+ private final Runnable sendFetchRequest = new Runnable() {
+ @Override
+ public void run() {
+ sendFetchRequest();
+ }
+ };
+
+ MessageFetcher(String topic, int partition, long offset, int maxSize, KafkaRequestSender sender) {
+ this.topic = topic;
+ this.partition = partition;
+ this.sender = sender;
+ this.offset = new AtomicLong(offset);
+ this.maxSize = maxSize;
+ this.messages = new LinkedBlockingQueue<FetchResult>();
+ this.scheduler = Executors.newSingleThreadScheduledExecutor(
+ Threads.createDaemonThreadFactory("kafka-" + topic + "-consumer"));
+ }
+
+ @Override
+ public void received(KafkaResponse response) {
+ if (response.getErrorCode() != FetchException.ErrorCode.OK) {
+ messages.add(FetchResult.failure(new FetchException("Error in fetching: " + response.getErrorCode(),
+ response.getErrorCode())));
+ return;
+ }
+
+ try {
+ if (decodeResponse(response.getBody(), -1)) {
+ backoffMillis = 0;
+ } else {
+ backoffMillis = Math.max(backoffMillis + BACKOFF_INTERVAL_MS, 1000);
+ scheduler.schedule(sendFetchRequest, backoffMillis, TimeUnit.MILLISECONDS);
+ }
+ } catch (Throwable t) {
+ messages.add(FetchResult.failure(t));
+ }
+ }
+
+ private boolean decodeResponse(ChannelBuffer buffer, long nextOffset) {
+ boolean hasMessage = false;
+ boolean computeOffset = nextOffset < 0;
+ while (buffer.readableBytes() >= 4) {
+ int size = buffer.readInt();
+ if (buffer.readableBytes() < size) {
+ if (!hasMessage) {
+ throw new IllegalStateException("Size too small");
+ }
+ break;
+ }
+ nextOffset = computeOffset ? offset.addAndGet(size + 4) : nextOffset;
+ decodeMessage(size, buffer, nextOffset);
+ hasMessage = true;
+ }
+ return hasMessage;
+
+ }
+
+ private void decodeMessage(int size, ChannelBuffer buffer, long nextOffset) {
+ int readerIdx = buffer.readerIndex();
+ int magic = buffer.readByte();
+ Compression compression = magic == 0 ? Compression.NONE : Compression.fromCode(buffer.readByte());
+ int crc = buffer.readInt();
+
+ ChannelBuffer payload = buffer.readSlice(size - (buffer.readerIndex() - readerIdx));
+
+ // Verify CRC?
+ enqueueMessage(compression, payload, nextOffset);
+ }
+
+ private void enqueueMessage(Compression compression, ChannelBuffer payload, long nextOffset) {
+ switch (compression) {
+ case NONE:
+ messages.add(FetchResult.success(new BasicFetchedMessage(nextOffset, payload.toByteBuffer())));
+ break;
+ case GZIP:
+ decodeResponse(gunzip(payload), nextOffset);
+ break;
+ case SNAPPY:
+ decodeResponse(unsnappy(payload), nextOffset);
+ break;
+ }
+ }
+
+ private ChannelBuffer gunzip(ChannelBuffer source) {
+ ChannelBufferOutputStream output = new ChannelBufferOutputStream(
+ ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
+ try {
+ try {
+ GZIPInputStream gzipInput = new GZIPInputStream(new ChannelBufferInputStream(source));
+ try {
+ ByteStreams.copy(gzipInput, output);
+ return output.buffer();
+ } finally {
+ gzipInput.close();
+ }
+ } finally {
+ output.close();
+ }
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private ChannelBuffer unsnappy(ChannelBuffer source) {
+ ChannelBufferOutputStream output = new ChannelBufferOutputStream(
+ ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
+ try {
+ try {
+ SnappyInputStream snappyInput = new SnappyInputStream(new ChannelBufferInputStream(source));
+ try {
+ ByteStreams.copy(snappyInput, output);
+ return output.buffer();
+ } finally {
+ snappyInput.close();
+ }
+ } finally {
+ output.close();
+ }
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private void sendFetchRequest() {
+ ChannelBuffer fetchBody = ChannelBuffers.buffer(12);
+ fetchBody.writeLong(offset.get());
+ fetchBody.writeInt(maxSize);
+ sender.send(KafkaRequest.createFetch(topic, partition, fetchBody, MessageFetcher.this));
+ }
+
+ @Override
+ protected FetchedMessage computeNext() {
+ FetchResult result = messages.poll();
+ if (result != null) {
+ return getMessage(result);
+ }
+
+ try {
+ sendFetchRequest();
+ return getMessage(messages.take());
+ } catch (InterruptedException e) {
+ scheduler.shutdownNow();
+ return endOfData();
+ }
+ }
+
+ private FetchedMessage getMessage(FetchResult result) {
+ try {
+ if (result.isSuccess()) {
+ return result.getMessage();
+ } else {
+ throw result.getErrorCause();
+ }
+ } catch (Throwable t) {
+ throw Throwables.propagate(t);
+ }
+ }
+
+ private static final class FetchResult {
+ private final FetchedMessage message;
+ private final Throwable errorCause;
+
+ static FetchResult success(FetchedMessage message) {
+ return new FetchResult(message, null);
+ }
+
+ static FetchResult failure(Throwable cause) {
+ return new FetchResult(null, cause);
+ }
+
+ private FetchResult(FetchedMessage message, Throwable errorCause) {
+ this.message = message;
+ this.errorCause = errorCause;
+ }
+
+ public FetchedMessage getMessage() {
+ return message;
+ }
+
+ public Throwable getErrorCause() {
+ return errorCause;
+ }
+
+ public boolean isSuccess() {
+ return message != null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
new file mode 100644
index 0000000..49008cc
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * This represents a set of messages that goes into the same message set and get encoded as
+ * single kafka message set.
+ */
+interface MessageSetEncoder {
+
+ MessageSetEncoder add(ChannelBuffer payload);
+
+ ChannelBuffer finish();
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
new file mode 100644
index 0000000..f681b85
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+/**
+ * Represents handler for kafka response.
+ */
+interface ResponseHandler {
+
+ ResponseHandler NO_OP = new ResponseHandler() {
+ @Override
+ public void received(KafkaResponse response) {
+ // No-op
+ }
+ };
+
+ void received(KafkaResponse response);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
new file mode 100644
index 0000000..8ff4856
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaClient.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.apache.twill.common.Threads;
+import org.apache.twill.kafka.client.FetchException;
+import org.apache.twill.kafka.client.FetchedMessage;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.PreparePublish;
+import org.apache.twill.zookeeper.ZKClient;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioClientBossPool;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Basic implementation of {@link KafkaClient}.
+ */
+public final class SimpleKafkaClient extends AbstractIdleService implements KafkaClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaClient.class);
+ private static final int BROKER_POLL_INTERVAL = 100;
+
+ private final KafkaBrokerCache brokerCache;
+ private ClientBootstrap bootstrap;
+ private ConnectionPool connectionPool;
+
+ public SimpleKafkaClient(ZKClient zkClient) {
+ this.brokerCache = new KafkaBrokerCache(zkClient);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ brokerCache.startAndWait();
+ ThreadFactory threadFactory = Threads.createDaemonThreadFactory("kafka-client-netty-%d");
+ NioClientBossPool bossPool = new NioClientBossPool(Executors.newSingleThreadExecutor(threadFactory), 1,
+ new HashedWheelTimer(threadFactory), null);
+ NioWorkerPool workerPool = new NioWorkerPool(Executors.newFixedThreadPool(4, threadFactory), 4);
+
+ bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(bossPool, workerPool));
+ bootstrap.setPipelineFactory(new KafkaChannelPipelineFactory());
+ connectionPool = new ConnectionPool(bootstrap);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ connectionPool.close();
+ bootstrap.releaseExternalResources();
+ brokerCache.stopAndWait();
+ }
+
+ @Override
+ public PreparePublish preparePublish(final String topic, final Compression compression) {
+ final Map<Integer, MessageSetEncoder> encoders = Maps.newHashMap();
+
+ return new PreparePublish() {
+ @Override
+ public PreparePublish add(byte[] payload, Object partitionKey) {
+ return add(ByteBuffer.wrap(payload), partitionKey);
+ }
+
+ @Override
+ public PreparePublish add(ByteBuffer payload, Object partitionKey) {
+ // TODO: Partition
+ int partition = 0;
+
+ MessageSetEncoder encoder = encoders.get(partition);
+ if (encoder == null) {
+ encoder = getEncoder(compression);
+ encoders.put(partition, encoder);
+ }
+ encoder.add(ChannelBuffers.wrappedBuffer(payload));
+
+ return this;
+ }
+
+ @Override
+ public ListenableFuture<?> publish() {
+ List<ListenableFuture<?>> futures = Lists.newArrayListWithCapacity(encoders.size());
+ for (Map.Entry<Integer, MessageSetEncoder> entry : encoders.entrySet()) {
+ futures.add(doPublish(topic, entry.getKey(), entry.getValue().finish()));
+ }
+ encoders.clear();
+ return Futures.allAsList(futures);
+ }
+
+ private ListenableFuture<?> doPublish(String topic, int partition, ChannelBuffer messageSet) {
+ final KafkaRequest request = KafkaRequest.createProduce(topic, partition, messageSet);
+ final SettableFuture<?> result = SettableFuture.create();
+ final ConnectionPool.ConnectResult connection =
+ connectionPool.connect(getTopicBroker(topic, partition).getAddress());
+
+ connection.getChannelFuture().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ try {
+ future.getChannel().write(request).addListener(getPublishChannelFutureListener(result, null, connection));
+ } catch (Exception e) {
+ result.setException(e);
+ }
+ }
+ });
+
+ return result;
+ }
+ };
+ }
+
+ @Override
+ public Iterator<FetchedMessage> consume(final String topic, final int partition, long offset, int maxSize) {
+ Preconditions.checkArgument(maxSize >= 10, "Message size cannot be smaller than 10.");
+
+ // Connect to broker. Consumer connection are long connection. No need to worry about reuse.
+ final AtomicReference<ChannelFuture> channelFutureRef = new AtomicReference<ChannelFuture>(
+ connectionPool.connect(getTopicBroker(topic, partition).getAddress()).getChannelFuture());
+
+ return new MessageFetcher(topic, partition, offset, maxSize, new KafkaRequestSender() {
+
+ @Override
+ public void send(final KafkaRequest request) {
+ if (!isRunning()) {
+ return;
+ }
+ try {
+ // Try to send the request
+ Channel channel = channelFutureRef.get().getChannel();
+ if (!channel.write(request).await().isSuccess()) {
+ // If failed, retry
+ channel.close();
+ ChannelFuture channelFuture = connectionPool.connect(
+ getTopicBroker(topic, partition).getAddress()).getChannelFuture();
+ channelFutureRef.set(channelFuture);
+ channelFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ send(request);
+ }
+ });
+ }
+ } catch (InterruptedException e) {
+ // Ignore it
+ LOG.info("Interrupted when sending consume request", e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<long[]> getOffset(final String topic, final int partition, long time, int maxOffsets) {
+ final SettableFuture<long[]> resultFuture = SettableFuture.create();
+ final ChannelBuffer body = ChannelBuffers.buffer(Longs.BYTES + Ints.BYTES);
+ body.writeLong(time);
+ body.writeInt(maxOffsets);
+
+ connectionPool.connect(getTopicBroker(topic, partition).getAddress())
+ .getChannelFuture().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (checkFailure(future)) {
+ return;
+ }
+
+ future.getChannel().write(KafkaRequest.createOffsets(topic, partition, body, new ResponseHandler() {
+ @Override
+ public void received(KafkaResponse response) {
+ if (response.getErrorCode() != FetchException.ErrorCode.OK) {
+ resultFuture.setException(new FetchException("Failed to fetch offset.", response.getErrorCode()));
+ } else {
+ // Decode the offset response, which contains 4 bytes number of offsets, followed by number of offsets,
+ // each 8 bytes in size.
+ ChannelBuffer resultBuffer = response.getBody();
+ int size = resultBuffer.readInt();
+ long[] result = new long[size];
+ for (int i = 0; i < size; i++) {
+ result[i] = resultBuffer.readLong();
+ }
+ resultFuture.set(result);
+ }
+ }
+ })).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ checkFailure(future);
+ }
+ });
+ }
+
+ private boolean checkFailure(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ if (future.isCancelled()) {
+ resultFuture.cancel(true);
+ } else {
+ resultFuture.setException(future.getCause());
+ }
+ return true;
+ }
+ return false;
+ }
+ });
+
+ return resultFuture;
+ }
+
+ private TopicBroker getTopicBroker(String topic, int partition) {
+ TopicBroker topicBroker = brokerCache.getBrokerAddress(topic, partition);
+ while (topicBroker == null) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(BROKER_POLL_INTERVAL);
+ } catch (InterruptedException e) {
+ return null;
+ }
+ topicBroker = brokerCache.getBrokerAddress(topic, partition);
+ }
+ return topicBroker;
+ }
+
+ private MessageSetEncoder getEncoder(Compression compression) {
+ switch (compression) {
+ case GZIP:
+ return new GZipMessageSetEncoder();
+ case SNAPPY:
+ return new SnappyMessageSetEncoder();
+ default:
+ return new IdentityMessageSetEncoder();
+ }
+ }
+
+ private <V> ChannelFutureListener getPublishChannelFutureListener(final SettableFuture<V> result, final V resultObj,
+ final ConnectionPool.ConnectionReleaser releaser) {
+ return new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ try {
+ if (future.isSuccess()) {
+ result.set(resultObj);
+ } else if (future.isCancelled()) {
+ result.cancel(true);
+ } else {
+ result.setException(future.getCause());
+ }
+ } finally {
+ releaser.release();
+ }
+ }
+ };
+ }
+
+ private static final class KafkaChannelPipelineFactory implements ChannelPipelineFactory {
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ pipeline.addLast("encoder", new KafkaRequestEncoder());
+ pipeline.addLast("decoder", new KafkaResponseHandler());
+ pipeline.addLast("dispatcher", new KafkaResponseDispatcher());
+ return pipeline;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
new file mode 100644
index 0000000..bf18c08
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SnappyMessageSetEncoder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import org.xerial.snappy.SnappyOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A {@link MessageSetEncoder} that compress messages using snappy.
+ */
+final class SnappyMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
+
+ SnappyMessageSetEncoder() {
+ super(Compression.SNAPPY);
+ }
+
+ @Override
+ protected OutputStream createCompressedStream(OutputStream os) throws IOException {
+ return new SnappyOutputStream(os);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
new file mode 100644
index 0000000..fd4bf03
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/TopicBroker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Represents broker information of a given topic.
+ */
+final class TopicBroker {
+
+ private final String topic;
+ private final InetSocketAddress address;
+ private final int partitionSize;
+
+ TopicBroker(String topic, InetSocketAddress address, int partitionSize) {
+ this.topic = topic;
+ this.address = address;
+ this.partitionSize = partitionSize;
+ }
+
+ String getTopic() {
+ return topic;
+ }
+
+ InetSocketAddress getAddress() {
+ return address;
+ }
+
+ int getPartitionSize() {
+ return partitionSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
new file mode 100644
index 0000000..f3f615c
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package provides pure java kafka client implementation.
+ */
+package org.apache.twill.internal.kafka.client;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/4a1c943c/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
new file mode 100644
index 0000000..12818ef
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.logging;
+
+import org.apache.twill.common.Services;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.kafka.client.Compression;
+import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.PreparePublish;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+import ch.qos.logback.classic.pattern.ClassOfCallerConverter;
+import ch.qos.logback.classic.pattern.FileOfCallerConverter;
+import ch.qos.logback.classic.pattern.LineOfCallerConverter;
+import ch.qos.logback.classic.pattern.MethodOfCallerConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.classic.spi.StackTraceElementProxy;
+import ch.qos.logback.core.AppenderBase;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.stream.JsonWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ *
+ */
+public final class KafkaAppender extends AppenderBase<ILoggingEvent> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaAppender.class);
+
+ private final LogEventConverter eventConverter;
+ private final AtomicReference<PreparePublish> publisher;
+ private final Runnable flushTask;
+ /**
+ * Rough count of how many entries are being buffered. It's just approximate, not exact.
+ */
+ private final AtomicInteger bufferedSize;
+
+ private ZKClientService zkClientService;
+ private KafkaClient kafkaClient;
+ private String zkConnectStr;
+ private String hostname;
+ private String topic;
+ private Queue<String> buffer;
+ private int flushLimit = 20;
+ private int flushPeriod = 100;
+ private ScheduledExecutorService scheduler;
+
+ public KafkaAppender() {
+ eventConverter = new LogEventConverter();
+ publisher = new AtomicReference<PreparePublish>();
+ flushTask = createFlushTask();
+ bufferedSize = new AtomicInteger();
+ buffer = new ConcurrentLinkedQueue<String>();
+ }
+
+ /**
+ * Sets the zookeeper connection string. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setZookeeper(String zkConnectStr) {
+ this.zkConnectStr = zkConnectStr;
+ }
+
+ /**
+ * Sets the hostname. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+ /**
+ * Sets the topic name for publishing logs. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * Sets the maximum number of cached log entries before performing an force flush. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setFlushLimit(int flushLimit) {
+ this.flushLimit = flushLimit;
+ }
+
+ /**
+ * Sets the periodic flush time in milliseconds. Called by slf4j.
+ */
+ @SuppressWarnings("unused")
+ public void setFlushPeriod(int flushPeriod) {
+ this.flushPeriod = flushPeriod;
+ }
+
+ @Override
+ public void start() {
+ Preconditions.checkNotNull(zkConnectStr);
+
+ scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("kafka-logger"));
+
+ zkClientService = ZKClientServices.delegate(
+ ZKClients.reWatchOnExpire(
+ ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
+ RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
+
+ kafkaClient = new SimpleKafkaClient(zkClientService);
+ Futures.addCallback(Services.chainStart(zkClientService, kafkaClient), new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object result) {
+ LOG.info("Kafka client started: " + zkConnectStr);
+ publisher.set(kafkaClient.preparePublish(topic, Compression.SNAPPY));
+ scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // Fail to talk to kafka. Other than logging, what can be done?
+ LOG.error("Failed to start kafka client.", t);
+ }
+ });
+
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ scheduler.shutdownNow();
+ Futures.getUnchecked(Services.chainStop(kafkaClient, zkClientService));
+ }
+
+ public void forceFlush() {
+ try {
+ publishLogs().get(2, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to publish last batch of log.", e);
+ }
+ }
+
+ @Override
+ protected void append(ILoggingEvent eventObject) {
+ buffer.offer(eventConverter.convert(eventObject));
+ if (bufferedSize.incrementAndGet() >= flushLimit && publisher.get() != null) {
+ // Try to do a extra flush
+ scheduler.submit(flushTask);
+ }
+ }
+
+ private ListenableFuture<Integer> publishLogs() {
+ // If the publisher is not available, simply returns a completed future.
+ PreparePublish publisher = KafkaAppender.this.publisher.get();
+ if (publisher == null) {
+ return Futures.immediateFuture(0);
+ }
+
+ int count = 0;
+ for (String json : Iterables.consumingIterable(buffer)) {
+ publisher.add(Charsets.UTF_8.encode(json), 0);
+ count++;
+ }
+ // Nothing to publish, simply returns a completed future.
+ if (count == 0) {
+ return Futures.immediateFuture(0);
+ }
+
+ bufferedSize.set(0);
+ final int finalCount = count;
+ return Futures.transform(publisher.publish(), new Function<Object, Integer>() {
+ @Override
+ public Integer apply(Object input) {
+ return finalCount;
+ }
+ });
+ }
+
+ /**
+ * Creates a {@link Runnable} that writes all logs in the buffer into kafka.
+ * @return The Runnable task
+ */
+ private Runnable createFlushTask() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ Futures.addCallback(publishLogs(), new FutureCallback<Integer>() {
+ @Override
+ public void onSuccess(Integer result) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Log entries published, size=" + result);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Failed to push logs to kafka. Log entries dropped.", t);
+ }
+ });
+ }
+ };
+ }
+
+ /**
+ * Helper class to convert {@link ILoggingEvent} into json string.
+ */
+ private final class LogEventConverter {
+
+ private final ClassOfCallerConverter classNameConverter = new ClassOfCallerConverter();
+ private final MethodOfCallerConverter methodConverter = new MethodOfCallerConverter();
+ private final FileOfCallerConverter fileConverter = new FileOfCallerConverter();
+ private final LineOfCallerConverter lineConverter = new LineOfCallerConverter();
+
+ private String convert(ILoggingEvent event) {
+ StringWriter result = new StringWriter();
+ JsonWriter writer = new JsonWriter(result);
+
+ try {
+ try {
+ writer.beginObject();
+ writer.name("name").value(event.getLoggerName());
+ writer.name("host").value(hostname);
+ writer.name("timestamp").value(Long.toString(event.getTimeStamp()));
+ writer.name("level").value(event.getLevel().toString());
+ writer.name("className").value(classNameConverter.convert(event));
+ writer.name("method").value(methodConverter.convert(event));
+ writer.name("file").value(fileConverter.convert(event));
+ writer.name("line").value(lineConverter.convert(event));
+ writer.name("thread").value(event.getThreadName());
+ writer.name("message").value(event.getFormattedMessage());
+ writer.name("stackTraces");
+ encodeStackTraces(event.getThrowableProxy(), writer);
+
+ writer.endObject();
+ } finally {
+ writer.close();
+ }
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+
+ return result.toString();
+ }
+
+ private void encodeStackTraces(IThrowableProxy throwable, JsonWriter writer) throws IOException {
+ writer.beginArray();
+ try {
+ if (throwable == null) {
+ return;
+ }
+
+ for (StackTraceElementProxy stackTrace : throwable.getStackTraceElementProxyArray()) {
+ writer.beginObject();
+
+ StackTraceElement element = stackTrace.getStackTraceElement();
+ writer.name("className").value(element.getClassName());
+ writer.name("method").value(element.getMethodName());
+ writer.name("file").value(element.getFileName());
+ writer.name("line").value(element.getLineNumber());
+
+ writer.endObject();
+ }
+ } finally {
+ writer.endArray();
+ }
+ }
+ }
+}