You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:09 UTC

[10/50] [abbrv] git commit: Added error handling for fetch request

Added error handling for fetch request

* allow retry of fetch if offset was invalid
* updated changelog


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/95c60dbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/95c60dbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/95c60dbb

Branch: refs/heads/master
Commit: 95c60dbbbc80165969bae6cbbd1926207720e59c
Parents: 6b29f8f
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Jan 11 16:34:01 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Jan 11 16:40:02 2014 +0000

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 src/jvm/storm/kafka/FailedFetchException.java   |  12 ++
 src/jvm/storm/kafka/KafkaConfig.java            |   5 +-
 src/jvm/storm/kafka/KafkaError.java             |  29 ++++
 src/jvm/storm/kafka/KafkaSpout.java             |  24 ---
 src/jvm/storm/kafka/KafkaUtils.java             | 159 +++++++++++++++++++
 src/jvm/storm/kafka/PartitionManager.java       |  11 +-
 src/jvm/storm/kafka/trident/Coordinator.java    |   1 +
 .../kafka/trident/FailedFetchException.java     |   7 -
 src/jvm/storm/kafka/trident/KafkaUtils.java     | 112 -------------
 .../kafka/trident/TridentKafkaEmitter.java      |  21 +--
 src/test/storm/kafka/KafkaTestBroker.java       |  52 ++++++
 src/test/storm/kafka/KafkaUtilsTest.java        |  90 +++++++++++
 13 files changed, 350 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2fb81fe..ced0ffc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,2 +1,3 @@
 ## 0.3.0
 * updated partition path in zookeeper
+* added error handling for fetch request

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/FailedFetchException.java b/src/jvm/storm/kafka/FailedFetchException.java
new file mode 100644
index 0000000..0bd1123
--- /dev/null
+++ b/src/jvm/storm/kafka/FailedFetchException.java
@@ -0,0 +1,12 @@
+package storm.kafka;
+
+public class FailedFetchException extends RuntimeException {
+
+    public FailedFetchException(String message) {
+        super(message);
+    }
+
+    public FailedFetchException(Exception e) {
+        super(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaConfig.java b/src/jvm/storm/kafka/KafkaConfig.java
index e241978..dddcead 100644
--- a/src/jvm/storm/kafka/KafkaConfig.java
+++ b/src/jvm/storm/kafka/KafkaConfig.java
@@ -17,6 +17,7 @@ public class KafkaConfig implements Serializable {
     public MultiScheme scheme = new RawMultiScheme();
     public boolean forceFromStart = false;
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
 
     public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
@@ -28,8 +29,4 @@ public class KafkaConfig implements Serializable {
         this.clientId = clientId;
     }
 
-    public void forceStartOffsetTime(long millis) {
-        startOffsetTime = millis;
-        forceFromStart = true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaError.java b/src/jvm/storm/kafka/KafkaError.java
new file mode 100644
index 0000000..260ab91
--- /dev/null
+++ b/src/jvm/storm/kafka/KafkaError.java
@@ -0,0 +1,29 @@
+package storm.kafka;
+
+/**
+ * Kafka fetch error codes, declared in code order so that getError can map a
+ * numeric code to the constant at the same position; anything else is UNKNOWN.
+ */
+public enum KafkaError {
+    NO_ERROR,
+    OFFSET_OUT_OF_RANGE,
+    INVALID_MESSAGE,
+    UNKNOWN_TOPIC_OR_PARTITION,
+    INVALID_FETCH_SIZE,
+    LEADER_NOT_AVAILABLE,
+    NOT_LEADER_FOR_PARTITION,
+    REQUEST_TIMED_OUT,
+    BROKER_NOT_AVAILABLE,
+    REPLICA_NOT_AVAILABLE,
+    MESSAGE_SIZE_TOO_LARGE,
+    STALE_CONTROLLER_EPOCH,
+    UNKNOWN;
+
+    public static KafkaError getError(short errorCode) {
+        if (errorCode < 0 || errorCode >= UNKNOWN.ordinal()) { // codes outside the table must not index values()
+            return UNKNOWN;
+        } else {
+            return values()[errorCode];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaSpout.java b/src/jvm/storm/kafka/KafkaSpout.java
index cf407ad..d097510 100644
--- a/src/jvm/storm/kafka/KafkaSpout.java
+++ b/src/jvm/storm/kafka/KafkaSpout.java
@@ -10,7 +10,6 @@ import kafka.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.PartitionManager.KafkaMessageId;
-import storm.kafka.trident.KafkaUtils;
 
 import java.util.*;
 
@@ -171,27 +170,4 @@ public class KafkaSpout extends BaseRichSpout {
         }
     }
 
-    public static void main(String[] args) {
-//        TopologyBuilder builder = new TopologyBuilder();
-//        List<String> hosts = new ArrayList<String>();
-//        hosts.add("localhost");
-//        SpoutConfig spoutConf = SpoutConfig.fromHostStrings(hosts, 8, "clicks", "/kafkastorm", "id");
-//        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
-//        spoutConf.forceStartOffsetTime(-2);
-//
-// //       spoutConf.zkServers = new ArrayList<String>() {{
-// //          add("localhost");
-// //       }};
-// //       spoutConf.zkPort = 2181;
-//
-//        builder.setSpout("spout", new KafkaSpout(spoutConf), 3);
-//
-//        Config conf = new Config();
-//        //conf.setDebug(true);
-//
-//        LocalCluster cluster = new LocalCluster();
-//        cluster.submitTopology("kafka-test", conf, builder.createTopology());
-//
-//        Utils.sleep(600000);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
new file mode 100644
index 0000000..5094f14
--- /dev/null
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -0,0 +1,159 @@
+package storm.kafka;
+
+import backtype.storm.metric.api.IMetric;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.trident.IBrokerReader;
+import storm.kafka.trident.StaticBrokerReader;
+import storm.kafka.trident.ZkBrokerReader;
+
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+
+public class KafkaUtils {
+
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+    private static final int NO_OFFSET = -5;
+
+
+    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
+        if (conf.hosts instanceof StaticHosts) {
+            return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
+        } else {
+            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
+        }
+    }
+
+    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
+        OffsetRequest request = new OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+
+        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
+        if (offsets.length > 0) {
+            return offsets[0];
+        } else {
+            return NO_OFFSET;
+        }
+    }
+
+    public static class KafkaOffsetMetric implements IMetric {
+        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
+        Set<Partition> _partitions;
+        String _topic;
+        DynamicPartitionConnections _connections;
+
+        public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
+            _topic = topic;
+            _connections = connections;
+        }
+
+        public void setLatestEmittedOffset(Partition partition, long offset) {
+            _partitionToOffset.put(partition, offset);
+        }
+
+        @Override
+        public Object getValueAndReset() {
+            try {
+                long totalSpoutLag = 0;
+                long totalLatestTimeOffset = 0;
+                long totalLatestEmittedOffset = 0;
+                HashMap ret = new HashMap();
+                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
+                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
+                        Partition partition = e.getKey();
+                        SimpleConsumer consumer = _connections.getConnection(partition);
+                        if (consumer == null) {
+                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
+                            return null;
+                        }
+                        long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
+                        if (latestTimeOffset == 0 || latestTimeOffset == NO_OFFSET) { // getOffset returns NO_OFFSET when no offset came back
+                            LOG.warn("No data found in Kafka Partition " + partition.getId());
+                            return null;
+                        }
+                        long latestEmittedOffset = e.getValue();
+                        long spoutLag = latestTimeOffset - latestEmittedOffset;
+                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
+                        ret.put(partition.getId() + "/" + "latestTime", latestTimeOffset);
+                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
+                        totalSpoutLag += spoutLag;
+                        totalLatestTimeOffset += latestTimeOffset;
+                        totalLatestEmittedOffset += latestEmittedOffset;
+                    }
+                    ret.put("totalSpoutLag", totalSpoutLag);
+                    ret.put("totalLatestTime", totalLatestTimeOffset);
+                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
+                    return ret;
+                } else {
+                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+                }
+            } catch (Throwable t) {
+                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
+            }
+            return null;
+        }
+
+        public void refreshPartitions(Set<Partition> partitions) {
+            _partitions = partitions;
+            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
+            while (it.hasNext()) {
+                if (!partitions.contains(it.next())) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
+        ByteBufferMessageSet msgs = null;
+        String topic = config.topic;
+        int partitionId = partition.partition;
+        for (int errors = 0; errors < 2 && msgs == null; errors++) {
+            FetchRequestBuilder builder = new FetchRequestBuilder();
+            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
+                    clientId(config.clientId).build();
+            FetchResponse fetchResponse;
+            try {
+                fetchResponse = consumer.fetch(fetchRequest);
+            } catch (Exception e) {
+                if (e instanceof ConnectException) {
+                    throw new FailedFetchException(e);
+                } else {
+                    throw new RuntimeException(e);
+                }
+            }
+            if (fetchResponse.hasError()) {
+                KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
+                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
+                    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
+                    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
+                            "retrying with default start offset time from configuration. " +
+                            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
+                    offset = startOffset;
+                } else {
+                    String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
+                    LOG.error(message);
+                    throw new FailedFetchException(message);
+                }
+            } else {
+                msgs = fetchResponse.messageSet(topic, partitionId);
+            }
+        }
+        return msgs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index e3e31db..0861c25 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -8,7 +8,6 @@ import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
-import kafka.api.FetchRequestBuilder;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
@@ -17,7 +16,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.KafkaSpout.EmitState;
 import storm.kafka.KafkaSpout.MessageAndRealOffset;
-import storm.kafka.trident.KafkaUtils;
 import storm.kafka.trident.MaxMetric;
 
 import java.util.*;
@@ -132,15 +130,8 @@ public class PartitionManager {
     }
 
     private void fill() {
-        //LOG.info("Fetching from Kafka: " + _consumer.host() + ":" + _partition.partition + " from offset " + _emittedToOffset);
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = _consumer.fetch(
-                new FetchRequestBuilder().addFetch(
-                        _spoutConfig.topic,
-                        _partition.partition,
-                        _emittedToOffset,
-                        _spoutConfig.fetchSizeBytes).build()).messageSet(_spoutConfig.topic,
-                _partition.partition);
+        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _fetchAPILatencyMax.update(millis);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/Coordinator.java b/src/jvm/storm/kafka/trident/Coordinator.java
index d97feed..f67acaa 100644
--- a/src/jvm/storm/kafka/trident/Coordinator.java
+++ b/src/jvm/storm/kafka/trident/Coordinator.java
@@ -1,5 +1,6 @@
 package storm.kafka.trident;
 
+import storm.kafka.KafkaUtils;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/trident/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/FailedFetchException.java b/src/jvm/storm/kafka/trident/FailedFetchException.java
deleted file mode 100644
index c4fcc61..0000000
--- a/src/jvm/storm/kafka/trident/FailedFetchException.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package storm.kafka.trident;
-
-public class FailedFetchException extends RuntimeException {
-    public FailedFetchException(Exception e) {
-        super(e);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/trident/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/KafkaUtils.java b/src/jvm/storm/kafka/trident/KafkaUtils.java
deleted file mode 100644
index e4ba3b3..0000000
--- a/src/jvm/storm/kafka/trident/KafkaUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package storm.kafka.trident;
-
-import backtype.storm.metric.api.IMetric;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-public class KafkaUtils {
-    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
-    private static final int NO_OFFSET = -5;
-
-
-    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
-        if (conf.hosts instanceof StaticHosts) {
-            return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
-        } else {
-            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
-        }
-    }
-
-    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
-        OffsetRequest request = new OffsetRequest(
-                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-
-        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
-        if (offsets.length > 0) {
-            return offsets[0];
-        } else {
-            return NO_OFFSET;
-        }
-    }
-
-    public static class KafkaOffsetMetric implements IMetric {
-        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
-        Set<Partition> _partitions;
-        String _topic;
-        DynamicPartitionConnections _connections;
-
-        public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
-            _topic = topic;
-            _connections = connections;
-        }
-
-        public void setLatestEmittedOffset(Partition partition, long offset) {
-            _partitionToOffset.put(partition, offset);
-        }
-
-        @Override
-        public Object getValueAndReset() {
-            try {
-                long totalSpoutLag = 0;
-                long totalLatestTimeOffset = 0;
-                long totalLatestEmittedOffset = 0;
-                HashMap ret = new HashMap();
-                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
-                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
-                        Partition partition = e.getKey();
-                        SimpleConsumer consumer = _connections.getConnection(partition);
-                        if (consumer == null) {
-                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
-                            return null;
-                        }
-                        long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
-                        if (latestTimeOffset == 0) {
-                            LOG.warn("No data found in Kafka Partition " + partition.getId());
-                            return null;
-                        }
-                        long latestEmittedOffset = e.getValue();
-                        long spoutLag = latestTimeOffset - latestEmittedOffset;
-                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
-                        ret.put(partition.getId() + "/" + "latestTime", latestTimeOffset);
-                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
-                        totalSpoutLag += spoutLag;
-                        totalLatestTimeOffset += latestTimeOffset;
-                        totalLatestEmittedOffset += latestEmittedOffset;
-                    }
-                    ret.put("totalSpoutLag", totalSpoutLag);
-                    ret.put("totalLatestTime", totalLatestTimeOffset);
-                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
-                    return ret;
-                } else {
-                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
-                }
-            } catch (Throwable t) {
-                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
-            }
-            return null;
-        }
-
-        public void refreshPartitions(Set<Partition> partitions) {
-            _partitions = partitions;
-            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
-            while (it.hasNext()) {
-                if (!partitions.contains(it.next())) {
-                    it.remove();
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index ab4ec63..eceba47 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -7,8 +7,6 @@ import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
@@ -16,13 +14,14 @@ import kafka.message.MessageAndOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.DynamicPartitionConnections;
+import storm.kafka.FailedFetchException;
+import storm.kafka.KafkaUtils;
 import storm.kafka.Partition;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
 import storm.trident.topology.TransactionAttempt;
 
-import java.net.ConnectException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -103,16 +102,7 @@ public class TridentKafkaEmitter {
             }
             offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
         }
-        ByteBufferMessageSet msgs;
-        try {
-            msgs = fetchMessages(consumer, partition, offset);
-        } catch (Exception e) {
-            if (e instanceof ConnectException) {
-                throw new FailedFetchException(e);
-            } else {
-                throw new RuntimeException(e);
-            }
-        }
+        ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
         long endoffset = offset;
         for (MessageAndOffset msg : msgs) {
             emit(collector, msg.message());
@@ -130,11 +120,8 @@ public class TridentKafkaEmitter {
     }
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
-        ByteBufferMessageSet msgs;
         long start = System.nanoTime();
-        FetchRequestBuilder builder = new FetchRequestBuilder();
-        FetchRequest fetchRequest = builder.addFetch(_config.topic, partition.partition, offset, _config.fetchSizeBytes).clientId(_config.clientId).build();
-        msgs = consumer.fetch(fetchRequest).messageSet(_config.topic, partition.partition);
+        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/test/storm/kafka/KafkaTestBroker.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaTestBroker.java b/src/test/storm/kafka/KafkaTestBroker.java
new file mode 100644
index 0000000..7019c86
--- /dev/null
+++ b/src/test/storm/kafka/KafkaTestBroker.java
@@ -0,0 +1,52 @@
+package storm.kafka;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
+import com.netflix.curator.test.TestingServer;
+import kafka.server.KafkaServerStartable;
+
+import java.util.Properties;
+
+/**
+ * Date: 11/01/2014
+ * Time: 13:15
+ */
+public class KafkaTestBroker {
+
+    private final int port = 49123;
+    private KafkaServerStartable kafka;
+    private TestingServer server;
+    private String zookeeperConnectionString;
+
+    public KafkaTestBroker() {
+        try {
+            server = new TestingServer();
+            zookeeperConnectionString = server.getConnectString();
+            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+            CuratorFramework zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+            zookeeper.start();
+            Properties p = new Properties();
+            p.setProperty("zookeeper.connect", zookeeperConnectionString);
+            p.setProperty("broker.id", "0");
+            p.setProperty("port", "" + port);
+            kafka.server.KafkaConfig config = new kafka.server.KafkaConfig(p);
+            kafka = new KafkaServerStartable(config);
+            kafka.startup();
+        } catch (Exception ex) {
+            throw new RuntimeException("Could not start test broker", ex);
+        }
+    }
+
+    public String getBrokerConnectionString() {
+        return "localhost:" + port;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void shutdown() {
+        kafka.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/95c60dbb/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
new file mode 100644
index 0000000..506789c
--- /dev/null
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -0,0 +1,90 @@
+package storm.kafka;
+
+import backtype.storm.utils.Utils;
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import storm.kafka.trident.GlobalPartitionInformation;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class KafkaUtilsTest {
+
+    private KafkaTestBroker broker;
+    private SimpleConsumer simpleConsumer;
+    private KafkaConfig config;
+
+    @Before
+    public void setup() {
+        broker = new KafkaTestBroker();
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
+        BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation);
+        config = new KafkaConfig(brokerHosts, "testTopic");
+        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+    }
+
+    @After
+    public void shutdown() {
+        broker.shutdown();
+    }
+
+
+    @Test(expected = FailedFetchException.class)
+    public void topicDoesNotExist() throws Exception {
+        KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), 0);
+    }
+
+    @Test(expected = FailedFetchException.class)
+    public void brokerIsDown() throws Exception {
+        int port = broker.getPort();
+        broker.shutdown();
+        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 100, 1024, "testClient");
+        KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), OffsetRequest.LatestTime());
+    }
+
+    @Test
+    public void fetchMessage() throws Exception {
+        long lastOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
+        sendMessageAndAssertValueForOffset(lastOffset);
+    }
+
+    @Test(expected = FailedFetchException.class)
+    public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
+        config.useStartOffsetTimeIfOffsetOutOfRange = false;
+        sendMessageAndAssertValueForOffset(-99);
+    }
+
+    @Test
+    public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
+        sendMessageAndAssertValueForOffset(-99);
+    }
+
+    private String createTopicAndSendMessage() {
+        Properties p = new Properties();
+        p.setProperty("metadata.broker.list", broker.getBrokerConnectionString()); // follow the test broker's address instead of duplicating its port
+        p.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(p));
+        String value = "value";
+        producer.send(new KeyedMessage<String, String>(config.topic, value));
+        producer.close(); // each test builds its own producer; do not leave its connections open
+        return value;
+    }
+
+    private void sendMessageAndAssertValueForOffset(long offset) {
+        String value = createTopicAndSendMessage();
+        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offset);
+        String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
+        assertThat(message, is(equalTo(value)));
+    }
+}