You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/05/20 20:58:38 UTC

[1/3] git commit: STORM-303: Forward port of storm-kafka work.

Repository: incubator-storm
Updated Branches:
  refs/heads/master fe7ac2910 -> 4ee9ea7a8


STORM-303: Forward port of storm-kafka work.

This is a port of my work from
https://github.com/brndnmtthws/storm-kafka-0.8-plus to the current Storm
incubator project.

This is the code we run in our production cluster; it has been
battle-tested with billions of messages per day.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/5b025832
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/5b025832
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/5b025832

Branch: refs/heads/master
Commit: 5b0258322fd0b479fb372a69184a24bc27958c21
Parents: 222c725
Author: Brenden Matthews <br...@diddyinc.com>
Authored: Mon Apr 28 12:15:54 2014 -0700
Committer: Brenden Matthews <br...@diddyinc.com>
Committed: Fri May 16 19:24:21 2014 -0700

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/Broker.java |   3 +-
 .../src/jvm/storm/kafka/BrokerHosts.java        |   2 +-
 .../jvm/storm/kafka/DynamicBrokersReader.java   |  10 +-
 .../src/jvm/storm/kafka/KafkaConfig.java        |   2 +
 .../src/jvm/storm/kafka/KafkaSpout.java         |  21 +--
 .../src/jvm/storm/kafka/KafkaUtils.java         |  16 ++-
 .../kafka/KeyValueSchemeAsMultiScheme.java      |   1 +
 .../jvm/storm/kafka/PartitionCoordinator.java   |   2 +
 .../src/jvm/storm/kafka/PartitionManager.java   | 142 +++++++++++--------
 .../src/jvm/storm/kafka/StaticCoordinator.java  |   3 +
 .../src/jvm/storm/kafka/StringScheme.java       |   2 +-
 .../src/jvm/storm/kafka/ZkCoordinator.java      |   3 +-
 .../src/jvm/storm/kafka/ZkHosts.java            |   2 +-
 .../src/jvm/storm/kafka/ZkState.java            |   2 +-
 .../trident/GlobalPartitionInformation.java     |   3 +-
 .../trident/TransactionalTridentKafkaSpout.java |   2 +-
 .../kafka/trident/TridentKafkaEmitter.java      |   1 -
 .../jvm/storm/kafka/trident/ZkBrokerReader.java |  72 +++++-----
 18 files changed, 172 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/storm/kafka/Broker.java
index bfa3e0b..866f84f 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/Broker.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/Broker.java
@@ -17,9 +17,10 @@
  */
 package storm.kafka;
 
-import java.io.Serializable;
 import com.google.common.base.Objects;
 
+import java.io.Serializable;
+
 public class Broker implements Serializable, Comparable<Broker> {
     public final String host;
     public final int port;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
index fcdf0b6..1a06fc5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
@@ -22,4 +22,4 @@ import java.io.Serializable;
 
 public interface BrokerHosts extends Serializable {
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
index 5197862..e8e750a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
@@ -27,8 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.trident.GlobalPartitionInformation;
 
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.SocketTimeoutException;
 import java.util.List;
 import java.util.Map;
 
@@ -52,15 +52,15 @@ public class DynamicBrokersReader {
                             Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
             _curator.start();
         } catch (Exception ex) {
-            LOG.error("can't connect to zookeeper");
+            LOG.error("Couldn't connect to zookeeper", ex);
         }
     }
 
     /**
      * Get all partitions with their current leaders
      */
-    public GlobalPartitionInformation getBrokerInfo() {
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+    public GlobalPartitionInformation getBrokerInfo() throws SocketTimeoutException {
+      GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
         try {
             int numPartitionsForTopic = getNumPartitions();
             String brokerInfoPath = brokerPath();
@@ -75,6 +75,8 @@ public class DynamicBrokersReader {
                     LOG.error("Node {} does not exist ", path);
                 }
             }
+        } catch (SocketTimeoutException e) {
+					throw e;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 875629b..9c156e8 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -30,10 +30,12 @@ public class KafkaConfig implements Serializable {
 
     public int fetchSizeBytes = 1024 * 1024;
     public int socketTimeoutMs = 10000;
+    public int fetchMaxWait = 10000;
     public int bufferSizeBytes = 1024 * 1024;
     public MultiScheme scheme = new RawMultiScheme();
     public boolean forceFromStart = false;
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+    public long maxOffsetBehind = 100000;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
     public int metricsTimeBucketSizeInSecs = 60;
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index 102dce1..f3bc3ea 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@ -135,14 +135,19 @@ public class KafkaSpout extends BaseRichSpout {
         List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
         for (int i = 0; i < managers.size(); i++) {
 
-            // in case the number of managers decreased
-            _currPartitionIndex = _currPartitionIndex % managers.size();
-            EmitState state = managers.get(_currPartitionIndex).next(_collector);
-            if (state != EmitState.EMITTED_MORE_LEFT) {
-                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
-            }
-            if (state != EmitState.NO_EMITTED) {
-                break;
+            try {
+                // in case the number of managers decreased
+                _currPartitionIndex = _currPartitionIndex % managers.size();
+                EmitState state = managers.get(_currPartitionIndex).next(_collector);
+                if (state != EmitState.EMITTED_MORE_LEFT) {
+                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
+                }
+                if (state != EmitState.NO_EMITTED) {
+                    break;
+                }
+            } catch (FailedFetchException e) {
+                LOG.warn("Fetch failed", e);
+                _coordinator.refresh();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3d355e5..25d3709 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -36,8 +36,11 @@ import storm.kafka.trident.IBrokerReader;
 import storm.kafka.trident.StaticBrokerReader;
 import storm.kafka.trident.ZkBrokerReader;
 
+import java.io.IOException;
 import java.net.ConnectException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
 import java.util.*;
 
 
@@ -110,9 +113,9 @@ public class KafkaUtils {
                             LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                             return null;
                         }
-                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); 
                         long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
-                        if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
+                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+                        if (latestTimeOffset == 0 || earliestTimeOffset == 0) {
                             LOG.warn("No data found in Kafka Partition " + partition.getId());
                             return null;
                         }
@@ -159,12 +162,17 @@ public class KafkaUtils {
         for (int errors = 0; errors < 2 && msgs == null; errors++) {
             FetchRequestBuilder builder = new FetchRequestBuilder();
             FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
-                    clientId(config.clientId).build();
+                    clientId(config.clientId).maxWait(config.fetchMaxWait).build();
             FetchResponse fetchResponse;
             try {
                 fetchResponse = consumer.fetch(fetchRequest);
             } catch (Exception e) {
-                if (e instanceof ConnectException) {
+                if (e instanceof ConnectException ||
+                        e instanceof SocketTimeoutException ||
+                        e instanceof IOException ||
+                        e instanceof UnresolvedAddressException
+                        ) {
+                    LOG.warn("Network error when fetching messages:", e);
                     throw new FailedFetchException(e);
                 } else {
                     throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
index a570e7d..7def6ac 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -18,6 +18,7 @@
 package storm.kafka;
 
 import backtype.storm.spout.SchemeAsMultiScheme;
+
 import java.util.Arrays;
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
index 60cc237..9cfed60 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
@@ -23,4 +23,6 @@ public interface PartitionCoordinator {
     List<PartitionManager> getMyManagedPartitions();
 
     PartitionManager getManager(Partition partition);
+
+    void refresh();
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 9504427..d868e50 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,8 +23,8 @@ import backtype.storm.metric.api.CountMetric;
 import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
-import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
@@ -42,19 +42,9 @@ public class PartitionManager {
     private final ReducedMetric _fetchAPILatencyMean;
     private final CountMetric _fetchAPICallCount;
     private final CountMetric _fetchAPIMessageCount;
-
-    static class KafkaMessageId {
-        public Partition partition;
-        public long offset;
-
-        public KafkaMessageId(Partition partition, long offset) {
-            this.partition = partition;
-            this.offset = offset;
-        }
-    }
-
     Long _emittedToOffset;
     SortedSet<Long> _pending = new TreeSet<Long>();
+    SortedSet<Long> failed = new TreeSet<Long>();
     Long _committedTo;
     LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
     Partition _partition;
@@ -64,8 +54,7 @@ public class PartitionManager {
     DynamicPartitionConnections _connections;
     ZkState _state;
     Map _stormConf;
-
-
+    long numberFailed, numberAcked;
     public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
         _partition = id;
         _connections = connections;
@@ -74,13 +63,14 @@ public class PartitionManager {
         _consumer = connections.register(id.host, id.partition);
         _state = state;
         _stormConf = stormConf;
+        numberAcked = numberFailed = 0;
 
         String jsonTopologyId = null;
         Long jsonOffset = null;
         String path = committedPath();
         try {
             Map<Object, Object> json = _state.readJSON(path);
-            LOG.info("Read partition information from: " + path +  " --> " + json );
+            LOG.info("Read partition information from: " + path +  "  --> " + json );
             if (json != null) {
                 jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                 jsonOffset = (Long) json.get("offset");
@@ -89,8 +79,10 @@ public class PartitionManager {
             LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
         }
 
+        Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
+
         if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
-            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
+            _committedTo = currentOffset;
             LOG.info("No partition information found, using configuration to determine offset");
         } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
             _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
@@ -100,7 +92,14 @@ public class PartitionManager {
             LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
         }
 
-        LOG.info("Starting " + _partition + " from offset " + _committedTo);
+        if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
+            LOG.info("Last commit offset from zookeeper: " + _committedTo);
+            _committedTo = currentOffset;
+            LOG.info("Commit offset " + _committedTo + " is more than " +
+                    spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
+        }
+
+        LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
         _emittedToOffset = _committedTo;
 
         _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
@@ -147,54 +146,77 @@ public class PartitionManager {
 
     private void fill() {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
+        long offset;
+        final boolean had_failed = !failed.isEmpty();
+
+        // Are there failed tuples? If so, fetch those first.
+        if (had_failed) {
+            offset = failed.first();
+        } else {
+            offset = _emittedToOffset;
+        }
+
+        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _fetchAPILatencyMax.update(millis);
         _fetchAPILatencyMean.update(millis);
         _fetchAPICallCount.incr();
-        int numMessages = countMessages(msgs);
-        _fetchAPIMessageCount.incrBy(numMessages);
+        if (msgs != null) {
+            int numMessages = 0;
 
-        if (numMessages > 0) {
-            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
-        }
-        for (MessageAndOffset msg : msgs) {
-            _pending.add(_emittedToOffset);
-            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
-            _emittedToOffset = msg.nextOffset();
-        }
-        if (numMessages > 0) {
-            LOG.info("Added " + numMessages + " messages from: " + _partition + " to internal buffers");
-        }
-    }
-
-    private int countMessages(ByteBufferMessageSet messageSet) {
-        int counter = 0;
-        for (MessageAndOffset messageAndOffset : messageSet) {
-            counter = counter + 1;
+            for (MessageAndOffset msg : msgs) {
+                final Long cur_offset = msg.offset();
+                if (cur_offset < offset) {
+                    // Skip any old offsets.
+                    continue;
+                }
+                if (!had_failed || failed.contains(cur_offset)) {
+                    numMessages += 1;
+                    _pending.add(cur_offset);
+                    _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
+                    _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
+                    if (had_failed) {
+                        failed.remove(cur_offset);
+                    }
+                }
+            }
+            _fetchAPIMessageCount.incrBy(numMessages);
         }
-        return counter;
     }
 
     public void ack(Long offset) {
-        _pending.remove(offset);
+        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
+            // Too many things pending!
+            _pending.headSet(offset).clear();
+        } else {
+            _pending.remove(offset);
+        }
+        numberAcked++;
     }
 
     public void fail(Long offset) {
-        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
-        // things might get crazy with lots of timeouts
-        if (_emittedToOffset > offset) {
-            _emittedToOffset = offset;
-            _pending.tailSet(offset).clear();
+        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
+            LOG.info(
+                    "Skipping failed tuple at offset=" + offset +
+                            " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
+                            " behind _emittedToOffset=" + _emittedToOffset
+            );
+        } else {
+            LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
+            failed.add(offset);
+            numberFailed++;
+            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
+                throw new RuntimeException("Too many tuple failures");
+            }
         }
     }
 
     public void commit() {
         long lastCompletedOffset = lastCompletedOffset();
-        if (lastCompletedOffset != lastCommittedOffset()) {
-            LOG.info("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
-            Map<Object, Object> data = ImmutableMap.builder()
+        if (lastCompletedOffset != lastCompletedOffset) {
+            LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                     .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                             "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                     .put("offset", lastCompletedOffset)
@@ -203,24 +225,16 @@ public class PartitionManager {
                             "port", _partition.host.port))
                     .put("topic", _spoutConfig.topic).build();
             _state.writeJSON(committedPath(), data);
+
             _committedTo = lastCompletedOffset;
-            LOG.info("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+            LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
         } else {
-            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
+            LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
         }
     }
 
     private String committedPath() {
-        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" +  _spoutConfig.topic + "/" + _partition.getId();
-    }
-
-    public long queryPartitionOffsetLatestTime() {
-        return KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition,
-                OffsetRequest.LatestTime());
-    }
-
-    public long lastCommittedOffset() {
-        return _committedTo;
+        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
     }
 
     public long lastCompletedOffset() {
@@ -238,4 +252,14 @@ public class PartitionManager {
     public void close() {
         _connections.unregister(_partition.host, _partition.partition);
     }
+
+    static class KafkaMessageId {
+        public Partition partition;
+        public long offset;
+
+        public KafkaMessageId(Partition partition, long offset) {
+            this.partition = partition;
+            this.offset = offset;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
index 456a2a1..de4d0fd 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
@@ -45,4 +45,7 @@ public class StaticCoordinator implements PartitionCoordinator {
         return _managers.get(partition);
     }
 
+    @Override
+    public void refresh() { return; }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
index 102ea69..286dc9b 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
@@ -43,4 +43,4 @@ public class StringScheme implements Scheme {
     public Fields getOutputFields() {
         return new Fields(STRING_SCHEME_KEY);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
index e414d06..0a8fe36 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
@@ -72,7 +72,8 @@ public class ZkCoordinator implements PartitionCoordinator {
         return _cachedList;
     }
 
-    void refresh() {
+    @Override
+    public void refresh() {
         try {
             LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
             GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java b/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
index d9acc66..4e4327d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
@@ -33,4 +33,4 @@ public class ZkHosts implements BrokerHosts {
     public ZkHosts(String brokerZkStr) {
         this(brokerZkStr, DEFAULT_ZK_PATH);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
index 52585ef..a00f060 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
@@ -66,7 +66,7 @@ public class ZkState {
     }
 
     public void writeJSON(String path, Map<Object, Object> data) {
-        LOG.info("Writing " + path + " the data " + data.toString());
+        LOG.debug("Writing " + path + " the data " + data.toString());
         writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
index ae136e5..76bec9e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
@@ -17,14 +17,13 @@
  */
 package storm.kafka.trident;
 
+import com.google.common.base.Objects;
 import storm.kafka.Broker;
 import storm.kafka.Partition;
 
 import java.io.Serializable;
 import java.util.*;
 
-import com.google.common.base.Objects;
-
 
 public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
index 7a44bdd..8c10bed 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
@@ -55,4 +55,4 @@ public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<
     public Map<String, Object> getComponentConfiguration() {
         return null;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 8c57681..94bf134 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -22,7 +22,6 @@ import backtype.storm.metric.api.CombinedMetric;
 import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.task.TopologyContext;
-import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
index db11328..1a26705 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
@@ -27,36 +27,44 @@ import java.util.Map;
 
 public class ZkBrokerReader implements IBrokerReader {
 
-    public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
-
-    GlobalPartitionInformation cachedBrokers;
-    DynamicBrokersReader reader;
-    long lastRefreshTimeMs;
-
-
-    long refreshMillis;
-
-    public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
-        reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
-        cachedBrokers = reader.getBrokerInfo();
-        lastRefreshTimeMs = System.currentTimeMillis();
-        refreshMillis = hosts.refreshFreqSecs * 1000L;
-
-    }
-
-    @Override
-    public GlobalPartitionInformation getCurrentBrokers() {
-        long currTime = System.currentTimeMillis();
-        if (currTime > lastRefreshTimeMs + refreshMillis) {
-            LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
-            cachedBrokers = reader.getBrokerInfo();
-            lastRefreshTimeMs = currTime;
-        }
-        return cachedBrokers;
-    }
-
-    @Override
-    public void close() {
-        reader.close();
-    }
+	public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
+
+	GlobalPartitionInformation cachedBrokers;
+	DynamicBrokersReader reader;
+	long lastRefreshTimeMs;
+
+
+	long refreshMillis;
+
+	public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
+		try {
+			reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
+			cachedBrokers = reader.getBrokerInfo();
+			lastRefreshTimeMs = System.currentTimeMillis();
+			refreshMillis = hosts.refreshFreqSecs * 1000L;
+		} catch (java.net.SocketTimeoutException e) {
+			LOG.warn("Failed to update brokers", e);
+		}
+
+	}
+
+	@Override
+	public GlobalPartitionInformation getCurrentBrokers() {
+		long currTime = System.currentTimeMillis();
+		if (currTime > lastRefreshTimeMs + refreshMillis) {
+			try {
+				LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
+				cachedBrokers = reader.getBrokerInfo();
+				lastRefreshTimeMs = currTime;
+			} catch (java.net.SocketTimeoutException e) {
+				LOG.warn("Failed to update brokers", e);
+			}
+		}
+		return cachedBrokers;
+	}
+
+	@Override
+	public void close() {
+		reader.close();
+	}
 }


[2/3] git commit: Merge branch 'master' of github.com:brndnmtthws/incubator-storm

Posted by pt...@apache.org.
Merge branch 'master' of github.com:brndnmtthws/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/512e34ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/512e34ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/512e34ee

Branch: refs/heads/master
Commit: 512e34eee399a57cffe9c38f68229d604c9f5db9
Parents: fe7ac29 5b02583
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue May 20 13:58:18 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue May 20 13:58:18 2014 -0400

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/Broker.java |   3 +-
 .../src/jvm/storm/kafka/BrokerHosts.java        |   2 +-
 .../jvm/storm/kafka/DynamicBrokersReader.java   |  10 +-
 .../src/jvm/storm/kafka/KafkaConfig.java        |   2 +
 .../src/jvm/storm/kafka/KafkaSpout.java         |  21 +--
 .../src/jvm/storm/kafka/KafkaUtils.java         |  16 ++-
 .../kafka/KeyValueSchemeAsMultiScheme.java      |   1 +
 .../jvm/storm/kafka/PartitionCoordinator.java   |   2 +
 .../src/jvm/storm/kafka/PartitionManager.java   | 142 +++++++++++--------
 .../src/jvm/storm/kafka/StaticCoordinator.java  |   3 +
 .../src/jvm/storm/kafka/StringScheme.java       |   2 +-
 .../src/jvm/storm/kafka/ZkCoordinator.java      |   3 +-
 .../src/jvm/storm/kafka/ZkHosts.java            |   2 +-
 .../src/jvm/storm/kafka/ZkState.java            |   2 +-
 .../trident/GlobalPartitionInformation.java     |   3 +-
 .../trident/TransactionalTridentKafkaSpout.java |   2 +-
 .../kafka/trident/TridentKafkaEmitter.java      |   1 -
 .../jvm/storm/kafka/trident/ZkBrokerReader.java |  72 +++++-----
 18 files changed, 172 insertions(+), 117 deletions(-)
----------------------------------------------------------------------



[3/3] git commit: STORM-303: update changelog

Posted by pt...@apache.org.
STORM-303: update changelog


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4ee9ea7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4ee9ea7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4ee9ea7a

Branch: refs/heads/master
Commit: 4ee9ea7a8b391380c185a921bcf562b95461f44a
Parents: 512e34e
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue May 20 14:58:30 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue May 20 14:58:30 2014 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4ee9ea7a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c6d0816..2281408 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.9.2-incubating (unreleased)
+ * STORM-303: storm-kafka reliability improvements
  * STORM-233: Removed inline heartbeat to nimbus to avoid workers being killed when under heavy ZK load
  * STORM-267: fix package name of LoggingMetricsConsumer in storm.yaml.example
  * STORM-265: upgrade to clojure 1.5.1