You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:03 UTC
[06/50] [abbrv] git commit: [TWILL-27] Upgrade to Kafka-0.8
[TWILL-27] Upgrade to Kafka-0.8
Signed-off-by: Terence Yim <te...@continuuity.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/fcbe54d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/fcbe54d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/fcbe54d5
Branch: refs/heads/site
Commit: fcbe54d50d22a4a85eaea35612cd9db143deef67
Parents: 15de6ae
Author: Terence Yim <te...@continuuity.com>
Authored: Wed Jan 22 17:07:52 2014 -0800
Committer: Terence Yim <te...@continuuity.com>
Committed: Mon Jan 27 13:40:28 2014 -0800
----------------------------------------------------------------------
pom.xml | 19 +
twill-core/pom.xml | 4 +
.../twill/internal/AbstractTwillController.java | 124 ++---
.../org/apache/twill/internal/Constants.java | 1 -
.../internal/kafka/EmbeddedKafkaServer.java | 64 +--
.../AbstractCompressedMessageSetEncoder.java | 78 ---
.../kafka/client/AbstractMessageSetEncoder.java | 79 ---
.../kafka/client/BasicFetchedMessage.java | 34 +-
.../twill/internal/kafka/client/Bufferer.java | 61 ---
.../kafka/client/ByteBufferEncoder.java | 38 ++
.../internal/kafka/client/Compression.java | 49 --
.../internal/kafka/client/ConnectionPool.java | 125 -----
.../kafka/client/GZipMessageSetEncoder.java | 37 --
.../kafka/client/IdentityMessageSetEncoder.java | 42 --
.../internal/kafka/client/IntegerEncoder.java | 35 ++
.../kafka/client/IntegerPartitioner.java | 34 ++
.../internal/kafka/client/KafkaBrokerCache.java | 326 -------------
.../internal/kafka/client/KafkaRequest.java | 91 ----
.../kafka/client/KafkaRequestEncoder.java | 60 ---
.../kafka/client/KafkaRequestSender.java | 26 -
.../internal/kafka/client/KafkaResponse.java | 49 --
.../kafka/client/KafkaResponseDispatcher.java | 63 ---
.../kafka/client/KafkaResponseHandler.java | 51 --
.../internal/kafka/client/MessageFetcher.java | 243 ----------
.../kafka/client/MessageSetEncoder.java | 31 --
.../internal/kafka/client/ResponseHandler.java | 33 --
.../kafka/client/SimpleKafkaClient.java | 304 ------------
.../kafka/client/SimpleKafkaConsumer.java | 481 +++++++++++++++++++
.../kafka/client/SimpleKafkaPublisher.java | 113 +++++
.../kafka/client/SnappyMessageSetEncoder.java | 38 --
.../internal/kafka/client/TopicBroker.java | 48 --
.../internal/kafka/client/ZKBrokerService.java | 403 ++++++++++++++++
.../kafka/client/ZKKafkaClientService.java | 118 +++++
.../internal/kafka/client/package-info.java | 2 +-
.../twill/internal/logging/KafkaAppender.java | 43 +-
.../internal/logging/KafkaTwillRunnable.java | 122 -----
.../apache/twill/kafka/client/BrokerInfo.java | 65 +++
.../twill/kafka/client/BrokerService.java | 48 ++
.../apache/twill/kafka/client/Compression.java | 38 ++
.../twill/kafka/client/FetchException.java | 77 ---
.../twill/kafka/client/FetchedMessage.java | 13 +-
.../apache/twill/kafka/client/KafkaClient.java | 36 +-
.../twill/kafka/client/KafkaClientService.java | 26 +
.../twill/kafka/client/KafkaConsumer.java | 92 ++++
.../twill/kafka/client/KafkaPublisher.java | 95 ++++
.../twill/kafka/client/PreparePublish.java | 34 --
.../twill/kafka/client/TopicPartition.java | 70 +++
.../apache/twill/kafka/client/package-info.java | 2 +-
twill-core/src/main/resources/kafka-0.7.2.tgz | Bin 8811693 -> 0 bytes
.../apache/twill/kafka/client/KafkaTest.java | 139 ++----
.../appmaster/ApplicationMasterService.java | 93 ++--
.../apache/twill/yarn/YarnTwillPreparer.java | 10 -
52 files changed, 1889 insertions(+), 2418 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0c6e9dd..3d7cebe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -522,6 +522,25 @@
<version>4.0</version>
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.8.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index faff711..d998b40 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -78,6 +78,10 @@
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 97e0a8f..71f0c14 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -17,33 +17,34 @@
*/
package org.apache.twill.internal;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.internal.json.StackTraceElementCodec;
-import org.apache.twill.internal.kafka.client.SimpleKafkaClient;
+import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.internal.logging.LogEntryDecoder;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.kafka.client.FetchedMessage;
-import org.apache.twill.kafka.client.KafkaClient;
+import org.apache.twill.kafka.client.KafkaClientService;
+import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
/**
* A abstract base class for {@link org.apache.twill.api.TwillController} implementation that uses Zookeeper to controller a
@@ -52,47 +53,47 @@ import java.util.concurrent.TimeUnit;
public abstract class AbstractTwillController extends AbstractZKServiceController implements TwillController {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
- private static final int MAX_KAFKA_FETCH_SIZE = 1048576;
- private static final long SHUTDOWN_TIMEOUT_MS = 2000;
- private static final long LOG_FETCH_TIMEOUT_MS = 5000;
private final Queue<LogHandler> logHandlers;
- private final KafkaClient kafkaClient;
+ private final KafkaClientService kafkaClient;
private final DiscoveryServiceClient discoveryServiceClient;
- private final LogPollerThread logPoller;
+ private volatile Cancellable logCancellable;
public AbstractTwillController(RunId runId, ZKClient zkClient, Iterable<LogHandler> logHandlers) {
super(runId, zkClient);
this.logHandlers = new ConcurrentLinkedQueue<LogHandler>();
- this.kafkaClient = new SimpleKafkaClient(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
+ this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, "/" + runId.getId() + "/kafka"));
this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
Iterables.addAll(this.logHandlers, logHandlers);
- this.logPoller = new LogPollerThread(runId, kafkaClient, logHandlers);
}
@Override
- protected void doStartUp() {
+ protected synchronized void doStartUp() {
if (!logHandlers.isEmpty()) {
- logPoller.start();
+ kafkaClient.startAndWait();
+ logCancellable = kafkaClient.getConsumer().prepare()
+ .addFromBeginning(Constants.LOG_TOPIC, 0)
+ .consume(new LogMessageCallback(logHandlers));
}
}
@Override
protected void doShutDown() {
- logPoller.terminate();
- try {
- // Wait for the poller thread to stop.
- logPoller.join(SHUTDOWN_TIMEOUT_MS);
- } catch (InterruptedException e) {
- LOG.warn("Joining of log poller thread interrupted.", e);
+ if (logCancellable != null) {
+ logCancellable.cancel();
}
+ // Safe to call stop no matter when state the KafkaClientService is in.
+ kafkaClient.stopAndWait();
}
@Override
public final synchronized void addLogHandler(LogHandler handler) {
logHandlers.add(handler);
- if (!logPoller.isAlive()) {
- logPoller.start();
+ if (logHandlers.size() == 1) {
+ kafkaClient.startAndWait();
+ logCancellable = kafkaClient.getConsumer().prepare()
+ .addFromBeginning(Constants.LOG_TOPIC, 0)
+ .consume(new LogMessageCallback(logHandlers));
}
}
@@ -106,74 +107,45 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
return sendMessage(SystemMessages.setInstances(runnable, newCount), newCount);
}
- private static final class LogPollerThread extends Thread {
+ private static final class LogMessageCallback implements KafkaConsumer.MessageCallback {
+
+ private static final Gson GSON = new GsonBuilder().registerTypeAdapter(LogEntry.class, new LogEntryDecoder())
+ .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
+ .create();
- private final KafkaClient kafkaClient;
private final Iterable<LogHandler> logHandlers;
- private volatile boolean running = true;
- LogPollerThread(RunId runId, KafkaClient kafkaClient, Iterable<LogHandler> logHandlers) {
- super("twill-log-poller-" + runId.getId());
- setDaemon(true);
- this.kafkaClient = kafkaClient;
+ private LogMessageCallback(Iterable<LogHandler> logHandlers) {
this.logHandlers = logHandlers;
}
@Override
- public void run() {
- LOG.info("Twill log poller thread '{}' started.", getName());
- kafkaClient.startAndWait();
- Gson gson = new GsonBuilder().registerTypeAdapter(LogEntry.class, new LogEntryDecoder())
- .registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
- .create();
-
- while (running && !isInterrupted()) {
- long offset;
- try {
- // Get the earliest offset
- long[] offsets = kafkaClient.getOffset(Constants.LOG_TOPIC, 0, -2, 1).get(LOG_FETCH_TIMEOUT_MS,
- TimeUnit.MILLISECONDS);
- // Should have one entry
- offset = offsets[0];
- } catch (Throwable t) {
- // Keep retrying
- LOG.warn("Failed to fetch offsets from Kafka. Retrying.", t);
- continue;
- }
-
- // Now fetch log messages from Kafka
- Iterator<FetchedMessage> messageIterator = kafkaClient.consume(Constants.LOG_TOPIC, 0,
- offset, MAX_KAFKA_FETCH_SIZE);
+ public void onReceived(Iterator<FetchedMessage> messages) {
+ while (messages.hasNext()) {
+ String json = Charsets.UTF_8.decode(messages.next().getPayload()).toString();
try {
- while (messageIterator.hasNext()) {
- String json = Charsets.UTF_8.decode(messageIterator.next().getBuffer()).toString();
- try {
- LogEntry entry = gson.fromJson(json, LogEntry.class);
- if (entry != null) {
- invokeHandlers(entry);
- }
- } catch (Exception e) {
- LOG.error("Failed to decode log entry {}", json, e);
- }
+ LogEntry entry = GSON.fromJson(json, LogEntry.class);
+ if (entry != null) {
+ invokeHandlers(entry);
}
- } catch (Throwable t) {
- LOG.warn("Exception while fetching log message from Kafka. Retrying.", t);
- continue;
+ } catch (Exception e) {
+ LOG.error("Failed to decode log entry {}", json, e);
}
}
-
- kafkaClient.stopAndWait();
- LOG.info("Twill log poller thread stopped.");
}
- void terminate() {
- running = false;
- interrupt();
+ @Override
+ public void finished() {
+ // No-op
}
private void invokeHandlers(LogEntry entry) {
for (LogHandler handler : logHandlers) {
- handler.onLog(entry);
+ try {
+ handler.onLog(entry);
+ } catch (Throwable t) {
+ LOG.warn("Exception while calling LogHandler {}", handler, t);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index 0387d3e..efe91a7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -51,7 +51,6 @@ public final class Constants {
public static final String TWILL_SPEC = "twillSpec.json";
public static final String ARGUMENTS = "arguments.json";
public static final String LOGBACK_TEMPLATE = "logback-template.xml";
- public static final String KAFKA = "kafka.tgz";
public static final String JVM_OPTIONS = "jvm.opts";
public static final String CREDENTIALS = "credentials.store";
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
index 14dfc70..d82d617 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -17,77 +17,33 @@
*/
package org.apache.twill.internal.kafka;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
import java.util.Properties;
/**
- *
+ * A {@link com.google.common.util.concurrent.Service} implementation for running an instance of Kafka server in
+ * the same process.
*/
public final class EmbeddedKafkaServer extends AbstractIdleService {
- private static final String KAFAK_CONFIG_CLASS = "kafka.server.KafkaConfig";
- private static final String KAFKA_SERVER_CLASS = "kafka.server.KafkaServerStartable";
-
- private final Object server;
-
- public EmbeddedKafkaServer(File kafkaDir, Properties properties) {
- this(createClassLoader(kafkaDir), properties);
- }
-
- public EmbeddedKafkaServer(ClassLoader classLoader, Properties properties) {
- try {
- Class<?> configClass = classLoader.loadClass(KAFAK_CONFIG_CLASS);
- Object config = configClass.getConstructor(Properties.class).newInstance(properties);
+ private final KafkaServerStartable server;
- Class<?> serverClass = classLoader.loadClass(KAFKA_SERVER_CLASS);
- server = serverClass.getConstructor(configClass).newInstance(config);
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
+ public EmbeddedKafkaServer(Properties properties) {
+ server = new KafkaServerStartable(new KafkaConfig(properties));
}
@Override
protected void startUp() throws Exception {
- server.getClass().getMethod("startup").invoke(server);
+ server.startup();
}
@Override
protected void shutDown() throws Exception {
- server.getClass().getMethod("shutdown").invoke(server);
- server.getClass().getMethod("awaitShutdown").invoke(server);
- }
-
- private static ClassLoader createClassLoader(File kafkaDir) {
- ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader thisClassLoader = EmbeddedKafkaServer.class.getClassLoader();
- ClassLoader parent = contextClassLoader != null
- ? contextClassLoader
- : thisClassLoader != null
- ? thisClassLoader : ClassLoader.getSystemClassLoader();
-
- return new URLClassLoader(findJars(kafkaDir, Lists.<URL>newArrayList()).toArray(new URL[0]), parent);
- }
-
- private static List<URL> findJars(File dir, List<URL> urls) {
- try {
- for (File file : dir.listFiles()) {
- if (file.isDirectory()) {
- findJars(file, urls);
- } else if (file.getName().endsWith(".jar")) {
- urls.add(file.toURI().toURL());
- }
- }
- return urls;
- } catch (MalformedURLException e) {
- throw Throwables.propagate(e);
- }
+ server.shutdown();
+ server.awaitShutdown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
deleted file mode 100644
index a9c3381..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractCompressedMessageSetEncoder.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.base.Throwables;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * A base implementation of {@link MessageSetEncoder} that do message compression.
- */
-abstract class AbstractCompressedMessageSetEncoder extends AbstractMessageSetEncoder {
-
- private final Compression compression;
- private ChannelBufferOutputStream os;
- private OutputStream compressedOutput;
-
-
- protected AbstractCompressedMessageSetEncoder(Compression compression) {
- this.compression = compression;
- try {
- this.os = new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer());
- this.compressedOutput = createCompressedStream(os);
- } catch (IOException e) {
- // Should never happen
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public final MessageSetEncoder add(ChannelBuffer payload) {
- try {
- ChannelBuffer encoded = encodePayload(payload);
- encoded.readBytes(compressedOutput, encoded.readableBytes());
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
- return this;
-
- }
-
- @Override
- public final ChannelBuffer finish() {
- try {
- compressedOutput.close();
- ChannelBuffer buf = prefixLength(encodePayload(os.buffer(), compression));
- compressedOutput = createCompressedStream(os);
- os.buffer().clear();
-
- return buf;
-
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
-
- }
-
- protected abstract OutputStream createCompressedStream(OutputStream os) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
deleted file mode 100644
index 9955d6a..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/AbstractMessageSetEncoder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-import java.util.zip.CRC32;
-
-/**
- * A base implementation of {@link MessageSetEncoder}.
- */
-abstract class AbstractMessageSetEncoder implements MessageSetEncoder {
-
- private static final ThreadLocal<CRC32> CRC32_LOCAL = new ThreadLocal<CRC32>() {
- @Override
- protected CRC32 initialValue() {
- return new CRC32();
- }
- };
-
- protected final int computeCRC32(ChannelBuffer buffer) {
- CRC32 crc32 = CRC32_LOCAL.get();
- crc32.reset();
-
- if (buffer.hasArray()) {
- crc32.update(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
- } else {
- byte[] bytes = new byte[buffer.readableBytes()];
- buffer.getBytes(buffer.readerIndex(), bytes);
- crc32.update(bytes);
- }
- return (int) crc32.getValue();
- }
-
- protected final ChannelBuffer encodePayload(ChannelBuffer payload) {
- return encodePayload(payload, Compression.NONE);
- }
-
- protected final ChannelBuffer encodePayload(ChannelBuffer payload, Compression compression) {
- ChannelBuffer header = ChannelBuffers.buffer(10);
-
- int crc = computeCRC32(payload);
-
- int magic = ((compression == Compression.NONE) ? 0 : 1);
-
- // Message length = 1 byte magic + (optional 1 compression byte) + 4 bytes crc + payload length
- header.writeInt(5 + magic + payload.readableBytes());
- // Magic number = 0 for non-compressed data
- header.writeByte(magic);
- if (magic > 0) {
- header.writeByte(compression.getCode());
- }
- header.writeInt(crc);
-
- return ChannelBuffers.wrappedBuffer(header, payload);
- }
-
- protected final ChannelBuffer prefixLength(ChannelBuffer buffer) {
- ChannelBuffer sizeBuf = ChannelBuffers.buffer(4);
- sizeBuf.writeInt(buffer.readableBytes());
- return ChannelBuffers.wrappedBuffer(sizeBuf, buffer);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
index 286bf82..ee53ed4 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
@@ -18,29 +18,43 @@
package org.apache.twill.internal.kafka.client;
import org.apache.twill.kafka.client.FetchedMessage;
+import org.apache.twill.kafka.client.TopicPartition;
import java.nio.ByteBuffer;
/**
- *
+ * An implementation of FetchedMessage that provides setters as well.
*/
final class BasicFetchedMessage implements FetchedMessage {
- private final long offset;
- private final ByteBuffer buffer;
+ private final TopicPartition topicPartition;
+ private ByteBuffer payload;
+ private long nextOffset;
+
+ BasicFetchedMessage(TopicPartition topicPartition) {
+ this.topicPartition = topicPartition;
+ }
+
+ void setPayload(ByteBuffer payload) {
+ this.payload = payload;
+ }
+
+ void setNextOffset(long nextOffset) {
+ this.nextOffset = nextOffset;
+ }
- BasicFetchedMessage(long offset, ByteBuffer buffer) {
- this.offset = offset;
- this.buffer = buffer;
+ @Override
+ public TopicPartition getTopicPartition() {
+ return topicPartition;
}
@Override
- public long getOffset() {
- return offset;
+ public ByteBuffer getPayload() {
+ return payload;
}
@Override
- public ByteBuffer getBuffer() {
- return buffer;
+ public long getNextOffset() {
+ return nextOffset;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
deleted file mode 100644
index c1fb4f2..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Bufferer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-/**
- * A class to help buffering data of format [len][payload-of-len].
- */
-final class Bufferer {
-
- private ChannelBuffer currentBuffer = null;
- private int currentSize = -1;
-
- void apply(ChannelBuffer buffer) {
- currentBuffer = concatBuffer(currentBuffer, buffer);
- }
-
- /**
- * Returns the buffer if the buffer data is ready to be consumed,
- * otherwise return {@link ChannelBuffers#EMPTY_BUFFER}.
- */
- ChannelBuffer getNext() {
- if (currentSize < 0) {
- if (currentBuffer.readableBytes() < 4) {
- return ChannelBuffers.EMPTY_BUFFER;
- }
- currentSize = currentBuffer.readInt();
- }
-
- // Keep buffering if less then required number of bytes
- if (currentBuffer.readableBytes() < currentSize) {
- return ChannelBuffers.EMPTY_BUFFER;
- }
-
- ChannelBuffer result = currentBuffer.readSlice(currentSize);
- currentSize = -1;
-
- return result;
- }
-
- private ChannelBuffer concatBuffer(ChannelBuffer current, ChannelBuffer buffer) {
- return current == null ? buffer : ChannelBuffers.wrappedBuffer(current, buffer);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java
new file mode 100644
index 0000000..9211d92
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ByteBufferEncoder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A kafka {@link kafka.serializer.Encoder} for encoding byte buffer into byte array.
+ */
+public final class ByteBufferEncoder implements Encoder<ByteBuffer> {
+
+ public ByteBufferEncoder(VerifiableProperties properties) {
+ }
+
+ public byte[] toBytes(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return bytes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
deleted file mode 100644
index 3355b9f..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/Compression.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- * Enum for indicating compression method.
- */
-public enum Compression {
- NONE(0),
- GZIP(1),
- SNAPPY(2);
-
- private final int code;
-
- Compression(int code) {
- this.code = code;
- }
-
- public int getCode() {
- return code;
- }
-
- public static Compression fromCode(int code) {
- switch (code) {
- case 0:
- return NONE;
- case 1:
- return GZIP;
- case 2:
- return SNAPPY;
- }
- throw new IllegalArgumentException("Unknown compression code.");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
deleted file mode 100644
index c2865ba..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ConnectionPool.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.collect.Maps;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.ChannelGroupFutureListener;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-
-import java.net.InetSocketAddress;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Provides netty socket connection reuse.
- */
-final class ConnectionPool {
-
- private final ClientBootstrap bootstrap;
- private final ChannelGroup channelGroup;
- private final ConcurrentMap<InetSocketAddress, Queue<ChannelFuture>> connections;
-
- /**
- * For releasing a connection back to the pool.
- */
- interface ConnectionReleaser {
- void release();
- }
-
- /**
- * Result of a connect request.
- */
- interface ConnectResult extends ConnectionReleaser {
- ChannelFuture getChannelFuture();
- }
-
- ConnectionPool(ClientBootstrap bootstrap) {
- this.bootstrap = bootstrap;
- this.channelGroup = new DefaultChannelGroup();
- this.connections = Maps.newConcurrentMap();
- }
-
- ConnectResult connect(InetSocketAddress address) {
- Queue<ChannelFuture> channelFutures = connections.get(address);
- if (channelFutures == null) {
- channelFutures = new ConcurrentLinkedQueue<ChannelFuture>();
- Queue<ChannelFuture> result = connections.putIfAbsent(address, channelFutures);
- channelFutures = result == null ? channelFutures : result;
- }
-
- ChannelFuture channelFuture = channelFutures.poll();
- while (channelFuture != null) {
- if (channelFuture.isSuccess() && channelFuture.getChannel().isConnected()) {
- return new SimpleConnectResult(address, channelFuture);
- }
- channelFuture = channelFutures.poll();
- }
-
- channelFuture = bootstrap.connect(address);
- channelFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- channelGroup.add(future.getChannel());
- }
- }
- });
- return new SimpleConnectResult(address, channelFuture);
- }
-
- ChannelGroupFuture close() {
- ChannelGroupFuture result = channelGroup.close();
- result.addListener(new ChannelGroupFutureListener() {
- @Override
- public void operationComplete(ChannelGroupFuture future) throws Exception {
- bootstrap.releaseExternalResources();
- }
- });
- return result;
- }
-
- private final class SimpleConnectResult implements ConnectResult {
-
- private final InetSocketAddress address;
- private final ChannelFuture future;
-
-
- private SimpleConnectResult(InetSocketAddress address, ChannelFuture future) {
- this.address = address;
- this.future = future;
- }
-
- @Override
- public ChannelFuture getChannelFuture() {
- return future;
- }
-
- @Override
- public void release() {
- if (future.isSuccess()) {
- connections.get(address).offer(future);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
deleted file mode 100644
index daa0c2c..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/GZipMessageSetEncoder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.zip.GZIPOutputStream;
-
-/**
- * A {@link MessageSetEncoder} that compress message set using GZIP.
- */
-final class GZipMessageSetEncoder extends AbstractCompressedMessageSetEncoder {
-
- GZipMessageSetEncoder() {
- super(Compression.GZIP);
- }
-
- @Override
- protected OutputStream createCompressedStream(OutputStream os) throws IOException {
- return new GZIPOutputStream(os);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
deleted file mode 100644
index 51dc746..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IdentityMessageSetEncoder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-
-/**
- * A pass-through {@link MessageSetEncoder}.
- */
-final class IdentityMessageSetEncoder extends AbstractMessageSetEncoder {
-
- private ChannelBuffer messageSets = ChannelBuffers.EMPTY_BUFFER;
-
- @Override
- public MessageSetEncoder add(ChannelBuffer payload) {
- messageSets = ChannelBuffers.wrappedBuffer(messageSets, encodePayload(payload));
- return this;
- }
-
- @Override
- public ChannelBuffer finish() {
- ChannelBuffer buf = prefixLength(messageSets);
- messageSets = ChannelBuffers.EMPTY_BUFFER;
- return buf;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java
new file mode 100644
index 0000000..cbe7eaa
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerEncoder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import com.google.common.primitives.Ints;
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * A kafka {@link kafka.serializer.Encoder} for encoding integer into bytes.
+ */
+public final class IntegerEncoder implements Encoder<Integer> {
+
+ public IntegerEncoder(VerifiableProperties properties) {
+ }
+
+ public byte[] toBytes(Integer buffer) {
+ return Ints.toByteArray(buffer.intValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
new file mode 100644
index 0000000..4aa7940
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.kafka.client;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * A kafka {@link kafka.producer.Partitioner} using integer key to compute partition id.
+ */
+public final class IntegerPartitioner implements Partitioner<Integer> {
+
+ public IntegerPartitioner(VerifiableProperties properties) {
+ }
+
+ public int partition(Integer key, int numPartitions) {
+ return key % numPartitions;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
deleted file mode 100644
index f2bb815..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaBrokerCache.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.ZKClient;
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-
-/**
- * A Service to cache kafka broker information by subscribing to ZooKeeper.
- */
-final class KafkaBrokerCache extends AbstractIdleService {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerCache.class);
-
- private static final String BROKERS_PATH = "/brokers";
-
- private final ZKClient zkClient;
- private final Map<String, InetSocketAddress> brokers;
- // topicBrokers is from topic->partition size->brokerId
- private final Map<String, SortedMap<Integer, Set<String>>> topicBrokers;
- private final Runnable invokeGetBrokers = new Runnable() {
- @Override
- public void run() {
- getBrokers();
- }
- };
- private final Runnable invokeGetTopics = new Runnable() {
- @Override
- public void run() {
- getTopics();
- }
- };
-
- KafkaBrokerCache(ZKClient zkClient) {
- this.zkClient = zkClient;
- this.brokers = Maps.newConcurrentMap();
- this.topicBrokers = Maps.newConcurrentMap();
- }
-
- @Override
- protected void startUp() throws Exception {
- getBrokers();
- getTopics();
- }
-
- @Override
- protected void shutDown() throws Exception {
- // No-op
- }
-
- public int getPartitionSize(String topic) {
- SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
- if (partitionBrokers == null || partitionBrokers.isEmpty()) {
- return 1;
- }
- return partitionBrokers.lastKey();
- }
-
- public TopicBroker getBrokerAddress(String topic, int partition) {
- SortedMap<Integer, Set<String>> partitionBrokers = topicBrokers.get(topic);
- if (partitionBrokers == null || partitionBrokers.isEmpty()) {
- return pickRandomBroker(topic);
- }
-
- // If the requested partition is greater than supported partition size, randomly pick one
- if (partition >= partitionBrokers.lastKey()) {
- return pickRandomBroker(topic);
- }
-
- // Randomly pick a partition size and randomly pick a broker from it
- Random random = new Random();
- partitionBrokers = partitionBrokers.tailMap(partition + 1);
- List<Integer> sizes = Lists.newArrayList(partitionBrokers.keySet());
- Integer partitionSize = pickRandomItem(sizes, random);
- List<String> ids = Lists.newArrayList(partitionBrokers.get(partitionSize));
- InetSocketAddress address = brokers.get(ids.get(new Random().nextInt(ids.size())));
- return address == null ? pickRandomBroker(topic) : new TopicBroker(topic, address, partitionSize);
- }
-
- private TopicBroker pickRandomBroker(String topic) {
- Map.Entry<String, InetSocketAddress> entry = Iterables.getFirst(brokers.entrySet(), null);
- if (entry == null) {
- return null;
- }
- InetSocketAddress address = entry.getValue();
- return new TopicBroker(topic, address, 0);
- }
-
- private <T> T pickRandomItem(List<T> list, Random random) {
- return list.get(random.nextInt(list.size()));
- }
-
- private void getBrokers() {
- final String idsPath = BROKERS_PATH + "/ids";
-
- Futures.addCallback(zkClient.getChildren(idsPath, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- getBrokers();
- }
- }), new ExistsOnFailureFutureCallback<NodeChildren>(idsPath, invokeGetBrokers) {
- @Override
- public void onSuccess(NodeChildren result) {
- Set<String> children = ImmutableSet.copyOf(result.getChildren());
- for (String child : children) {
- getBrokenData(idsPath + "/" + child, child);
- }
- // Remove all removed brokers
- removeDiff(children, brokers);
- }
- });
- }
-
- private void getTopics() {
- final String topicsPath = BROKERS_PATH + "/topics";
- Futures.addCallback(zkClient.getChildren(topicsPath, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- getTopics();
- }
- }), new ExistsOnFailureFutureCallback<NodeChildren>(topicsPath, invokeGetTopics) {
- @Override
- public void onSuccess(NodeChildren result) {
- Set<String> children = ImmutableSet.copyOf(result.getChildren());
-
- // Process new children
- for (String topic : ImmutableSet.copyOf(Sets.difference(children, topicBrokers.keySet()))) {
- getTopic(topicsPath + "/" + topic, topic);
- }
-
- // Remove old children
- removeDiff(children, topicBrokers);
- }
- });
- }
-
- private void getBrokenData(String path, final String brokerId) {
- Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
- @Override
- public void onSuccess(NodeData result) {
- String data = new String(result.getData(), Charsets.UTF_8);
- String hostPort = data.substring(data.indexOf(':') + 1);
- int idx = hostPort.indexOf(':');
- brokers.put(brokerId, new InetSocketAddress(hostPort.substring(0, idx),
- Integer.parseInt(hostPort.substring(idx + 1))));
- }
-
- @Override
- public void onFailure(Throwable t) {
- // No-op, the watch on the parent node will handle it.
- }
- });
- }
-
- private void getTopic(final String path, final String topic) {
- Futures.addCallback(zkClient.getChildren(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // Other event type changes are either could be ignored or handled by parent watcher
- if (event.getType() == Event.EventType.NodeChildrenChanged) {
- getTopic(path, topic);
- }
- }
- }), new FutureCallback<NodeChildren>() {
- @Override
- public void onSuccess(NodeChildren result) {
- List<String> children = result.getChildren();
- final List<ListenableFuture<BrokerPartition>> futures = Lists.newArrayListWithCapacity(children.size());
-
- // Fetch data from each broken node
- for (final String brokerId : children) {
- Futures.transform(zkClient.getData(path + "/" + brokerId), new Function<NodeData, BrokerPartition>() {
- @Override
- public BrokerPartition apply(NodeData input) {
- return new BrokerPartition(brokerId, Integer.parseInt(new String(input.getData(), Charsets.UTF_8)));
- }
- });
- }
-
- // When all fetching is done, build the partition size->broker map for this topic
- Futures.successfulAsList(futures).addListener(new Runnable() {
- @Override
- public void run() {
- Map<Integer, Set<String>> partitionBrokers = Maps.newHashMap();
- for (ListenableFuture<BrokerPartition> future : futures) {
- try {
- BrokerPartition info = future.get();
- Set<String> brokerSet = partitionBrokers.get(info.getPartitionSize());
- if (brokerSet == null) {
- brokerSet = Sets.newHashSet();
- partitionBrokers.put(info.getPartitionSize(), brokerSet);
- }
- brokerSet.add(info.getBrokerId());
- } catch (Exception e) {
- // Exception is ignored, as it will be handled by parent watcher
- }
- }
- topicBrokers.put(topic, ImmutableSortedMap.copyOf(partitionBrokers));
- }
- }, Threads.SAME_THREAD_EXECUTOR);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // No-op. Failure would be handled by parent watcher already (e.g. node not exists -> children change in parent)
- }
- });
- }
-
- private <K, V> void removeDiff(Set<K> keys, Map<K, V> map) {
- for (K key : ImmutableSet.copyOf(Sets.difference(map.keySet(), keys))) {
- map.remove(key);
- }
- }
-
- private abstract class ExistsOnFailureFutureCallback<V> implements FutureCallback<V> {
-
- private final String path;
- private final Runnable action;
-
- protected ExistsOnFailureFutureCallback(String path, Runnable action) {
- this.path = path;
- this.action = action;
- }
-
- @Override
- public final void onFailure(Throwable t) {
- if (!isNotExists(t)) {
- LOG.error("Fail to watch for kafka brokers: " + path, t);
- return;
- }
-
- waitExists(path);
- }
-
- private boolean isNotExists(Throwable t) {
- return ((t instanceof KeeperException) && ((KeeperException) t).code() == KeeperException.Code.NONODE);
- }
-
- private void waitExists(String path) {
- LOG.info("Path " + path + " not exists. Watch for creation.");
-
- // If the node doesn't exists, use the "exists" call to watch for node creation.
- Futures.addCallback(zkClient.exists(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (event.getType() == Event.EventType.NodeCreated || event.getType() == Event.EventType.NodeDeleted) {
- action.run();
- }
- }
- }), new FutureCallback<Stat>() {
- @Override
- public void onSuccess(Stat result) {
- // If path exists, get children again, otherwise wait for watch to get triggered
- if (result != null) {
- action.run();
- }
- }
- @Override
- public void onFailure(Throwable t) {
- action.run();
- }
- });
- }
- }
-
- private static final class BrokerPartition {
- private final String brokerId;
- private final int partitionSize;
-
- private BrokerPartition(String brokerId, int partitionSize) {
- this.brokerId = brokerId;
- this.partitionSize = partitionSize;
- }
-
- public String getBrokerId() {
- return brokerId;
- }
-
- public int getPartitionSize() {
- return partitionSize;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
deleted file mode 100644
index 7b43f8a..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- *
- */
-final class KafkaRequest {
-
- public enum Type {
- PRODUCE(0),
- FETCH(1),
- MULTI_FETCH(2),
- MULTI_PRODUCE(3),
- OFFSETS(4);
-
- private final short id;
-
- private Type(int id) {
- this.id = (short) id;
- }
-
- public short getId() {
- return id;
- }
- }
-
- private final Type type;
- private final String topic;
- private final int partition;
- private final ChannelBuffer body;
- private final ResponseHandler responseHandler;
-
-
- public static KafkaRequest createProduce(String topic, int partition, ChannelBuffer body) {
- return new KafkaRequest(Type.PRODUCE, topic, partition, body, ResponseHandler.NO_OP);
- }
-
- public static KafkaRequest createFetch(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
- return new KafkaRequest(Type.FETCH, topic, partition, body, handler);
- }
-
- public static KafkaRequest createOffsets(String topic, int partition, ChannelBuffer body, ResponseHandler handler) {
- return new KafkaRequest(Type.OFFSETS, topic, partition, body, handler);
- }
-
- private KafkaRequest(Type type, String topic, int partition, ChannelBuffer body, ResponseHandler responseHandler) {
- this.type = type;
- this.topic = topic;
- this.partition = partition;
- this.body = body;
- this.responseHandler = responseHandler;
- }
-
- Type getType() {
- return type;
- }
-
- String getTopic() {
- return topic;
- }
-
- int getPartition() {
- return partition;
- }
-
- ChannelBuffer getBody() {
- return body;
- }
-
- ResponseHandler getResponseHandler() {
- return responseHandler;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
deleted file mode 100644
index ef78c76..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestEncoder.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import com.google.common.base.Charsets;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-import java.nio.ByteBuffer;
-
-/**
- *
- */
-final class KafkaRequestEncoder extends OneToOneEncoder {
-
- @Override
- protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
- if (!(msg instanceof KafkaRequest)) {
- return msg;
- }
- KafkaRequest req = (KafkaRequest) msg;
- ByteBuffer topic = Charsets.UTF_8.encode(req.getTopic());
-
- ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(16 + topic.remaining() + req.getBody().readableBytes());
- int writerIdx = buffer.writerIndex();
- buffer.writerIndex(writerIdx + 4); // Reserves 4 bytes for message length
-
- // Write out <REQUEST_TYPE>, <TOPIC_LENGTH>, <TOPIC>, <PARTITION>
- buffer.writeShort(req.getType().getId());
- buffer.writeShort(topic.remaining());
- buffer.writeBytes(topic);
- buffer.writeInt(req.getPartition());
-
- // Write out the size of the whole buffer (excluding the size field) at the beginning
- buffer.setInt(writerIdx, buffer.readableBytes() - 4 + req.getBody().readableBytes());
-
- ChannelBuffer buf = ChannelBuffers.wrappedBuffer(buffer, req.getBody());
- buf = buf.readBytes(buf.readableBytes());
-
- return buf;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
deleted file mode 100644
index fbc552c..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaRequestSender.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- *
- */
-interface KafkaRequestSender {
-
- void send(KafkaRequest request);
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
deleted file mode 100644
index 68c1bd8..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponse.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.kafka.client.FetchException;
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- *
- */
-final class KafkaResponse {
-
- private final FetchException.ErrorCode errorCode;
- private final ChannelBuffer body;
- private final int size;
-
- KafkaResponse(FetchException.ErrorCode errorCode, ChannelBuffer body, int size) {
- this.errorCode = errorCode;
- this.body = body;
- this.size = size;
- }
-
- public int getSize() {
- return size;
- }
-
- public FetchException.ErrorCode getErrorCode() {
- return errorCode;
- }
-
- public ChannelBuffer getBody() {
- return body;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
deleted file mode 100644
index 47f70ce..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseDispatcher.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.SocketException;
-import java.nio.channels.ClosedChannelException;
-
-/**
- *
- */
-final class KafkaResponseDispatcher extends SimpleChannelHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaResponseDispatcher.class);
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- Object attachment = ctx.getAttachment();
- if (e.getMessage() instanceof KafkaResponse && attachment instanceof ResponseHandler) {
- ((ResponseHandler) attachment).received((KafkaResponse) e.getMessage());
- } else {
- super.messageReceived(ctx, e);
- }
- }
-
- @Override
- public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- if (e.getMessage() instanceof KafkaRequest) {
- ctx.setAttachment(((KafkaRequest) e.getMessage()).getResponseHandler());
- }
- super.writeRequested(ctx, e);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- if (e.getCause() instanceof ClosedChannelException || e.getCause() instanceof SocketException) {
- // No need to log for socket exception as the client has logic to retry.
- return;
- }
- LOG.warn("Exception caught in kafka client connection.", e.getCause());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
deleted file mode 100644
index 5251e65..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/KafkaResponseHandler.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.kafka.client.FetchException;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- *
- */
-final class KafkaResponseHandler extends SimpleChannelHandler {
-
- private final Bufferer bufferer = new Bufferer();
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- Object msg = e.getMessage();
- if (!(msg instanceof ChannelBuffer)) {
- super.messageReceived(ctx, e);
- return;
- }
-
- bufferer.apply((ChannelBuffer) msg);
- ChannelBuffer buffer = bufferer.getNext();
- while (buffer.readable()) {
- // Send the response object upstream
- Channels.fireMessageReceived(ctx, new KafkaResponse(FetchException.ErrorCode.fromCode(buffer.readShort()),
- buffer, buffer.readableBytes() + 6));
- buffer = bufferer.getNext();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
deleted file mode 100644
index 0814917..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageFetcher.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.apache.twill.common.Threads;
-import org.apache.twill.kafka.client.FetchException;
-import org.apache.twill.kafka.client.FetchedMessage;
-import com.google.common.base.Throwables;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.io.ByteStreams;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.xerial.snappy.SnappyInputStream;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.GZIPInputStream;
-
-/**
- * This class is for consuming messages from a kafka topic.
- */
-final class MessageFetcher extends AbstractIterator<FetchedMessage> implements ResponseHandler {
-
- private static final long BACKOFF_INTERVAL_MS = 100;
-
- private final KafkaRequestSender sender;
- private final String topic;
- private final int partition;
- private final int maxSize;
- private final AtomicLong offset;
- private final BlockingQueue<FetchResult> messages;
- private final ScheduledExecutorService scheduler;
- private volatile long backoffMillis;
- private final Runnable sendFetchRequest = new Runnable() {
- @Override
- public void run() {
- sendFetchRequest();
- }
- };
-
- MessageFetcher(String topic, int partition, long offset, int maxSize, KafkaRequestSender sender) {
- this.topic = topic;
- this.partition = partition;
- this.sender = sender;
- this.offset = new AtomicLong(offset);
- this.maxSize = maxSize;
- this.messages = new LinkedBlockingQueue<FetchResult>();
- this.scheduler = Executors.newSingleThreadScheduledExecutor(
- Threads.createDaemonThreadFactory("kafka-" + topic + "-consumer"));
- }
-
- @Override
- public void received(KafkaResponse response) {
- if (response.getErrorCode() != FetchException.ErrorCode.OK) {
- messages.add(FetchResult.failure(new FetchException("Error in fetching: " + response.getErrorCode(),
- response.getErrorCode())));
- return;
- }
-
- try {
- if (decodeResponse(response.getBody(), -1)) {
- backoffMillis = 0;
- } else {
- backoffMillis = Math.max(backoffMillis + BACKOFF_INTERVAL_MS, 1000);
- scheduler.schedule(sendFetchRequest, backoffMillis, TimeUnit.MILLISECONDS);
- }
- } catch (Throwable t) {
- messages.add(FetchResult.failure(t));
- }
- }
-
- private boolean decodeResponse(ChannelBuffer buffer, long nextOffset) {
- boolean hasMessage = false;
- boolean computeOffset = nextOffset < 0;
- while (buffer.readableBytes() >= 4) {
- int size = buffer.readInt();
- if (buffer.readableBytes() < size) {
- if (!hasMessage) {
- throw new IllegalStateException("Size too small");
- }
- break;
- }
- nextOffset = computeOffset ? offset.addAndGet(size + 4) : nextOffset;
- decodeMessage(size, buffer, nextOffset);
- hasMessage = true;
- }
- return hasMessage;
-
- }
-
- private void decodeMessage(int size, ChannelBuffer buffer, long nextOffset) {
- int readerIdx = buffer.readerIndex();
- int magic = buffer.readByte();
- Compression compression = magic == 0 ? Compression.NONE : Compression.fromCode(buffer.readByte());
- int crc = buffer.readInt();
-
- ChannelBuffer payload = buffer.readSlice(size - (buffer.readerIndex() - readerIdx));
-
- // Verify CRC?
- enqueueMessage(compression, payload, nextOffset);
- }
-
- private void enqueueMessage(Compression compression, ChannelBuffer payload, long nextOffset) {
- switch (compression) {
- case NONE:
- messages.add(FetchResult.success(new BasicFetchedMessage(nextOffset, payload.toByteBuffer())));
- break;
- case GZIP:
- decodeResponse(gunzip(payload), nextOffset);
- break;
- case SNAPPY:
- decodeResponse(unsnappy(payload), nextOffset);
- break;
- }
- }
-
- private ChannelBuffer gunzip(ChannelBuffer source) {
- ChannelBufferOutputStream output = new ChannelBufferOutputStream(
- ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
- try {
- try {
- GZIPInputStream gzipInput = new GZIPInputStream(new ChannelBufferInputStream(source));
- try {
- ByteStreams.copy(gzipInput, output);
- return output.buffer();
- } finally {
- gzipInput.close();
- }
- } finally {
- output.close();
- }
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
- }
-
- private ChannelBuffer unsnappy(ChannelBuffer source) {
- ChannelBufferOutputStream output = new ChannelBufferOutputStream(
- ChannelBuffers.dynamicBuffer(source.readableBytes() * 2));
- try {
- try {
- SnappyInputStream snappyInput = new SnappyInputStream(new ChannelBufferInputStream(source));
- try {
- ByteStreams.copy(snappyInput, output);
- return output.buffer();
- } finally {
- snappyInput.close();
- }
- } finally {
- output.close();
- }
- } catch (IOException e) {
- throw Throwables.propagate(e);
- }
- }
-
- private void sendFetchRequest() {
- ChannelBuffer fetchBody = ChannelBuffers.buffer(12);
- fetchBody.writeLong(offset.get());
- fetchBody.writeInt(maxSize);
- sender.send(KafkaRequest.createFetch(topic, partition, fetchBody, MessageFetcher.this));
- }
-
- @Override
- protected FetchedMessage computeNext() {
- FetchResult result = messages.poll();
- if (result != null) {
- return getMessage(result);
- }
-
- try {
- sendFetchRequest();
- return getMessage(messages.take());
- } catch (InterruptedException e) {
- scheduler.shutdownNow();
- return endOfData();
- }
- }
-
- private FetchedMessage getMessage(FetchResult result) {
- try {
- if (result.isSuccess()) {
- return result.getMessage();
- } else {
- throw result.getErrorCause();
- }
- } catch (Throwable t) {
- throw Throwables.propagate(t);
- }
- }
-
- private static final class FetchResult {
- private final FetchedMessage message;
- private final Throwable errorCause;
-
- static FetchResult success(FetchedMessage message) {
- return new FetchResult(message, null);
- }
-
- static FetchResult failure(Throwable cause) {
- return new FetchResult(null, cause);
- }
-
- private FetchResult(FetchedMessage message, Throwable errorCause) {
- this.message = message;
- this.errorCause = errorCause;
- }
-
- public FetchedMessage getMessage() {
- return message;
- }
-
- public Throwable getErrorCause() {
- return errorCause;
- }
-
- public boolean isSuccess() {
- return message != null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
deleted file mode 100644
index 49008cc..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/MessageSetEncoder.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-
-/**
- * This represents a set of messages that goes into the same message set and get encoded as
- * single kafka message set.
- */
-interface MessageSetEncoder {
-
- MessageSetEncoder add(ChannelBuffer payload);
-
- ChannelBuffer finish();
-}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/fcbe54d5/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
deleted file mode 100644
index f681b85..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ResponseHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-/**
- * Represents handler for kafka response.
- */
-interface ResponseHandler {
-
- ResponseHandler NO_OP = new ResponseHandler() {
- @Override
- public void received(KafkaResponse response) {
- // No-op
- }
- };
-
- void received(KafkaResponse response);
-}