You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:34 UTC
[38/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
new file mode 100644
index 0000000..6eddaf5
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metric.api.MeanReducer;
+import org.apache.storm.metric.api.ReducedMetric;
+import org.apache.storm.task.TopologyContext;
+import com.google.common.collect.ImmutableMap;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.*;
+import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+
+import java.util.*;
+
+/**
+ * Emitter logic shared by the Trident Kafka spouts. Each batch reads one {@link Partition} through a
+ * {@link SimpleConsumer} obtained from {@link DynamicPartitionConnections}; the logic can be exposed
+ * as an opaque emitter ({@link #asOpaqueEmitter()}) or a transactional one
+ * ({@link #asTransactionalEmitter()}).
+ *
+ * <p>Every batch metadata map built here contains the keys {@code offset}, {@code nextOffset},
+ * {@code instanceId}, {@code partition}, {@code broker}, {@code topic} and {@code topology}.
+ */
+public class TridentKafkaEmitter {
+
+    public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
+
+    private final DynamicPartitionConnections _connections;
+    private final String _topologyName;
+    private final KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
+    private final ReducedMetric _kafkaMeanFetchLatencyMetric;
+    private final CombinedMetric _kafkaMaxFetchLatencyMetric;
+    private final TridentKafkaConfig _config;
+    private final String _topologyInstanceId;
+
+    /**
+     * Creates the emitter, builds its partition connections and registers the offset and
+     * fetch-latency metrics.
+     *
+     * @param conf               topology configuration; {@code Config.TOPOLOGY_NAME} is read from it
+     * @param context            context the metrics are registered with
+     * @param config             Kafka spout configuration
+     * @param topologyInstanceId id of the running topology instance, stored in every batch's metadata
+     */
+    public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
+        _config = config;
+        _topologyInstanceId = topologyInstanceId;
+        _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
+        _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
+        _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
+        context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs);
+        _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs);
+        _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs);
+    }
+
+    /**
+     * Emits a new batch for {@code partition}, letting fetch failures propagate, and records the
+     * batch's starting offset in the offset metric.
+     */
+    private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+        SimpleConsumer consumer = _connections.register(partition);
+        Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
+        _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
+        return ret;
+    }
+
+    /**
+     * Emits a new batch, tolerating a {@link FailedFetchException}.
+     *
+     * @return the batch metadata. On a failed fetch, the result is instead metadata for an empty batch
+     *         that resumes at the previous batch's {@code nextOffset}, or {@code null} when there is no
+     *         previous metadata.
+     */
+    private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+        try {
+            return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
+        } catch (FailedFetchException e) {
+            // Keep the cause so the reason for the failed fetch is visible in the logs.
+            LOG.warn("Failed to fetch from partition " + partition, e);
+            if (lastMeta == null) {
+                return null;
+            }
+            Object resumeOffset = lastMeta.get("nextOffset");
+            return batchMeta(partition, resumeOffset, resumeOffset);
+        }
+    }
+
+    /**
+     * Fetches from the batch's starting offset and emits every returned message. If the starting
+     * offset is out of range, the fetch is retried from the earliest available offset.
+     */
+    private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) {
+        long offset = startingOffset(consumer, partition, lastMeta);
+
+        ByteBufferMessageSet msgs;
+        try {
+            msgs = fetchMessages(consumer, partition, offset);
+        } catch (TopicOffsetOutOfRangeException e) {
+            long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
+            offset = newOffset;
+            // Retry through fetchMessages so this fetch is also recorded in the latency metrics.
+            msgs = fetchMessages(consumer, partition, offset);
+        }
+
+        long endoffset = offset;
+        for (MessageAndOffset msg : msgs) {
+            emit(collector, msg.message(), partition, msg.offset());
+            endoffset = msg.nextOffset();
+        }
+        return batchMeta(partition, offset, endoffset);
+    }
+
+    /**
+     * Picks the offset a new batch starts from:
+     * <ul>
+     * <li>with no previous metadata, the offset derived from the config;</li>
+     * <li>when {@code ignoreZkOffsets} is set and the previous metadata belongs to another topology
+     * instance, {@code startOffsetTime};</li>
+     * <li>otherwise, the previous batch's {@code nextOffset}.</li>
+     * </ul>
+     */
+    private long startingOffset(SimpleConsumer consumer, Partition partition, Map lastMeta) {
+        if (lastMeta == null) {
+            return KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config);
+        }
+        String lastInstanceId = null;
+        Map lastTopoMeta = (Map) lastMeta.get("topology");
+        if (lastTopoMeta != null) {
+            lastInstanceId = (String) lastTopoMeta.get("id");
+        }
+        if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
+            return KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime);
+        }
+        return (Long) lastMeta.get("nextOffset");
+    }
+
+    /**
+     * Builds the metadata map describing a batch of {@code partition}. Every code path that produces
+     * batch metadata uses this helper, so all such maps share the same keys.
+     */
+    private Map<String, Object> batchMeta(Partition partition, Object offset, Object nextOffset) {
+        Map<String, Object> meta = new HashMap<String, Object>();
+        meta.put("offset", offset);
+        meta.put("nextOffset", nextOffset);
+        meta.put("instanceId", _topologyInstanceId);
+        meta.put("partition", partition.partition);
+        meta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
+        meta.put("topic", partition.topic);
+        meta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
+        return meta;
+    }
+
+    /** Fetches messages starting at {@code offset} and records the fetch time, in ms, in both latency metrics. */
+    private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
+        long start = System.nanoTime();
+        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        long millis = (System.nanoTime() - start) / 1000000;
+        _kafkaMeanFetchLatencyMetric.update(millis);
+        _kafkaMaxFetchLatencyMetric.update(millis);
+        return msgs;
+    }
+
+    /**
+     * Re-emits the batch described by {@code meta}, covering offsets from {@code offset} up to
+     * {@code nextOffset}.
+     *
+     * @param attempt   the transaction attempt being replayed (only logged)
+     * @param collector collector the tuples are emitted to
+     * @param partition partition the batch was read from
+     * @param meta      metadata produced when the batch was first emitted
+     * @throws RuntimeException if the fetched messages step past {@code nextOffset}
+     */
+    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
+        LOG.info("re-emitting batch, attempt " + attempt);
+        String instanceId = (String) meta.get("instanceId");
+        // Batches recorded by another topology instance are skipped when ignoreZkOffsets is set.
+        // Compare from the non-null side: the metadata may not contain "instanceId".
+        if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(instanceId)) {
+            return;
+        }
+        SimpleConsumer consumer = _connections.register(partition);
+        long offset = (Long) meta.get("offset");
+        long nextOffset = (Long) meta.get("nextOffset");
+        ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
+        if (msgs == null) {
+            return;
+        }
+        for (MessageAndOffset msg : msgs) {
+            if (offset == nextOffset) {
+                break;
+            }
+            if (offset > nextOffset) {
+                throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+            }
+            emit(collector, msg.message(), partition, msg.offset());
+            offset = msg.nextOffset();
+        }
+    }
+
+    /**
+     * Decodes one Kafka message with the configured scheme and emits the resulting tuples, if any.
+     * A {@link MessageMetadataSchemeAsMultiScheme} additionally receives the partition and offset.
+     */
+    private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
+        Iterable<List<Object>> values;
+        if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+            values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
+        } else {
+            values = KafkaUtils.generateTuples(_config, msg, partition.topic);
+        }
+
+        if (values != null) {
+            for (List<Object> value : values) {
+                collector.emit(value);
+            }
+        }
+    }
+
+    /** Clears the partition connections (called when an emitter is closed). */
+    private void clear() {
+        _connections.clear();
+    }
+
+    /** Concatenates the ordered partitions of every {@link GlobalPartitionInformation}, in list order. */
+    private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) {
+        List<Partition> part = new ArrayList<Partition>();
+        for (GlobalPartitionInformation globalPartitionInformation : partitions) {
+            part.addAll(globalPartitionInformation.getOrderedPartitions());
+        }
+        return part;
+    }
+
+    /** Clears the partition connections and points the offset metric at the given partitions. */
+    private void refresh(List<Partition> list) {
+        _connections.clear();
+        _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
+    }
+
+    /**
+     * Exposes this emitter as an opaque emitter. A failed fetch yields an empty batch rather than
+     * an exception.
+     */
+    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() {
+
+        return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
+
+            /**
+             * Emit a batch of tuples for a partition/transaction.
+             *
+             * Return the metadata describing this batch that will be used as lastPartitionMeta
+             * for defining the parameters of the next batch.
+             */
+            @Override
+            public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            @Override
+            public void refreshPartitions(List<Partition> partitions) {
+                refresh(partitions);
+            }
+
+            @Override
+            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
+                return orderPartitions(partitionInformation);
+            }
+
+            @Override
+            public void close() {
+                clear();
+            }
+        };
+    }
+
+    /**
+     * Exposes this emitter as a transactional emitter. New batches fail fast on fetch errors, and
+     * replays re-read exactly the offsets recorded in the batch metadata.
+     */
+    public IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asTransactionalEmitter() {
+        return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
+
+            /**
+             * Emit a batch of tuples for a partition/transaction that's never been emitted before.
+             * Return the metadata that can be used to reconstruct this partition/batch in the future.
+             */
+            @Override
+            public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            /**
+             * Emit a batch of tuples for a partition/transaction that has been emitted before, using
+             * the metadata created when it was first emitted.
+             */
+            @Override
+            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            /**
+             * This method is called when this task is responsible for a new set of partitions. Should be used
+             * to manage things like connections to brokers.
+             */
+            @Override
+            public void refreshPartitions(List<Partition> partitions) {
+                refresh(partitions);
+            }
+
+            @Override
+            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
+                return orderPartitions(partitionInformation);
+            }
+
+            @Override
+            public void close() {
+                clear();
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
new file mode 100644
index 0000000..5741dc7
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+ private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+ private KafkaProducer producer;
+ private OutputCollector collector;
+
+ private TridentTupleToKafkaMapper mapper;
+ private KafkaTopicSelector topicSelector;
+
+ public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
+ this.topicSelector = selector;
+ return this;
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+ LOG.debug("beginCommit is Noop.");
+ }
+
+ @Override
+ public void commit(Long txid) {
+ LOG.debug("commit is Noop.");
+ }
+
+ public void prepare(Properties options) {
+ Validate.notNull(mapper, "mapper can not be null");
+ Validate.notNull(topicSelector, "topicSelector can not be null");
+ producer = new KafkaProducer(options);
+ }
+
+ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+ String topic = null;
+ for (TridentTuple tuple : tuples) {
+ try {
+ topic = topicSelector.getTopic(tuple);
+
+ if(topic != null) {
+ Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
+ mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
+ try {
+ result.get();
+ } catch (ExecutionException e) {
+ String errorMsg = "Could not retrieve result for message with key = "
+ + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
+ LOG.error(errorMsg, e);
+ throw new FailedException(errorMsg, e);
+ }
+ } else {
+ LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
+ }
+ } catch (Exception ex) {
+ String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
+ + " to topic = " + topic;
+ LOG.warn(errorMsg, ex);
+ throw new FailedException(errorMsg, ex);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
new file mode 100644
index 0000000..f564510
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link StateFactory} that creates a prepared {@link TridentKafkaState} for each state partition,
+ * using the configured mapper, topic selector and producer properties.
+ */
+public class TridentKafkaStateFactory implements StateFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties producerProperties = new Properties();
+
+    /** Sets the mapper passed to every created state. */
+    public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    /** Sets the topic selector passed to every created state. */
+    public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    /** Sets the properties used to build each state's Kafka producer (an empty set by default). */
+    public TridentKafkaStateFactory withProducerProperties(Properties props) {
+        this.producerProperties = props;
+        return this;
+    }
+
+    /** Creates and prepares a {@link TridentKafkaState}; {@code conf} and {@code metrics} are not used. */
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitionIndex={}, numPartitions={})", partitionIndex, numPartitions);
+        TridentKafkaState state = new TridentKafkaState()
+                .withKafkaTopicSelector(this.topicSelector)
+                .withTridentTupleToKafkaMapper(this.mapper);
+        state.prepare(producerProperties);
+        return state;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
new file mode 100644
index 0000000..7a905ab
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ * State updater that hands each batch of tuples straight to
+ * {@link TridentKafkaState#updateState(List, TridentCollector)}.
+ */
+public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> {
+
+    @Override
+    public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) {
+        // The state itself selects topics, maps tuples and sends them; nothing to do here but delegate.
+        state.updateState(tuples, collector);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
new file mode 100644
index 0000000..d26c341
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.DynamicBrokersReader;
+import org.apache.storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * {@link IBrokerReader} that caches the partition information returned by a
+ * {@link DynamicBrokersReader} built from {@link ZkHosts}. The cache is re-read at most once every
+ * {@code hosts.refreshFreqSecs} seconds.
+ */
+public class ZkBrokerReader implements IBrokerReader {
+
+    public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
+
+    List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>();
+    DynamicBrokersReader reader;
+    long lastRefreshTimeMs;
+
+    long refreshMillis;
+
+    public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
+        // Set the interval before the first fetch. If that fetch times out, the interval must still
+        // apply; otherwise it would stay 0 and every later call would re-read the broker info.
+        refreshMillis = hosts.refreshFreqSecs * 1000L;
+        try {
+            reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
+            cachedBrokers = reader.getBrokerInfo();
+            lastRefreshTimeMs = System.currentTimeMillis();
+        } catch (java.net.SocketTimeoutException e) {
+            // lastRefreshTimeMs stays 0, so the next lookup retries the fetch.
+            LOG.warn("Failed to update brokers", e);
+        }
+    }
+
+    /** Re-reads the broker info when the refresh interval has elapsed; a timeout keeps the old cache. */
+    private void refresh() {
+        long currTime = System.currentTimeMillis();
+        if (currTime > lastRefreshTimeMs + refreshMillis) {
+            try {
+                LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
+                cachedBrokers = reader.getBrokerInfo();
+                lastRefreshTimeMs = currTime;
+            } catch (java.net.SocketTimeoutException e) {
+                LOG.warn("Failed to update brokers", e);
+            }
+        }
+    }
+
+    /** Returns the cached partition information for {@code topic}, or {@code null} if none is cached. */
+    @Override
+    public GlobalPartitionInformation getBrokerForTopic(String topic) {
+        refresh();
+        for (GlobalPartitionInformation partitionInformation : cachedBrokers) {
+            if (partitionInformation.topic.equals(topic)) {
+                return partitionInformation;
+            }
+        }
+        return null;
+    }
+
+    /** Returns the cached partition information list itself (not a copy). */
+    @Override
+    public List<GlobalPartitionInformation> getAllBrokers() {
+        refresh();
+        return cachedBrokers;
+    }
+
+    @Override
+    public void close() {
+        reader.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..2d04971
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Maps a Trident tuple to a Kafka key/message pair by reading two named fields of the tuple.
+ *
+ * @param <K> type of the Kafka message key
+ * @param <V> type of the Kafka message value
+ */
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
+
+    /** Name of the tuple field holding the message key. */
+    public final String keyFieldName;
+    /** Name of the tuple field holding the message value. */
+    public final String msgFieldName;
+
+    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
+        this.keyFieldName = keyFieldName;
+        this.msgFieldName = msgFieldName;
+    }
+
+    // The casts below are unchecked: the caller chooses K and V and must ensure the field values match.
+    @Override
+    @SuppressWarnings("unchecked")
+    public K getKeyFromTuple(TridentTuple tuple) {
+        return (K) tuple.getValueByField(keyFieldName);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public V getMessageFromTuple(TridentTuple tuple) {
+        return (V) tuple.getValueByField(msgFieldName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
new file mode 100644
index 0000000..28c6c89
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+/**
+ * Extracts the Kafka record key and value from a Trident tuple.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public interface TridentTupleToKafkaMapper<K, V> extends Serializable {
+
+    /** Returns the key of the Kafka record built from {@code tuple}. */
+    K getKeyFromTuple(TridentTuple tuple);
+
+    /** Returns the value of the Kafka record built from {@code tuple}. */
+    V getMessageFromTuple(TridentTuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..7ae49a3
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * {@link KafkaTopicSelector} that routes every tuple to one fixed topic.
+ */
+public class DefaultTopicSelector implements KafkaTopicSelector {
+
+    /** The topic returned for every tuple. */
+    private final String topicName;
+
+    /**
+     * @param topicName topic every tuple is sent to
+     */
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    /** Ignores {@code tuple} and returns the configured topic. */
+    @Override
+    public String getTopic(TridentTuple tuple) {
+        return this.topicName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..012a6c7
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+/**
+ * Chooses the Kafka topic a Trident tuple is written to.
+ */
+public interface KafkaTopicSelector extends Serializable {
+
+    /** Returns the destination topic for {@code tuple}; writers may skip tuples for which this is {@code null}. */
+    String getTopic(TridentTuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/storm/kafka/Broker.java
deleted file mode 100644
index 513ab22..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/Broker.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import com.google.common.base.Objects;
-
-import java.io.Serializable;
-
-/**
- * Address of a Kafka broker: a host name plus a port.
- * <p>
- * Two brokers are equal when both host and port are equal. Brokers are ordered by
- * host name first and then by port.
- */
-public class Broker implements Serializable, Comparable<Broker> {
-    public String host;
-    public int port;
-
-    // for kryo compatibility
-    private Broker() {
-
-    }
-
-    /**
-     * @param host broker host name
-     * @param port broker port
-     */
-    public Broker(String host, int port) {
-        this.host = host;
-        this.port = port;
-    }
-
-    /**
-     * Creates a broker on port 9092.
-     *
-     * @param host broker host name
-     */
-    public Broker(String host) {
-        this(host, 9092);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(host, port);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final Broker other = (Broker) obj;
-        // port is a primitive: compare it directly rather than boxing it through Objects.equal
-        return this.port == other.port && Objects.equal(this.host, other.host);
-    }
-
-    @Override
-    public String toString() {
-        return host + ":" + port;
-    }
-
-    /**
-     * Parses a {@code host} or {@code host:port} specification.
-     *
-     * @param host the specification to parse
-     * @return the parsed broker; the port is 9092 when the specification has no port part
-     * @throws IllegalArgumentException if the specification does not split on ':' into one or two parts
-     * @throws NumberFormatException if the port part is not an integer
-     */
-    public static Broker fromString(String host) {
-        String[] spec = host.split(":");
-        if (spec.length == 1) {
-            return new Broker(spec[0]);
-        }
-        if (spec.length == 2) {
-            return new Broker(spec[0], Integer.parseInt(spec[1]));
-        }
-        throw new IllegalArgumentException("Invalid host specification: " + host);
-    }
-
-    /**
-     * Orders brokers by host name, then by port.
-     */
-    @Override
-    public int compareTo(Broker o) {
-        int byHost = this.host.compareTo(o.host);
-        // Integer.compare instead of subtraction: subtracting ints can overflow in general.
-        return byHost != 0 ? byHost : Integer.compare(this.port, o.port);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
deleted file mode 100644
index 1a06fc5..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.io.Serializable;
-
-
-/**
- * Marker type for a description of where Kafka brokers can be found.
- * KafkaConfig holds one of these as its {@code hosts} field.
- * <p>
- * It declares no methods of its own and only requires implementations to be
- * {@link Serializable}.
- */
-public interface BrokerHosts extends Serializable {
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
deleted file mode 100644
index 1a7238e..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.utils.Utils;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-/**
- * Kafka {@link Serializer} for {@link ByteBuffer} values. The byte conversion is
- * delegated to {@code Utils.toByteArray}; the serializer has no configuration and
- * holds no resources.
- */
-public class ByteBufferSerializer implements Serializer<ByteBuffer> {
-
-    @Override
-    public byte[] serialize(String s, ByteBuffer b) {
-        return Utils.toByteArray(b);
-    }
-
-    @Override
-    public void configure(Map<String, ?> map, boolean b) {
-        // nothing to configure
-    }
-
-    @Override
-    public void close() {
-        // nothing to release
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
deleted file mode 100644
index d0f6724..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-import com.google.common.base.Preconditions;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.GlobalPartitionInformation;
-
-import java.io.UnsupportedEncodingException;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class DynamicBrokersReader {
-
- public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
-
- private CuratorFramework _curator;
- private String _zkPath;
- private String _topic;
- private Boolean _isWildcardTopic;
-
- public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
- // Check required parameters
- Preconditions.checkNotNull(conf, "conf cannot be null");
-
- validateConfig(conf);
-
- Preconditions.checkNotNull(zkStr,"zkString cannot be null");
- Preconditions.checkNotNull(zkPath, "zkPath cannot be null");
- Preconditions.checkNotNull(topic, "topic cannot be null");
-
- _zkPath = zkPath;
- _topic = topic;
- _isWildcardTopic = Utils.getBoolean(conf.get("kafka.topic.wildcard.match"), false);
- try {
- _curator = CuratorFrameworkFactory.newClient(
- zkStr,
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
- new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
- _curator.start();
- } catch (Exception ex) {
- LOG.error("Couldn't connect to zookeeper", ex);
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * Get all partitions with their current leaders
- */
- public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
- List<String> topics = getTopics();
- List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
-
- for (String topic : topics) {
- GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
- try {
- int numPartitionsForTopic = getNumPartitions(topic);
- String brokerInfoPath = brokerPath();
- for (int partition = 0; partition < numPartitionsForTopic; partition++) {
- int leader = getLeaderFor(topic,partition);
- String path = brokerInfoPath + "/" + leader;
- try {
- byte[] brokerData = _curator.getData().forPath(path);
- Broker hp = getBrokerHost(brokerData);
- globalPartitionInformation.addPartition(partition, hp);
- } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
- LOG.error("Node {} does not exist ", path);
- }
- }
- } catch (SocketTimeoutException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
- partitions.add(globalPartitionInformation);
- }
- return partitions;
- }
-
- private int getNumPartitions(String topic) {
- try {
- String topicBrokersPath = partitionPath(topic);
- List<String> children = _curator.getChildren().forPath(topicBrokersPath);
- return children.size();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private List<String> getTopics() {
- List<String> topics = new ArrayList<String>();
- if (!_isWildcardTopic) {
- topics.add(_topic);
- return topics;
- } else {
- try {
- List<String> children = _curator.getChildren().forPath(topicsPath());
- for (String t : children) {
- if (t.matches(_topic)) {
- LOG.info(String.format("Found matching topic %s", t));
- topics.add(t);
- }
- }
- return topics;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public String topicsPath () {
- return _zkPath + "/topics";
- }
- public String partitionPath(String topic) {
- return topicsPath() + "/" + topic + "/partitions";
- }
-
- public String brokerPath() {
- return _zkPath + "/ids";
- }
-
-
-
- /**
- * get /brokers/topics/distributedTopic/partitions/1/state
- * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
- * @param topic
- * @param partition
- * @return
- */
- private int getLeaderFor(String topic, long partition) {
- try {
- String topicBrokersPath = partitionPath(topic);
- byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
- Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
- Integer leader = ((Number) value.get("leader")).intValue();
- if (leader == -1) {
- throw new RuntimeException("No leader found for partition " + partition);
- }
- return leader;
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() {
- _curator.close();
- }
-
- /**
- * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
- * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
- *
- * @param contents
- * @return
- */
- private Broker getBrokerHost(byte[] contents) {
- try {
- Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
- String host = (String) value.get("host");
- Integer port = ((Long) value.get("port")).intValue();
- return new Broker(host, port);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Validate required parameters in the input configuration Map
- * @param conf
- */
- private void validateConfig(final Map conf) {
- Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT),
- "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT);
- Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT),
- "%s cannot be null", Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT);
- Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES),
- "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES);
- Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL),
- "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
deleted file mode 100644
index e237a7a..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.IBrokerReader;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * Shares one {@link SimpleConsumer} per broker among all topic partitions registered
- * against that broker. A broker's consumer is closed once its last partition is
- * unregistered.
- * <p>
- * NOTE(review): nothing here is synchronized, so an instance is presumably confined to
- * one thread; confirm before sharing it.
- */
-public class DynamicPartitionConnections {
-
-    public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
-
-    /** A broker's consumer plus the "topic_partition" keys currently using it. */
-    static class ConnectionInfo {
-        SimpleConsumer consumer;
-        Set<String> partitions = new HashSet<String>();
-
-        public ConnectionInfo(SimpleConsumer consumer) {
-            this.consumer = consumer;
-        }
-    }
-
-    Map<Broker, ConnectionInfo> _connections = new HashMap<Broker, ConnectionInfo>();
-    KafkaConfig _config;
-    IBrokerReader _reader;
-
-    public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
-        _config = config;
-        _reader = brokerReader;
-    }
-
-    /**
-     * Registers {@code partition} against the broker that the broker reader reports for it.
-     *
-     * @return the consumer for that broker, possibly shared with other partitions
-     */
-    public SimpleConsumer register(Partition partition) {
-        Broker broker = _reader.getBrokerForTopic(partition.topic).getBrokerFor(partition.partition);
-        return register(broker, partition.topic, partition.partition);
-    }
-
-    /**
-     * Registers a topic partition against {@code host}. If no consumer is open for that
-     * broker yet, one is created from the config's socket timeout, buffer size and client id.
-     *
-     * @return the consumer for {@code host}, possibly shared with other partitions
-     */
-    public SimpleConsumer register(Broker host, String topic, int partition) {
-        ConnectionInfo info = _connections.get(host);
-        if (info == null) {
-            info = new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
-            _connections.put(host, info);
-        }
-        info.partitions.add(getHashKey(topic, partition));
-        return info.consumer;
-    }
-
-    /** @return the open consumer for {@code partition.host}, or {@code null} if there is none */
-    public SimpleConsumer getConnection(Partition partition) {
-        ConnectionInfo info = _connections.get(partition.host);
-        return info != null ? info.consumer : null;
-    }
-
-    /**
-     * Removes a topic partition from a broker's connection, closing the consumer when no
-     * partitions remain.
-     *
-     * @param port the broker the partition was registered against
-     */
-    public void unregister(Broker port, String topic, int partition) {
-        ConnectionInfo info = _connections.get(port);
-        if (info == null) {
-            // Nothing is registered for this broker (for example after clear()), so there is
-            // no consumer to close; warn instead of failing with a NullPointerException.
-            LOG.warn("No connection registered for broker {}; ignoring unregister of {} partition {}", port, topic, partition);
-            return;
-        }
-        info.partitions.remove(getHashKey(topic, partition));
-        if (info.partitions.isEmpty()) {
-            info.consumer.close();
-            _connections.remove(port);
-        }
-    }
-
-    /** Unregisters {@code partition} from the broker recorded in {@code partition.host}. */
-    public void unregister(Partition partition) {
-        unregister(partition.host, partition.topic, partition.partition);
-    }
-
-    /** Closes every open consumer and forgets all registrations. */
-    public void clear() {
-        for (ConnectionInfo info : _connections.values()) {
-            info.consumer.close();
-        }
-        _connections.clear();
-    }
-
-    private String getHashKey(String topic, int partition) {
-        return topic + "_" + partition;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
deleted file mode 100644
index 5664f12..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * {@link FailedMsgRetryManager} that schedules each failed offset for retry after an
- * exponentially growing delay, capped at a maximum.
- * <p>
- * For the n-th retry of an offset (n starting at 1) the delay is
- * {@code min(retryInitialDelayMs * retryDelayMultiplier^(n - 1), retryDelayMaxMs)}.
- * <p>
- * NOTE(review): {@code records} is a ConcurrentHashMap but {@code waiting} is an
- * unsynchronized PriorityQueue, so the class as a whole is not thread-safe. It presumably
- * runs on a single spout thread; confirm before sharing an instance.
- */
-public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
-
-    private final long retryInitialDelayMs;
-    private final double retryDelayMultiplier;
-    private final long retryDelayMaxMs;
-
-    // Records scheduled for a retry, earliest retry time first.
-    private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
-    // Latest retry record for every failed offset that has not been acked or cleared.
-    private Map<Long, MessageRetryRecord> records = new ConcurrentHashMap<Long, MessageRetryRecord>();
-
-    /**
-     * @param retryInitialDelayMs  delay before the first retry
-     * @param retryDelayMultiplier factor applied to the delay for each further retry
-     * @param retryDelayMaxMs      upper bound on any single retry delay
-     */
-    public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
-        this.retryInitialDelayMs = retryInitialDelayMs;
-        this.retryDelayMultiplier = retryDelayMultiplier;
-        this.retryDelayMaxMs = retryDelayMaxMs;
-    }
-
-    /**
-     * Schedules a retry of {@code offset}. Each further failure of the same offset raises
-     * its retry number, and with it the delay.
-     */
-    @Override
-    public void failed(Long offset) {
-        MessageRetryRecord oldRecord = this.records.get(offset);
-        MessageRetryRecord newRecord;
-        if (oldRecord == null) {
-            newRecord = new MessageRetryRecord(offset);
-        } else {
-            // If the offset failed again before retryStarted(), its old record is still queued.
-            // Drop it so each offset is queued at most once. Otherwise the stale entry would
-            // outlive retryStarted()/acked(), which each remove a single matching entry.
-            this.waiting.remove(oldRecord);
-            newRecord = oldRecord.createNextRetryRecord();
-        }
-        this.records.put(offset, newRecord);
-        this.waiting.add(newRecord);
-    }
-
-    /** Forgets {@code offset}, cancelling any scheduled retry. */
-    @Override
-    public void acked(Long offset) {
-        MessageRetryRecord record = this.records.remove(offset);
-        if (record != null) {
-            this.waiting.remove(record);
-        }
-    }
-
-    /**
-     * Takes {@code offset} off the retry schedule while it is being retried. Its record is
-     * kept so a later failure continues the backoff sequence.
-     *
-     * @throws IllegalStateException if {@code offset} is not currently scheduled for retry
-     */
-    @Override
-    public void retryStarted(Long offset) {
-        MessageRetryRecord record = this.records.get(offset);
-        if (record == null || !this.waiting.contains(record)) {
-            throw new IllegalStateException("cannot retry a message that has not failed");
-        } else {
-            this.waiting.remove(record);
-        }
-    }
-
-    /**
-     * @return the scheduled offset with the earliest retry time, if that time has been
-     *         reached; otherwise {@code null}
-     */
-    @Override
-    public Long nextFailedMessageToRetry() {
-        while (!this.waiting.isEmpty()) {
-            MessageRetryRecord first = this.waiting.peek();
-            if (System.currentTimeMillis() < first.retryTimeUTC) {
-                return null;
-            }
-            if (this.records.containsKey(first.offset)) {
-                return first.offset;
-            }
-            // defensive programming - should be impossible: drop the orphaned head and look again
-            this.waiting.poll();
-        }
-        return null;
-    }
-
-    /** @return whether {@code offset} is scheduled for retry and its retry time has been reached */
-    @Override
-    public boolean shouldRetryMsg(Long offset) {
-        MessageRetryRecord record = this.records.get(offset);
-        return record != null &&
-                this.waiting.contains(record) &&
-                System.currentTimeMillis() >= record.retryTimeUTC;
-    }
-
-    /**
-     * Forgets every tracked offset below {@code kafkaOffset}.
-     *
-     * @return the offsets that were removed
-     */
-    @Override
-    public Set<Long> clearInvalidMessages(Long kafkaOffset) {
-        Set<Long> invalidOffsets = new HashSet<Long>();
-        for (Long offset : records.keySet()) {
-            if (offset < kafkaOffset) {
-                MessageRetryRecord record = this.records.remove(offset);
-                if (record != null) {
-                    this.waiting.remove(record);
-                    invalidOffsets.add(offset);
-                }
-            }
-        }
-        return invalidOffsets;
-    }
-
-    /**
-     * Retry state for one failed offset: its retry number and the earliest time at which
-     * that retry may happen. The delay is computed from the enclosing manager's
-     * retryInitialDelayMs, retryDelayMultiplier and retryDelayMaxMs:
-     * <ul>
-     * <li>retryInitialDelayMs - time to delay before the first retry</li>
-     * <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li>
-     * <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will
-     * delay for this amount of time every time)
-     * </li>
-     * </ul>
-     * Equality and hashing use only the offset.
-     */
-    private class MessageRetryRecord {
-        private final long offset;
-        private final int retryNum;
-        private final long retryTimeUTC;
-
-        public MessageRetryRecord(long offset) {
-            this(offset, 1);
-        }
-
-        private MessageRetryRecord(long offset, int retryNum) {
-            this.offset = offset;
-            this.retryNum = retryNum;
-            this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
-        }
-
-        /**
-         * Create a MessageRetryRecord for the next retry that should occur after this one.
-         *
-         * @return a new record for the same offset whose retry number is one higher and whose
-         *         retry time is computed from now; never {@code null}
-         */
-        public MessageRetryRecord createNextRetryRecord() {
-            return new MessageRetryRecord(this.offset, this.retryNum + 1);
-        }
-
-        private long calculateRetryDelay() {
-            double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
-            double delay = retryInitialDelayMs * delayMultiplier;
-            // Clamp before narrowing so an enormous double delay maps to Long.MAX_VALUE.
-            long delayThisRetryMs = delay >= Long.MAX_VALUE ? Long.MAX_VALUE : (long) delay;
-            return Math.min(delayThisRetryMs, retryDelayMaxMs);
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            return (other instanceof MessageRetryRecord
-                    && this.offset == ((MessageRetryRecord) other).offset);
-        }
-
-        @Override
-        public int hashCode() {
-            return Long.valueOf(this.offset).hashCode();
-        }
-    }
-
-    /**
-     * Orders retry records by retry time, earliest first. It keeps Object's identity-based
-     * equals; the previous override always returned false, breaking reflexivity.
-     */
-    private static class RetryTimeComparator implements Comparator<MessageRetryRecord> {
-
-        @Override
-        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
-            return Long.compare(record1.retryTimeUTC, record2.retryTimeUTC);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java b/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
deleted file mode 100644
index 011240e..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-/**
- * Unchecked exception type for failed fetches (presumably fetches from Kafka; confirm
- * at the throw sites).
- */
-public class FailedFetchException extends RuntimeException {
-
-    /**
-     * @param e the underlying cause
-     */
-    public FailedFetchException(Exception e) {
-        super(e);
-    }
-
-    /**
-     * @param message description of the failure
-     */
-    public FailedFetchException(String message) {
-        super(message);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
deleted file mode 100644
index 30c9a24..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.util.Set;
-
-/**
- * Tracks failed message offsets and decides when each should be retried.
- * Offsets are passed as boxed {@link Long}s throughout.
- */
-public interface FailedMsgRetryManager {
-
-    /** Records that the message at {@code offset} failed. */
-    void failed(Long offset);
-
-    /** Records that the message at {@code offset} was acknowledged. */
-    void acked(Long offset);
-
-    /** Records that a retry of the message at {@code offset} has started. */
-    void retryStarted(Long offset);
-
-    /** @return the next failed offset to retry, or {@code null} when none is due */
-    Long nextFailedMessageToRetry();
-
-    /** @return whether the message at {@code offset} should be retried now */
-    boolean shouldRetryMsg(Long offset);
-
-    /**
-     * Discards tracked offsets that are no longer valid relative to {@code kafkaOffset}.
-     *
-     * @return the offsets that were discarded
-     */
-    Set<Long> clearInvalidMessages(Long kafkaOffset);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
deleted file mode 100644
index 07cbd26..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
-import java.util.Map;
-
-/**
- * Kafka {@link Serializer} that encodes an {@link Integer} as 4 big-endian bytes.
- * <p>
- * A {@code null} value serializes to {@code null}, following the convention of Kafka's
- * own serializers, instead of failing with a NullPointerException while unboxing.
- */
-public class IntSerializer implements Serializer<Integer> {
-    @Override
-    public void configure(Map<String, ?> map, boolean b) {
-        // nothing to configure
-    }
-
-    @Override
-    public byte[] serialize(String topic, Integer val) {
-        if (val == null) {
-            return null;
-        }
-        // ByteBuffer is big-endian by default, so this yields the most significant byte first.
-        return ByteBuffer.allocate(4).putInt(val).array();
-    }
-
-    @Override
-    public void close() {
-        // nothing to release
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
deleted file mode 100644
index 49c7526..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.RawMultiScheme;
-
-import java.io.Serializable;
-
-/**
- * Serializable settings for reading one Kafka topic.
- * <p>
- * The broker location, topic and client id are fixed at construction. Everything else is a
- * public, mutable field that starts at the default shown below.
- */
-public class KafkaConfig implements Serializable {
-    private static final long serialVersionUID = 5276718734571623855L;
-
-    public final BrokerHosts hosts;
-    public final String topic;
-    public final String clientId;
-
-    public int fetchSizeBytes = 1024 * 1024;
-    // socketTimeoutMs and bufferSizeBytes are passed to every SimpleConsumer that
-    // DynamicPartitionConnections opens.
-    public int socketTimeoutMs = 10_000;
-    public int fetchMaxWait = 10_000;
-    public int bufferSizeBytes = 1024 * 1024;
-    public MultiScheme scheme = new RawMultiScheme();
-    public boolean ignoreZkOffsets = false;
-    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-    public long maxOffsetBehind = Long.MAX_VALUE;
-    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
-    public int metricsTimeBucketSizeInSecs = 60;
-
-    /**
-     * Creates a config that uses Kafka's {@code OffsetRequest.DefaultClientId()} as client id.
-     *
-     * @param hosts where the brokers can be found
-     * @param topic topic name
-     */
-    public KafkaConfig(BrokerHosts hosts, String topic) {
-        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
-    }
-
-    /**
-     * @param hosts    where the brokers can be found
-     * @param topic    topic name
-     * @param clientId client id handed to each SimpleConsumer
-     */
-    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
-        this.hosts = hosts;
-        this.topic = topic;
-        this.clientId = clientId;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
deleted file mode 100644
index 634af85..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
/**
 * Error codes that may appear in Kafka broker responses.
 *
 * <p>Each constant carries its numeric protocol code explicitly, so decoding no longer depends
 * on declaration order ({@code ordinal()}). The codes equal the positions the constants have
 * always had, so {@link #getError(int)} returns the same constant for every input as before.
 */
public enum KafkaError {
    NO_ERROR(0),
    OFFSET_OUT_OF_RANGE(1),
    INVALID_MESSAGE(2),
    UNKNOWN_TOPIC_OR_PARTITION(3),
    INVALID_FETCH_SIZE(4),
    LEADER_NOT_AVAILABLE(5),
    NOT_LEADER_FOR_PARTITION(6),
    REQUEST_TIMED_OUT(7),
    BROKER_NOT_AVAILABLE(8),
    REPLICA_NOT_AVAILABLE(9),
    MESSAGE_SIZE_TOO_LARGE(10),
    STALE_CONTROLLER_EPOCH(11),
    OFFSET_METADATA_TOO_LARGE(12),
    // Kafka's protocol uses -1 for an unknown server error; any unmatched code also maps here.
    UNKNOWN(-1);

    // values() clones its backing array on every call, so keep one copy for lookups.
    private static final KafkaError[] ALL = values();

    private final int code;

    KafkaError(int code) {
        this.code = code;
    }

    /**
     * Returns the numeric protocol code of this error.
     *
     * @return the code; {@code -1} for {@link #UNKNOWN}
     */
    public int getCode() {
        return code;
    }

    /**
     * Maps a numeric error code from a Kafka response to its constant.
     *
     * @param errorCode code reported by the broker
     * @return the constant whose code matches, or {@link #UNKNOWN} if none does
     */
    public static KafkaError getError(int errorCode) {
        for (KafkaError error : ALL) {
            if (error.code == errorCode) {
                return error;
            }
        }
        return UNKNOWN;
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
deleted file mode 100644
index 8169014..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.google.common.base.Strings;
-import kafka.message.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.PartitionManager.KafkaMessageId;
-
-import java.util.*;
-
-// TODO: need to add blacklisting
-// TODO: need to make a best effort to not re-emit messages if don't have to
-public class KafkaSpout extends BaseRichSpout {
- static enum EmitState {
- EMITTED_MORE_LEFT,
- EMITTED_END,
- NO_EMITTED
- }
-
- public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-
- SpoutConfig _spoutConfig;
- SpoutOutputCollector _collector;
- PartitionCoordinator _coordinator;
- DynamicPartitionConnections _connections;
- ZkState _state;
-
- long _lastUpdateMs = 0;
-
- int _currPartitionIndex = 0;
-
- public KafkaSpout(SpoutConfig spoutConf) {
- _spoutConfig = spoutConf;
- }
-
- @Override
- public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
- _collector = collector;
- String topologyInstanceId = context.getStormId();
- Map stateConf = new HashMap(conf);
- List<String> zkServers = _spoutConfig.zkServers;
- if (zkServers == null) {
- zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
- }
- Integer zkPort = _spoutConfig.zkPort;
- if (zkPort == null) {
- zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
- }
- stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
- stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
- stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
- _state = new ZkState(stateConf);
-
- _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
-
- // using TransactionalState like this is a hack
- int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
- if (_spoutConfig.hosts instanceof StaticHosts) {
- _coordinator = new StaticCoordinator(_connections, conf,
- _spoutConfig, _state, context.getThisTaskIndex(),
- totalTasks, topologyInstanceId);
- } else {
- _coordinator = new ZkCoordinator(_connections, conf,
- _spoutConfig, _state, context.getThisTaskIndex(),
- totalTasks, topologyInstanceId);
- }
-
- context.registerMetric("kafkaOffset", new IMetric() {
- KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
-
- @Override
- public Object getValueAndReset() {
- List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
- Set<Partition> latestPartitions = new HashSet();
- for (PartitionManager pm : pms) {
- latestPartitions.add(pm.getPartition());
- }
- _kafkaOffsetMetric.refreshPartitions(latestPartitions);
- for (PartitionManager pm : pms) {
- _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
- }
- return _kafkaOffsetMetric.getValueAndReset();
- }
- }, _spoutConfig.metricsTimeBucketSizeInSecs);
-
- context.registerMetric("kafkaPartition", new IMetric() {
- @Override
- public Object getValueAndReset() {
- List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
- Map concatMetricsDataMaps = new HashMap();
- for (PartitionManager pm : pms) {
- concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
- }
- return concatMetricsDataMaps;
- }
- }, _spoutConfig.metricsTimeBucketSizeInSecs);
- }
-
- @Override
- public void close() {
- _state.close();
- }
-
- @Override
- public void nextTuple() {
- List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
- for (int i = 0; i < managers.size(); i++) {
-
- try {
- // in case the number of managers decreased
- _currPartitionIndex = _currPartitionIndex % managers.size();
- EmitState state = managers.get(_currPartitionIndex).next(_collector);
- if (state != EmitState.EMITTED_MORE_LEFT) {
- _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
- }
- if (state != EmitState.NO_EMITTED) {
- break;
- }
- } catch (FailedFetchException e) {
- LOG.warn("Fetch failed", e);
- _coordinator.refresh();
- }
- }
-
- long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
-
- /*
- As far as the System.currentTimeMillis() is dependent on System clock,
- additional check on negative value of diffWithNow in case of external changes.
- */
- if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
- commit();
- }
- }
-
- @Override
- public void ack(Object msgId) {
- KafkaMessageId id = (KafkaMessageId) msgId;
- PartitionManager m = _coordinator.getManager(id.partition);
- if (m != null) {
- m.ack(id.offset);
- }
- }
-
- @Override
- public void fail(Object msgId) {
- KafkaMessageId id = (KafkaMessageId) msgId;
- PartitionManager m = _coordinator.getManager(id.partition);
- if (m != null) {
- m.fail(id.offset);
- }
- }
-
- @Override
- public void deactivate() {
- commit();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
- declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields());
- } else {
- declarer.declare(_spoutConfig.scheme.getOutputFields());
- }
- }
-
- private void commit() {
- _lastUpdateMs = System.currentTimeMillis();
- for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
- manager.commit();
- }
- }
-
-}