You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/05/20 20:58:38 UTC
[1/3] git commit: STORM-303: Forward port of storm-kafka work.
Repository: incubator-storm
Updated Branches:
refs/heads/master fe7ac2910 -> 4ee9ea7a8
STORM-303: Forward port of storm-kafka work.
This is a port of my work from
https://github.com/brndnmtthws/storm-kafka-0.8-plus to the current Storm
incubator project.
This is the code we use in our production cluster, which has been battle
tested with billions of messages per day.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/5b025832
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/5b025832
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/5b025832
Branch: refs/heads/master
Commit: 5b0258322fd0b479fb372a69184a24bc27958c21
Parents: 222c725
Author: Brenden Matthews <br...@diddyinc.com>
Authored: Mon Apr 28 12:15:54 2014 -0700
Committer: Brenden Matthews <br...@diddyinc.com>
Committed: Fri May 16 19:24:21 2014 -0700
----------------------------------------------------------------------
.../storm-kafka/src/jvm/storm/kafka/Broker.java | 3 +-
.../src/jvm/storm/kafka/BrokerHosts.java | 2 +-
.../jvm/storm/kafka/DynamicBrokersReader.java | 10 +-
.../src/jvm/storm/kafka/KafkaConfig.java | 2 +
.../src/jvm/storm/kafka/KafkaSpout.java | 21 +--
.../src/jvm/storm/kafka/KafkaUtils.java | 16 ++-
.../kafka/KeyValueSchemeAsMultiScheme.java | 1 +
.../jvm/storm/kafka/PartitionCoordinator.java | 2 +
.../src/jvm/storm/kafka/PartitionManager.java | 142 +++++++++++--------
.../src/jvm/storm/kafka/StaticCoordinator.java | 3 +
.../src/jvm/storm/kafka/StringScheme.java | 2 +-
.../src/jvm/storm/kafka/ZkCoordinator.java | 3 +-
.../src/jvm/storm/kafka/ZkHosts.java | 2 +-
.../src/jvm/storm/kafka/ZkState.java | 2 +-
.../trident/GlobalPartitionInformation.java | 3 +-
.../trident/TransactionalTridentKafkaSpout.java | 2 +-
.../kafka/trident/TridentKafkaEmitter.java | 1 -
.../jvm/storm/kafka/trident/ZkBrokerReader.java | 72 +++++-----
18 files changed, 172 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/storm/kafka/Broker.java
index bfa3e0b..866f84f 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/Broker.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/Broker.java
@@ -17,9 +17,10 @@
*/
package storm.kafka;
-import java.io.Serializable;
import com.google.common.base.Objects;
+import java.io.Serializable;
+
public class Broker implements Serializable, Comparable<Broker> {
public final String host;
public final int port;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
index fcdf0b6..1a06fc5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
@@ -22,4 +22,4 @@ import java.io.Serializable;
public interface BrokerHosts extends Serializable {
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
index 5197862..e8e750a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
@@ -27,8 +27,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.GlobalPartitionInformation;
-import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
@@ -52,15 +52,15 @@ public class DynamicBrokersReader {
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
_curator.start();
} catch (Exception ex) {
- LOG.error("can't connect to zookeeper");
+ LOG.error("Couldn't connect to zookeeper", ex);
}
}
/**
* Get all partitions with their current leaders
*/
- public GlobalPartitionInformation getBrokerInfo() {
- GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+ public GlobalPartitionInformation getBrokerInfo() throws SocketTimeoutException {
+ GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
try {
int numPartitionsForTopic = getNumPartitions();
String brokerInfoPath = brokerPath();
@@ -75,6 +75,8 @@ public class DynamicBrokersReader {
LOG.error("Node {} does not exist ", path);
}
}
+ } catch (SocketTimeoutException e) {
+ throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 875629b..9c156e8 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -30,10 +30,12 @@ public class KafkaConfig implements Serializable {
public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
+ public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+ public long maxOffsetBehind = 100000;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index 102dce1..f3bc3ea 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@ -135,14 +135,19 @@ public class KafkaSpout extends BaseRichSpout {
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
- // in case the number of managers decreased
- _currPartitionIndex = _currPartitionIndex % managers.size();
- EmitState state = managers.get(_currPartitionIndex).next(_collector);
- if (state != EmitState.EMITTED_MORE_LEFT) {
- _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
- }
- if (state != EmitState.NO_EMITTED) {
- break;
+ try {
+ // in case the number of managers decreased
+ _currPartitionIndex = _currPartitionIndex % managers.size();
+ EmitState state = managers.get(_currPartitionIndex).next(_collector);
+ if (state != EmitState.EMITTED_MORE_LEFT) {
+ _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
+ }
+ if (state != EmitState.NO_EMITTED) {
+ break;
+ }
+ } catch (FailedFetchException e) {
+ LOG.warn("Fetch failed", e);
+ _coordinator.refresh();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3d355e5..25d3709 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -36,8 +36,11 @@ import storm.kafka.trident.IBrokerReader;
import storm.kafka.trident.StaticBrokerReader;
import storm.kafka.trident.ZkBrokerReader;
+import java.io.IOException;
import java.net.ConnectException;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
import java.util.*;
@@ -110,9 +113,9 @@ public class KafkaUtils {
LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
return null;
}
- long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
- if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
+ long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+ if (latestTimeOffset == 0 || earliestTimeOffset == 0) {
LOG.warn("No data found in Kafka Partition " + partition.getId());
return null;
}
@@ -159,12 +162,17 @@ public class KafkaUtils {
for (int errors = 0; errors < 2 && msgs == null; errors++) {
FetchRequestBuilder builder = new FetchRequestBuilder();
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
- clientId(config.clientId).build();
+ clientId(config.clientId).maxWait(config.fetchMaxWait).build();
FetchResponse fetchResponse;
try {
fetchResponse = consumer.fetch(fetchRequest);
} catch (Exception e) {
- if (e instanceof ConnectException) {
+ if (e instanceof ConnectException ||
+ e instanceof SocketTimeoutException ||
+ e instanceof IOException ||
+ e instanceof UnresolvedAddressException
+ ) {
+ LOG.warn("Network error when fetching messages:", e);
throw new FailedFetchException(e);
} else {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
index a570e7d..7def6ac 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -18,6 +18,7 @@
package storm.kafka;
import backtype.storm.spout.SchemeAsMultiScheme;
+
import java.util.Arrays;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
index 60cc237..9cfed60 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
@@ -23,4 +23,6 @@ public interface PartitionCoordinator {
List<PartitionManager> getMyManagedPartitions();
PartitionManager getManager(Partition partition);
+
+ void refresh();
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 9504427..d868e50 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,8 +23,8 @@ import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.utils.Utils;
import com.google.common.collect.ImmutableMap;
-import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
@@ -42,19 +42,9 @@ public class PartitionManager {
private final ReducedMetric _fetchAPILatencyMean;
private final CountMetric _fetchAPICallCount;
private final CountMetric _fetchAPIMessageCount;
-
- static class KafkaMessageId {
- public Partition partition;
- public long offset;
-
- public KafkaMessageId(Partition partition, long offset) {
- this.partition = partition;
- this.offset = offset;
- }
- }
-
Long _emittedToOffset;
SortedSet<Long> _pending = new TreeSet<Long>();
+ SortedSet<Long> failed = new TreeSet<Long>();
Long _committedTo;
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
Partition _partition;
@@ -64,8 +54,7 @@ public class PartitionManager {
DynamicPartitionConnections _connections;
ZkState _state;
Map _stormConf;
-
-
+ long numberFailed, numberAcked;
public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
_partition = id;
_connections = connections;
@@ -74,13 +63,14 @@ public class PartitionManager {
_consumer = connections.register(id.host, id.partition);
_state = state;
_stormConf = stormConf;
+ numberAcked = numberFailed = 0;
String jsonTopologyId = null;
Long jsonOffset = null;
String path = committedPath();
try {
Map<Object, Object> json = _state.readJSON(path);
- LOG.info("Read partition information from: " + path + " --> " + json );
+ LOG.info("Read partition information from: " + path + " --> " + json );
if (json != null) {
jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
jsonOffset = (Long) json.get("offset");
@@ -89,8 +79,10 @@ public class PartitionManager {
LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
}
+ Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
+
if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
- _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
+ _committedTo = currentOffset;
LOG.info("No partition information found, using configuration to determine offset");
} else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
@@ -100,7 +92,14 @@ public class PartitionManager {
LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
}
- LOG.info("Starting " + _partition + " from offset " + _committedTo);
+ if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
+ LOG.info("Last commit offset from zookeeper: " + _committedTo);
+ _committedTo = currentOffset;
+ LOG.info("Commit offset " + _committedTo + " is more than " +
+ spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
+ }
+
+ LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
_emittedToOffset = _committedTo;
_fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
@@ -147,54 +146,77 @@ public class PartitionManager {
private void fill() {
long start = System.nanoTime();
- ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
+ long offset;
+ final boolean had_failed = !failed.isEmpty();
+
+ // Are there failed tuples? If so, fetch those first.
+ if (had_failed) {
+ offset = failed.first();
+ } else {
+ offset = _emittedToOffset;
+ }
+
+ ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_fetchAPILatencyMax.update(millis);
_fetchAPILatencyMean.update(millis);
_fetchAPICallCount.incr();
- int numMessages = countMessages(msgs);
- _fetchAPIMessageCount.incrBy(numMessages);
+ if (msgs != null) {
+ int numMessages = 0;
- if (numMessages > 0) {
- LOG.info("Fetched " + numMessages + " messages from: " + _partition);
- }
- for (MessageAndOffset msg : msgs) {
- _pending.add(_emittedToOffset);
- _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
- _emittedToOffset = msg.nextOffset();
- }
- if (numMessages > 0) {
- LOG.info("Added " + numMessages + " messages from: " + _partition + " to internal buffers");
- }
- }
-
- private int countMessages(ByteBufferMessageSet messageSet) {
- int counter = 0;
- for (MessageAndOffset messageAndOffset : messageSet) {
- counter = counter + 1;
+ for (MessageAndOffset msg : msgs) {
+ final Long cur_offset = msg.offset();
+ if (cur_offset < offset) {
+ // Skip any old offsets.
+ continue;
+ }
+ if (!had_failed || failed.contains(cur_offset)) {
+ numMessages += 1;
+ _pending.add(cur_offset);
+ _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
+ _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
+ if (had_failed) {
+ failed.remove(cur_offset);
+ }
+ }
+ }
+ _fetchAPIMessageCount.incrBy(numMessages);
}
- return counter;
}
public void ack(Long offset) {
- _pending.remove(offset);
+ if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
+ // Too many things pending!
+ _pending.headSet(offset).clear();
+ } else {
+ _pending.remove(offset);
+ }
+ numberAcked++;
}
public void fail(Long offset) {
- //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
- // things might get crazy with lots of timeouts
- if (_emittedToOffset > offset) {
- _emittedToOffset = offset;
- _pending.tailSet(offset).clear();
+ if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
+ LOG.info(
+ "Skipping failed tuple at offset=" + offset +
+ " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
+ " behind _emittedToOffset=" + _emittedToOffset
+ );
+ } else {
+ LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
+ failed.add(offset);
+ numberFailed++;
+ if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
+ throw new RuntimeException("Too many tuple failures");
+ }
}
}
public void commit() {
long lastCompletedOffset = lastCompletedOffset();
- if (lastCompletedOffset != lastCommittedOffset()) {
- LOG.info("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
- Map<Object, Object> data = ImmutableMap.builder()
+ if (_committedTo != lastCompletedOffset) {
+ LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+ Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", lastCompletedOffset)
@@ -203,24 +225,16 @@ public class PartitionManager {
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);
+
_committedTo = lastCompletedOffset;
- LOG.info("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+ LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
} else {
- LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
+ LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
}
}
private String committedPath() {
- return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _spoutConfig.topic + "/" + _partition.getId();
- }
-
- public long queryPartitionOffsetLatestTime() {
- return KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition,
- OffsetRequest.LatestTime());
- }
-
- public long lastCommittedOffset() {
- return _committedTo;
+ return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
}
public long lastCompletedOffset() {
@@ -238,4 +252,14 @@ public class PartitionManager {
public void close() {
_connections.unregister(_partition.host, _partition.partition);
}
+
+ static class KafkaMessageId {
+ public Partition partition;
+ public long offset;
+
+ public KafkaMessageId(Partition partition, long offset) {
+ this.partition = partition;
+ this.offset = offset;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
index 456a2a1..de4d0fd 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
@@ -45,4 +45,7 @@ public class StaticCoordinator implements PartitionCoordinator {
return _managers.get(partition);
}
+ @Override
+ public void refresh() { return; }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
index 102ea69..286dc9b 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
@@ -43,4 +43,4 @@ public class StringScheme implements Scheme {
public Fields getOutputFields() {
return new Fields(STRING_SCHEME_KEY);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
index e414d06..0a8fe36 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
@@ -72,7 +72,8 @@ public class ZkCoordinator implements PartitionCoordinator {
return _cachedList;
}
- void refresh() {
+ @Override
+ public void refresh() {
try {
LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java b/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
index d9acc66..4e4327d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
@@ -33,4 +33,4 @@ public class ZkHosts implements BrokerHosts {
public ZkHosts(String brokerZkStr) {
this(brokerZkStr, DEFAULT_ZK_PATH);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
index 52585ef..a00f060 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
@@ -66,7 +66,7 @@ public class ZkState {
}
public void writeJSON(String path, Map<Object, Object> data) {
- LOG.info("Writing " + path + " the data " + data.toString());
+ LOG.debug("Writing " + path + " the data " + data.toString());
writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
index ae136e5..76bec9e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
@@ -17,14 +17,13 @@
*/
package storm.kafka.trident;
+import com.google.common.base.Objects;
import storm.kafka.Broker;
import storm.kafka.Partition;
import java.io.Serializable;
import java.util.*;
-import com.google.common.base.Objects;
-
public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
index 7a44bdd..8c10bed 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
@@ -55,4 +55,4 @@ public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<
public Map<String, Object> getComponentConfiguration() {
return null;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 8c57681..94bf134 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -22,7 +22,6 @@ import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.task.TopologyContext;
-import backtype.storm.utils.Utils;
import com.google.common.collect.ImmutableMap;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b025832/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
index db11328..1a26705 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
@@ -27,36 +27,44 @@ import java.util.Map;
public class ZkBrokerReader implements IBrokerReader {
- public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
-
- GlobalPartitionInformation cachedBrokers;
- DynamicBrokersReader reader;
- long lastRefreshTimeMs;
-
-
- long refreshMillis;
-
- public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
- reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
- cachedBrokers = reader.getBrokerInfo();
- lastRefreshTimeMs = System.currentTimeMillis();
- refreshMillis = hosts.refreshFreqSecs * 1000L;
-
- }
-
- @Override
- public GlobalPartitionInformation getCurrentBrokers() {
- long currTime = System.currentTimeMillis();
- if (currTime > lastRefreshTimeMs + refreshMillis) {
- LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
- cachedBrokers = reader.getBrokerInfo();
- lastRefreshTimeMs = currTime;
- }
- return cachedBrokers;
- }
-
- @Override
- public void close() {
- reader.close();
- }
+ public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
+
+ GlobalPartitionInformation cachedBrokers;
+ DynamicBrokersReader reader;
+ long lastRefreshTimeMs;
+
+
+ long refreshMillis;
+
+ public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
+ try {
+ reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
+ cachedBrokers = reader.getBrokerInfo();
+ lastRefreshTimeMs = System.currentTimeMillis();
+ refreshMillis = hosts.refreshFreqSecs * 1000L;
+ } catch (java.net.SocketTimeoutException e) {
+ LOG.warn("Failed to update brokers", e);
+ }
+
+ }
+
+ @Override
+ public GlobalPartitionInformation getCurrentBrokers() {
+ long currTime = System.currentTimeMillis();
+ if (currTime > lastRefreshTimeMs + refreshMillis) {
+ try {
+ LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
+ cachedBrokers = reader.getBrokerInfo();
+ lastRefreshTimeMs = currTime;
+ } catch (java.net.SocketTimeoutException e) {
+ LOG.warn("Failed to update brokers", e);
+ }
+ }
+ return cachedBrokers;
+ }
+
+ @Override
+ public void close() {
+ reader.close();
+ }
}
[2/3] git commit: Merge branch 'master' of
github.com:brndnmtthws/incubator-storm
Posted by pt...@apache.org.
Merge branch 'master' of github.com:brndnmtthws/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/512e34ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/512e34ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/512e34ee
Branch: refs/heads/master
Commit: 512e34eee399a57cffe9c38f68229d604c9f5db9
Parents: fe7ac29 5b02583
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue May 20 13:58:18 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue May 20 13:58:18 2014 -0400
----------------------------------------------------------------------
.../storm-kafka/src/jvm/storm/kafka/Broker.java | 3 +-
.../src/jvm/storm/kafka/BrokerHosts.java | 2 +-
.../jvm/storm/kafka/DynamicBrokersReader.java | 10 +-
.../src/jvm/storm/kafka/KafkaConfig.java | 2 +
.../src/jvm/storm/kafka/KafkaSpout.java | 21 +--
.../src/jvm/storm/kafka/KafkaUtils.java | 16 ++-
.../kafka/KeyValueSchemeAsMultiScheme.java | 1 +
.../jvm/storm/kafka/PartitionCoordinator.java | 2 +
.../src/jvm/storm/kafka/PartitionManager.java | 142 +++++++++++--------
.../src/jvm/storm/kafka/StaticCoordinator.java | 3 +
.../src/jvm/storm/kafka/StringScheme.java | 2 +-
.../src/jvm/storm/kafka/ZkCoordinator.java | 3 +-
.../src/jvm/storm/kafka/ZkHosts.java | 2 +-
.../src/jvm/storm/kafka/ZkState.java | 2 +-
.../trident/GlobalPartitionInformation.java | 3 +-
.../trident/TransactionalTridentKafkaSpout.java | 2 +-
.../kafka/trident/TridentKafkaEmitter.java | 1 -
.../jvm/storm/kafka/trident/ZkBrokerReader.java | 72 +++++-----
18 files changed, 172 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
[3/3] git commit: STORM-303: update changelog
Posted by pt...@apache.org.
STORM-303: update changelog
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/4ee9ea7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/4ee9ea7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/4ee9ea7a
Branch: refs/heads/master
Commit: 4ee9ea7a8b391380c185a921bcf562b95461f44a
Parents: 512e34e
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue May 20 14:58:30 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue May 20 14:58:30 2014 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/4ee9ea7a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c6d0816..2281408 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.9.2-incubating (unreleased)
+ * STORM-303: storm-kafka reliability improvements
* STORM-233: Removed inline heartbeat to nimbus to avoid workers being killed when under heavy ZK load
* STORM-267: fix package name of LoggingMetricsConsumer in storm.yaml.example
* STORM-265: upgrade to clojure 1.5.1