Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:09 UTC
[10/50] [abbrv] git commit: Added error handling for fetch request
Added error handling for fetch request
* allow retry of fetch if offset was invalid
* updated changelog
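
The retry is controlled by a new public flag on KafkaConfig, useStartOffsetTimeIfOffsetOutOfRange (default true; see the KafkaConfig hunk below). A minimal sketch of opting out, reusing the constructors from KafkaUtilsTest at the end of this commit; the broker address is illustrative only:

    import storm.kafka.*;
    import storm.kafka.trident.GlobalPartitionInformation;

    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
    partitionInfo.addPartition(0, Broker.fromString("localhost:9092")); // illustrative address
    KafkaConfig config = new KafkaConfig(new StaticHosts(partitionInfo), "testTopic");
    config.useStartOffsetTimeIfOffsetOutOfRange = false; // default is true
    // with the flag off, an out-of-range offset makes KafkaUtils.fetchMessages(...)
    // throw FailedFetchException instead of retrying from config.startOffsetTime

With the flag left at its default, the first OFFSET_OUT_OF_RANGE response resets the offset to the configured startOffsetTime and the fetch is retried once.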
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/95c60dbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/95c60dbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/95c60dbb
Branch: refs/heads/master
Commit: 95c60dbbbc80165969bae6cbbd1926207720e59c
Parents: 6b29f8f
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Jan 11 16:34:01 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Jan 11 16:40:02 2014 +0000
----------------------------------------------------------------------
CHANGELOG.md | 1 +
src/jvm/storm/kafka/FailedFetchException.java | 12 ++
src/jvm/storm/kafka/KafkaConfig.java | 5 +-
src/jvm/storm/kafka/KafkaError.java | 29 ++++
src/jvm/storm/kafka/KafkaSpout.java | 24 ---
src/jvm/storm/kafka/KafkaUtils.java | 159 +++++++++++++++++++
src/jvm/storm/kafka/PartitionManager.java | 11 +-
src/jvm/storm/kafka/trident/Coordinator.java | 1 +
.../kafka/trident/FailedFetchException.java | 7 -
src/jvm/storm/kafka/trident/KafkaUtils.java | 112 -------------
.../kafka/trident/TridentKafkaEmitter.java | 21 +--
src/test/storm/kafka/KafkaTestBroker.java | 52 ++++++
src/test/storm/kafka/KafkaUtilsTest.java | 90 +++++++++++
13 files changed, 350 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2fb81fe..ced0ffc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,2 +1,3 @@
## 0.3.0
* updated partition path in zookeeper
+* added error handling for fetch request
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/FailedFetchException.java b/src/jvm/storm/kafka/FailedFetchException.java
new file mode 100644
index 0000000..0bd1123
--- /dev/null
+++ b/src/jvm/storm/kafka/FailedFetchException.java
@@ -0,0 +1,12 @@
+package storm.kafka;
+
+public class FailedFetchException extends RuntimeException {
+
+ public FailedFetchException(String message) {
+ super(message);
+ }
+
+ public FailedFetchException(Exception e) {
+ super(e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaConfig.java b/src/jvm/storm/kafka/KafkaConfig.java
index e241978..dddcead 100644
--- a/src/jvm/storm/kafka/KafkaConfig.java
+++ b/src/jvm/storm/kafka/KafkaConfig.java
@@ -17,6 +17,7 @@ public class KafkaConfig implements Serializable {
public MultiScheme scheme = new RawMultiScheme();
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+ public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public KafkaConfig(BrokerHosts hosts, String topic) {
this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
@@ -28,8 +29,4 @@ public class KafkaConfig implements Serializable {
this.clientId = clientId;
}
- public void forceStartOffsetTime(long millis) {
- startOffsetTime = millis;
- forceFromStart = true;
- }
}
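
This hunk also removes the forceStartOffsetTime(long) helper; both fields it wrapped are public (see the context above), so callers can set them directly. A sketch of the equivalent, assuming a KafkaConfig instance named config:

    // equivalent of the removed config.forceStartOffsetTime(millis):
    config.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // or any timestamp in millis
    config.forceFromStart = true;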
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaError.java b/src/jvm/storm/kafka/KafkaError.java
new file mode 100644
index 0000000..260ab91
--- /dev/null
+++ b/src/jvm/storm/kafka/KafkaError.java
@@ -0,0 +1,29 @@
+package storm.kafka;
+
+/**
+ * Date: 11/01/2014
+ * Time: 14:21
+ */
+public enum KafkaError {
+ NO_ERROR,
+ OFFSET_OUT_OF_RANGE,
+ INVALID_MESSAGE,
+ UNKNOWN_TOPIC_OR_PARTITION,
+ INVALID_FETCH_SIZE,
+ LEADER_NOT_AVAILABLE,
+ NOT_LEADER_FOR_PARTITION,
+ REQUEST_TIMED_OUT,
+ BROKER_NOT_AVAILABLE,
+ REPLICA_NOT_AVAILABLE,
+ MESSAGE_SIZE_TOO_LARGE,
+ STALE_CONTROLLER_EPOCH,
+ UNKNOWN;
+
+ public static KafkaError getError(short errorCode) {
+ if (errorCode < 0 || errorCode >= UNKNOWN.ordinal()) {
+ return UNKNOWN;
+ } else {
+ return values()[errorCode];
+ }
+ }
+}
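
getError relies on the declaration order above matching Kafka 0.8's numeric error codes (NoError = 0, OffsetOutOfRange = 1, and so on through StaleControllerEpoch = 11), so the constants must not be reordered. A fragment mirroring its use in KafkaUtils.fetchMessages below:

    short code = fetchResponse.errorCode(topic, partitionId); // e.g. returns 1
    KafkaError error = KafkaError.getError(code);
    // code 1 maps to KafkaError.OFFSET_OUT_OF_RANGE by ordinal;
    // negative or unmapped codes fall back to KafkaError.UNKNOWN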
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaSpout.java b/src/jvm/storm/kafka/KafkaSpout.java
index cf407ad..d097510 100644
--- a/src/jvm/storm/kafka/KafkaSpout.java
+++ b/src/jvm/storm/kafka/KafkaSpout.java
@@ -10,7 +10,6 @@ import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.PartitionManager.KafkaMessageId;
-import storm.kafka.trident.KafkaUtils;
import java.util.*;
@@ -171,27 +170,4 @@ public class KafkaSpout extends BaseRichSpout {
}
}
- public static void main(String[] args) {
-// TopologyBuilder builder = new TopologyBuilder();
-// List<String> hosts = new ArrayList<String>();
-// hosts.add("localhost");
-// SpoutConfig spoutConf = SpoutConfig.fromHostStrings(hosts, 8, "clicks", "/kafkastorm", "id");
-// spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
-// spoutConf.forceStartOffsetTime(-2);
-//
-// // spoutConf.zkServers = new ArrayList<String>() {{
-// // add("localhost");
-// // }};
-// // spoutConf.zkPort = 2181;
-//
-// builder.setSpout("spout", new KafkaSpout(spoutConf), 3);
-//
-// Config conf = new Config();
-// //conf.setDebug(true);
-//
-// LocalCluster cluster = new LocalCluster();
-// cluster.submitTopology("kafka-test", conf, builder.createTopology());
-//
-// Utils.sleep(600000);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
new file mode 100644
index 0000000..5094f14
--- /dev/null
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -0,0 +1,159 @@
+package storm.kafka;
+
+import backtype.storm.metric.api.IMetric;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.trident.IBrokerReader;
+import storm.kafka.trident.StaticBrokerReader;
+import storm.kafka.trident.ZkBrokerReader;
+
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+
+public class KafkaUtils {
+
+ public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+ private static final int NO_OFFSET = -5;
+
+
+ public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
+ if (conf.hosts instanceof StaticHosts) {
+ return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
+ } else {
+ return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
+ }
+ }
+
+ public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
+ OffsetRequest request = new OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+
+ long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
+ if (offsets.length > 0) {
+ return offsets[0];
+ } else {
+ return NO_OFFSET;
+ }
+ }
+
+ public static class KafkaOffsetMetric implements IMetric {
+ Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
+ Set<Partition> _partitions;
+ String _topic;
+ DynamicPartitionConnections _connections;
+
+ public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
+ _topic = topic;
+ _connections = connections;
+ }
+
+ public void setLatestEmittedOffset(Partition partition, long offset) {
+ _partitionToOffset.put(partition, offset);
+ }
+
+ @Override
+ public Object getValueAndReset() {
+ try {
+ long totalSpoutLag = 0;
+ long totalLatestTimeOffset = 0;
+ long totalLatestEmittedOffset = 0;
+ HashMap ret = new HashMap();
+ if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
+ for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
+ Partition partition = e.getKey();
+ SimpleConsumer consumer = _connections.getConnection(partition);
+ if (consumer == null) {
+ LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
+ return null;
+ }
+ long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
+ if (latestTimeOffset == 0) {
+ LOG.warn("No data found in Kafka Partition " + partition.getId());
+ return null;
+ }
+ long latestEmittedOffset = e.getValue();
+ long spoutLag = latestTimeOffset - latestEmittedOffset;
+ ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
+ ret.put(partition.getId() + "/" + "latestTime", latestTimeOffset);
+ ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
+ totalSpoutLag += spoutLag;
+ totalLatestTimeOffset += latestTimeOffset;
+ totalLatestEmittedOffset += latestEmittedOffset;
+ }
+ ret.put("totalSpoutLag", totalSpoutLag);
+ ret.put("totalLatestTime", totalLatestTimeOffset);
+ ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
+ return ret;
+ } else {
+ LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+ }
+ } catch (Throwable t) {
+ LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
+ }
+ return null;
+ }
+
+ public void refreshPartitions(Set<Partition> partitions) {
+ _partitions = partitions;
+ Iterator<Partition> it = _partitionToOffset.keySet().iterator();
+ while (it.hasNext()) {
+ if (!partitions.contains(it.next())) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
+ ByteBufferMessageSet msgs = null;
+ String topic = config.topic;
+ int partitionId = partition.partition;
+ for (int errors = 0; errors < 2 && msgs == null; errors++) {
+ FetchRequestBuilder builder = new FetchRequestBuilder();
+ FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
+ clientId(config.clientId).build();
+ FetchResponse fetchResponse;
+ try {
+ fetchResponse = consumer.fetch(fetchRequest);
+ } catch (Exception e) {
+ if (e instanceof ConnectException) {
+ throw new FailedFetchException(e);
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ if (fetchResponse.hasError()) {
+ KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
+ if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
+ long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
+ LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
+ "retrying with default start offset time from configuration. " +
+ "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
+ offset = startOffset;
+ } else {
+ String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
+ LOG.error(message);
+ throw new FailedFetchException(message);
+ }
+ } else {
+ msgs = fetchResponse.messageSet(topic, partitionId);
+ }
+ }
+ return msgs;
+ }
+}
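
fetchMessages above makes at most two attempts: if the first fetch returns OFFSET_OUT_OF_RANGE and useStartOffsetTimeIfOffsetOutOfRange is set, the offset is reset via getOffset and the fetch is retried once; any other error response, a second failure, or a connection failure throws FailedFetchException. A caller-side sketch, with simpleConsumer and partition constructed as in KafkaUtilsTest below:

    long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config.startOffsetTime);
    try {
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(config, simpleConsumer, partition, offset);
        // iterate msgs and emit ...
    } catch (FailedFetchException e) {
        // unrecoverable fetch failure (unknown topic, broker down, repeated bad offset);
        // PartitionManager and TridentKafkaEmitter let this propagate
    }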
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index e3e31db..0861c25 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -8,7 +8,6 @@ import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.utils.Utils;
import com.google.common.collect.ImmutableMap;
-import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
@@ -17,7 +16,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout.EmitState;
import storm.kafka.KafkaSpout.MessageAndRealOffset;
-import storm.kafka.trident.KafkaUtils;
import storm.kafka.trident.MaxMetric;
import java.util.*;
@@ -132,15 +130,8 @@ public class PartitionManager {
}
private void fill() {
- //LOG.info("Fetching from Kafka: " + _consumer.host() + ":" + _partition.partition + " from offset " + _emittedToOffset);
long start = System.nanoTime();
- ByteBufferMessageSet msgs = _consumer.fetch(
- new FetchRequestBuilder().addFetch(
- _spoutConfig.topic,
- _partition.partition,
- _emittedToOffset,
- _spoutConfig.fetchSizeBytes).build()).messageSet(_spoutConfig.topic,
- _partition.partition);
+ ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_fetchAPILatencyMax.update(millis);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/Coordinator.java b/src/jvm/storm/kafka/trident/Coordinator.java
index d97feed..f67acaa 100644
--- a/src/jvm/storm/kafka/trident/Coordinator.java
+++ b/src/jvm/storm/kafka/trident/Coordinator.java
@@ -1,5 +1,6 @@
package storm.kafka.trident;
+import storm.kafka.KafkaUtils;
import storm.trident.spout.IOpaquePartitionedTridentSpout;
import storm.trident.spout.IPartitionedTridentSpout;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/trident/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/FailedFetchException.java b/src/jvm/storm/kafka/trident/FailedFetchException.java
deleted file mode 100644
index c4fcc61..0000000
--- a/src/jvm/storm/kafka/trident/FailedFetchException.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package storm.kafka.trident;
-
-public class FailedFetchException extends RuntimeException {
- public FailedFetchException(Exception e) {
- super(e);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/trident/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/KafkaUtils.java b/src/jvm/storm/kafka/trident/KafkaUtils.java
deleted file mode 100644
index e4ba3b3..0000000
--- a/src/jvm/storm/kafka/trident/KafkaUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package storm.kafka.trident;
-
-import backtype.storm.metric.api.IMetric;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-public class KafkaUtils {
- public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
- private static final int NO_OFFSET = -5;
-
-
- public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
- if (conf.hosts instanceof StaticHosts) {
- return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
- } else {
- return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
- }
- }
-
- public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
- OffsetRequest request = new OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-
- long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
- if (offsets.length > 0) {
- return offsets[0];
- } else {
- return NO_OFFSET;
- }
- }
-
- public static class KafkaOffsetMetric implements IMetric {
- Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
- Set<Partition> _partitions;
- String _topic;
- DynamicPartitionConnections _connections;
-
- public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
- _topic = topic;
- _connections = connections;
- }
-
- public void setLatestEmittedOffset(Partition partition, long offset) {
- _partitionToOffset.put(partition, offset);
- }
-
- @Override
- public Object getValueAndReset() {
- try {
- long totalSpoutLag = 0;
- long totalLatestTimeOffset = 0;
- long totalLatestEmittedOffset = 0;
- HashMap ret = new HashMap();
- if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
- for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
- Partition partition = e.getKey();
- SimpleConsumer consumer = _connections.getConnection(partition);
- if (consumer == null) {
- LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
- return null;
- }
- long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
- if (latestTimeOffset == 0) {
- LOG.warn("No data found in Kafka Partition " + partition.getId());
- return null;
- }
- long latestEmittedOffset = e.getValue();
- long spoutLag = latestTimeOffset - latestEmittedOffset;
- ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
- ret.put(partition.getId() + "/" + "latestTime", latestTimeOffset);
- ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
- totalSpoutLag += spoutLag;
- totalLatestTimeOffset += latestTimeOffset;
- totalLatestEmittedOffset += latestEmittedOffset;
- }
- ret.put("totalSpoutLag", totalSpoutLag);
- ret.put("totalLatestTime", totalLatestTimeOffset);
- ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
- return ret;
- } else {
- LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
- }
- } catch (Throwable t) {
- LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
- }
- return null;
- }
-
- public void refreshPartitions(Set<Partition> partitions) {
- _partitions = partitions;
- Iterator<Partition> it = _partitionToOffset.keySet().iterator();
- while (it.hasNext()) {
- if (!partitions.contains(it.next())) {
- it.remove();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index ab4ec63..eceba47 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -7,8 +7,6 @@ import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.Utils;
import com.google.common.collect.ImmutableMap;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
@@ -16,13 +14,14 @@ import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.DynamicPartitionConnections;
+import storm.kafka.FailedFetchException;
+import storm.kafka.KafkaUtils;
import storm.kafka.Partition;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IOpaquePartitionedTridentSpout;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.topology.TransactionAttempt;
-import java.net.ConnectException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -103,16 +102,7 @@ public class TridentKafkaEmitter {
}
offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
}
- ByteBufferMessageSet msgs;
- try {
- msgs = fetchMessages(consumer, partition, offset);
- } catch (Exception e) {
- if (e instanceof ConnectException) {
- throw new FailedFetchException(e);
- } else {
- throw new RuntimeException(e);
- }
- }
+ ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
long endoffset = offset;
for (MessageAndOffset msg : msgs) {
emit(collector, msg.message());
@@ -130,11 +120,8 @@ public class TridentKafkaEmitter {
}
private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
- ByteBufferMessageSet msgs;
long start = System.nanoTime();
- FetchRequestBuilder builder = new FetchRequestBuilder();
- FetchRequest fetchRequest = builder.addFetch(_config.topic, partition.partition, offset, _config.fetchSizeBytes).clientId(_config.clientId).build();
- msgs = consumer.fetch(fetchRequest).messageSet(_config.topic, partition.partition);
+ ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_kafkaMeanFetchLatencyMetric.update(millis);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/test/storm/kafka/KafkaTestBroker.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaTestBroker.java b/src/test/storm/kafka/KafkaTestBroker.java
new file mode 100644
index 0000000..7019c86
--- /dev/null
+++ b/src/test/storm/kafka/KafkaTestBroker.java
@@ -0,0 +1,52 @@
+package storm.kafka;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
+import com.netflix.curator.test.TestingServer;
+import kafka.server.KafkaServerStartable;
+
+import java.util.Properties;
+
+/**
+ * Date: 11/01/2014
+ * Time: 13:15
+ */
+public class KafkaTestBroker {
+
+ private final int port = 49123;
+ private KafkaServerStartable kafka;
+ private TestingServer server;
+ private String zookeeperConnectionString;
+
+ public KafkaTestBroker() {
+ try {
+ server = new TestingServer();
+ zookeeperConnectionString = server.getConnectString();
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ CuratorFramework zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+ zookeeper.start();
+ Properties p = new Properties();
+ p.setProperty("zookeeper.connect", zookeeperConnectionString);
+ p.setProperty("broker.id", "0");
+ p.setProperty("port", "" + port);
+ kafka.server.KafkaConfig config = new kafka.server.KafkaConfig(p);
+ kafka = new KafkaServerStartable(config);
+ kafka.startup();
+ } catch (Exception ex) {
+ throw new RuntimeException("Could not start test broker", ex);
+ }
+ }
+
+ public String getBrokerConnectionString() {
+ return "localhost:" + port;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void shutdown() {
+ kafka.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
new file mode 100644
index 0000000..506789c
--- /dev/null
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -0,0 +1,90 @@
+package storm.kafka;
+
+import backtype.storm.utils.Utils;
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import storm.kafka.trident.GlobalPartitionInformation;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class KafkaUtilsTest {
+
+ private KafkaTestBroker broker;
+ private SimpleConsumer simpleConsumer;
+ private KafkaConfig config;
+
+ @Before
+ public void setup() {
+ broker = new KafkaTestBroker();
+ GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+ globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
+ BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation);
+ config = new KafkaConfig(brokerHosts, "testTopic");
+ simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+ }
+
+ @After
+ public void shutdown() {
+ broker.shutdown();
+ }
+
+
+ @Test(expected = FailedFetchException.class)
+ public void topicDoesNotExist() throws Exception {
+ KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), 0);
+ }
+
+ @Test(expected = FailedFetchException.class)
+ public void brokerIsDown() throws Exception {
+ int port = broker.getPort();
+ broker.shutdown();
+ SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 100, 1024, "testClient");
+ KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), OffsetRequest.LatestTime());
+ }
+
+ @Test
+ public void fetchMessage() throws Exception {
+ long lastOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
+ sendMessageAndAssertValueForOffset(lastOffset);
+ }
+
+ @Test(expected = FailedFetchException.class)
+ public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
+ config.useStartOffsetTimeIfOffsetOutOfRange = false;
+ sendMessageAndAssertValueForOffset(-99);
+ }
+
+ @Test
+ public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
+ sendMessageAndAssertValueForOffset(-99);
+ }
+
+ private String createTopicAndSendMessage() {
+ Properties p = new Properties();
+ p.setProperty("metadata.broker.list", "localhost:49123");
+ p.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+ ProducerConfig producerConfig = new ProducerConfig(p);
+ Producer<String, String> producer = new Producer<String, String>(producerConfig);
+ String value = "value";
+ producer.send(new KeyedMessage<String, String>(config.topic, value));
+ return value;
+ }
+
+ private void sendMessageAndAssertValueForOffset(long offset) {
+ String value = createTopicAndSendMessage();
+ ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offset);
+ String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
+ assertThat(message, is(equalTo(value)));
+ }
+}