You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/01/10 19:34:15 UTC
twill git commit: Return the offset to read next message in
KafkaConsumer.MessageCallback#onReceive
Repository: twill
Updated Branches:
refs/heads/master 5986553ba -> b28e59eaa
Return the offset to read next message in KafkaConsumer.MessageCallback#onReceive
This closes #16 on Github.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/b28e59ea
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/b28e59ea
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/b28e59ea
Branch: refs/heads/master
Commit: b28e59eaabb45cf201d5ea15fafee25079dec795
Parents: 5986553
Author: Chengfeng <ma...@cask.co>
Authored: Mon Nov 21 19:04:52 2016 -0800
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Jan 10 11:33:17 2017 -0800
----------------------------------------------------------------------
.../org/apache/twill/api/ServiceAnnouncer.java | 2 +-
.../twill/internal/AbstractTwillController.java | 8 +-
.../kafka/client/BasicFetchedMessage.java | 10 +++
.../kafka/client/SimpleKafkaConsumer.java | 19 ++---
.../twill/kafka/client/FetchedMessage.java | 5 ++
.../twill/kafka/client/KafkaConsumer.java | 3 +-
.../apache/twill/kafka/client/KafkaTest.java | 82 ++++++++++++++++----
7 files changed, 103 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java b/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
index 9513352..2721388 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ServiceAnnouncer.java
@@ -32,7 +32,7 @@ public interface ServiceAnnouncer {
Cancellable announce(String serviceName, int port);
/**
- * Registers an endpoint that could be discovered by external party with a payload
+ * Registers an endpoint that could be discovered by external party with a payload.
* @param serviceName Name of the endpoint
* @param port Port of the endpoint
* @param payload byte array payload
http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index 5a1c5b3..212411f 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -229,9 +229,11 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
}
@Override
- public void onReceived(Iterator<FetchedMessage> messages) {
+ public long onReceived(Iterator<FetchedMessage> messages) {
+ long nextOffset = -1L;
while (messages.hasNext()) {
- String json = Charsets.UTF_8.decode(messages.next().getPayload()).toString();
+ FetchedMessage message = messages.next();
+ String json = Charsets.UTF_8.decode(message.getPayload()).toString();
try {
LogEntry entry = GSON.fromJson(json, LogEntry.class);
if (entry != null) {
@@ -240,7 +242,9 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle
} catch (Exception e) {
LOG.error("Failed to decode log entry {}", json, e);
}
+ nextOffset = message.getNextOffset();
}
+ return nextOffset;
}
@Override
http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
index ee53ed4..bf5d264 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/BasicFetchedMessage.java
@@ -29,6 +29,7 @@ final class BasicFetchedMessage implements FetchedMessage {
private final TopicPartition topicPartition;
private ByteBuffer payload;
+ private long offset;
private long nextOffset;
BasicFetchedMessage(TopicPartition topicPartition) {
@@ -39,6 +40,10 @@ final class BasicFetchedMessage implements FetchedMessage {
this.payload = payload;
}
+ void setOffset(long offset) {
+ this.offset = offset;
+ }
+
void setNextOffset(long nextOffset) {
this.nextOffset = nextOffset;
}
@@ -54,6 +59,11 @@ final class BasicFetchedMessage implements FetchedMessage {
}
@Override
+ public long getOffset() {
+ return offset;
+ }
+
+ @Override
public long getNextOffset() {
return nextOffset;
}
http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 0299e56..927788f 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -57,6 +57,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -273,17 +274,17 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
final AtomicBoolean stopped = new AtomicBoolean();
return new MessageCallback() {
@Override
- public void onReceived(final Iterator<FetchedMessage> messages) {
+ public long onReceived(final Iterator<FetchedMessage> messages) {
if (stopped.get()) {
- return;
+ return -1L;
}
- Futures.getUnchecked(executor.submit(new Runnable() {
+ return Futures.getUnchecked(executor.submit(new Callable<Long>() {
@Override
- public void run() {
+ public Long call() {
if (stopped.get()) {
- return;
+ return -1L;
}
- callback.onReceived(messages);
+ return callback.onReceived(messages);
}
}));
}
@@ -450,7 +451,7 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
private void invokeCallback(ByteBufferMessageSet messages, AtomicLong offset) {
long savedOffset = offset.get();
try {
- callback.onReceived(createFetchedMessages(messages, offset));
+ offset.set(callback.onReceived(createFetchedMessages(messages, offset)));
} catch (Throwable t) {
LOG.error("Callback throws exception. Retry from offset {} for {}", startOffset, topicPart, t);
offset.set(savedOffset);
@@ -475,9 +476,9 @@ final class SimpleKafkaConsumer implements KafkaConsumer {
continue;
}
- offset.set(message.nextOffset());
fetchedMessage.setPayload(message.message().payload());
- fetchedMessage.setNextOffset(offset.get());
+ fetchedMessage.setOffset(message.offset());
+ fetchedMessage.setNextOffset(message.nextOffset());
return fetchedMessage;
}
http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
index 5739ac6..49dc291 100644
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/FetchedMessage.java
@@ -29,6 +29,11 @@ public interface FetchedMessage {
ByteBuffer getPayload();
/**
+ * Returns the offset for the current message.
+ */
+ long getOffset();
+
+ /**
* Returns the offset for the next message to be read.
*/
long getNextOffset();
http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
index d53ee98..28bcd80 100644
--- a/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java
@@ -35,8 +35,9 @@ public interface KafkaConsumer {
* Invoked when new messages is available.
* @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
* and across different invocation.
+ * @return The offset of the message to be fetched next.
*/
- void onReceived(Iterator<FetchedMessage> messages);
+ long onReceived(Iterator<FetchedMessage> messages);
/**
* Invoked when message consumption is stopped. When this method is invoked,
http://git-wip-us.apache.org/repos/asf/twill/blob/b28e59ea/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 4ac8ae4..958925c 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -42,10 +42,8 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -89,8 +87,8 @@ public class KafkaTest {
@Test
public void testKafkaClientReconnect() throws Exception {
String topic = "backoff";
- Properties kafkServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
- EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkServerConfig);
+ Properties kafkaServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
+ EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkaServerConfig);
ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build();
zkClient.startAndWait();
@@ -106,15 +104,19 @@ public class KafkaTest {
// Publish a messages
createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start();
- // Creater a consumer
+ // Create a consumer
final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
.consume(new KafkaConsumer.MessageCallback() {
@Override
- public void onReceived(Iterator<FetchedMessage> messages) {
+ public long onReceived(Iterator<FetchedMessage> messages) {
+ long nextOffset = -1L;
while (messages.hasNext()) {
- queue.offer(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+ FetchedMessage message = messages.next();
+ nextOffset = message.getNextOffset();
+ queue.offer(Charsets.UTF_8.decode(message.getPayload()).toString());
}
+ return nextOffset;
}
@Override
@@ -130,7 +132,7 @@ public class KafkaTest {
// Start the server again.
// Needs to create a new instance with the same config since guava service cannot be restarted
- server = new EmbeddedKafkaServer(kafkServerConfig);
+ server = new EmbeddedKafkaServer(kafkaServerConfig);
server.startAndWait();
// Publish another message
@@ -170,11 +172,15 @@ public class KafkaTest {
Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
.MessageCallback() {
@Override
- public void onReceived(Iterator<FetchedMessage> messages) {
+ public long onReceived(Iterator<FetchedMessage> messages) {
+ long nextOffset = -1;
while (messages.hasNext()) {
- LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+ FetchedMessage message = messages.next();
+ nextOffset = message.getNextOffset();
+ LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
latch.countDown();
}
+ return nextOffset;
}
@Override
@@ -189,6 +195,51 @@ public class KafkaTest {
}
@Test
+ public void testKafkaClientSkipNext() throws Exception {
+ String topic = "testClientSkipNext";
+ // Publish 30 messages with indecies the same as offsets within the range 0 - 29
+ Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+ t1.start();
+ t1.join();
+ Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+ t2.start();
+ t2.join();
+ Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+ t3.start();
+ t3.join();
+
+ final CountDownLatch stopLatch = new CountDownLatch(1);
+ final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
+ Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
+ new KafkaConsumer.MessageCallback() {
+ @Override
+ public long onReceived(Iterator<FetchedMessage> messages) {
+ long nextOffset = -1L;
+ if (messages.hasNext()) {
+ FetchedMessage message = messages.next();
+ nextOffset = message.getNextOffset() + 1;
+ offsetQueue.offer(message.getOffset());
+ LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+ return nextOffset;
+ }
+ return nextOffset;
+ }
+
+ @Override
+ public void finished() {
+ stopLatch.countDown();
+ }
+ });
+ // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read
+ for (long i = 0; i < 30; i += 2) {
+ Assert.assertEquals(i, (long) offsetQueue.poll(60, TimeUnit.SECONDS));
+ }
+ Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS));
+ cancel.cancel();
+ Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ @Test
public void testBrokerChange() throws Exception {
// Create a new namespace in ZK for Kafka server for this test case
String connectionStr = zkServer.getConnectionStr() + "/broker_change";
@@ -207,13 +258,17 @@ public class KafkaTest {
// Attach a consumer
final BlockingQueue<String> consumedMessages = Queues.newLinkedBlockingQueue();
- Cancellable cancelConsumer = kafkaClient.getConsumer()
+ kafkaClient.getConsumer()
.prepare().addFromBeginning("test", 0).consume(new KafkaConsumer.MessageCallback() {
@Override
- public void onReceived(Iterator<FetchedMessage> messages) {
+ public long onReceived(Iterator<FetchedMessage> messages) {
+ long nextOffset = -1L;
while (messages.hasNext()) {
- consumedMessages.add(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+ FetchedMessage message = messages.next();
+ nextOffset = message.getNextOffset();
+ consumedMessages.add(Charsets.UTF_8.decode(message.getPayload()).toString());
}
+ return nextOffset;
}
@Override
@@ -286,6 +341,7 @@ public class KafkaTest {
prop.setProperty("log.flush.interval.messages", "10000");
prop.setProperty("log.flush.interval.ms", "1000");
prop.setProperty("log.segment.bytes", "536870912");
+ prop.setProperty("message.send.max.retries", "10");
prop.setProperty("zookeeper.connect", zkConnectStr);
prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
prop.setProperty("default.replication.factor", "1");