You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:32 UTC
[36/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java
deleted file mode 100644
index 04e1396..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-public class DefaultCoordinator implements IBatchCoordinator {
-
- @Override
- public boolean isReady(long txid) {
- return true;
- }
-
- @Override
- public void close() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
deleted file mode 100644
index b0d97fc..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import com.google.common.base.Objects;
-import storm.kafka.Broker;
-import storm.kafka.Partition;
-
-import java.io.Serializable;
-import java.util.*;
-
-
-public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
-
- private Map<Integer, Broker> partitionMap;
- public String topic;
-
- //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition
- private Boolean bUseTopicNameForPartitionPathId;
-
- public GlobalPartitionInformation(String topic, Boolean bUseTopicNameForPartitionPathId) {
- this.topic = topic;
- this.partitionMap = new TreeMap<Integer, Broker>();
- this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
- }
-
- public GlobalPartitionInformation(String topic) {
- this.topic = topic;
- this.partitionMap = new TreeMap<Integer, Broker>();
- this.bUseTopicNameForPartitionPathId = false;
- }
-
- public void addPartition(int partitionId, Broker broker) {
- partitionMap.put(partitionId, broker);
- }
-
- @Override
- public String toString() {
- return "GlobalPartitionInformation{" +
- "topic=" + topic +
- ", partitionMap=" + partitionMap +
- '}';
- }
-
- public Broker getBrokerFor(Integer partitionId) {
- return partitionMap.get(partitionId);
- }
-
- public List<Partition> getOrderedPartitions() {
- List<Partition> partitions = new LinkedList<Partition>();
- for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {
- partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId));
- }
- return partitions;
- }
-
- @Override
- public Iterator<Partition> iterator() {
- final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator();
- final String topic = this.topic;
- final Boolean bUseTopicNameForPartitionPathId = this.bUseTopicNameForPartitionPathId;
- return new Iterator<Partition>() {
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public Partition next() {
- Map.Entry<Integer, Broker> next = iterator.next();
- return new Partition(next.getValue(), topic , next.getKey(), bUseTopicNameForPartitionPathId);
- }
-
- @Override
- public void remove() {
- iterator.remove();
- }
- };
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(partitionMap);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final GlobalPartitionInformation other = (GlobalPartitionInformation) obj;
- return Objects.equal(this.partitionMap, other.partitionMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java
deleted file mode 100644
index 04231f4..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import java.io.Serializable;
-
-public interface IBatchCoordinator extends Serializable {
- boolean isReady(long txid);
-
- void close();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java
deleted file mode 100644
index afba659..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import java.util.List;
-import java.util.Map;
-
-public interface IBrokerReader {
-
- GlobalPartitionInformation getBrokerForTopic(String topic);
-
- List<GlobalPartitionInformation> getAllBrokers();
-
- void close();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java b/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java
deleted file mode 100644
index 60d7c7b..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-
-import backtype.storm.metric.api.ICombiner;
-
-public class MaxMetric implements ICombiner<Long> {
- @Override
- public Long identity() {
- return null;
- }
-
- @Override
- public Long combine(Long l1, Long l2) {
- if (l1 == null) {
- return l2;
- }
- if (l2 == null) {
- return l1;
- }
- return Math.max(l1, l2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
deleted file mode 100644
index fbd1d7a..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import storm.kafka.Partition;
-import storm.trident.spout.IOpaquePartitionedTridentSpout;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-
-public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<List<GlobalPartitionInformation>, Partition, Map> {
-
-
- TridentKafkaConfig _config;
-
- public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
- _config = config;
- }
-
- @Override
- public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {
- return new TridentKafkaEmitter(conf, context, _config, context
- .getStormId()).asOpaqueEmitter();
- }
-
- @Override
- public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {
- return new storm.kafka.trident.Coordinator(conf, _config);
- }
-
- @Override
- public Fields getOutputFields() {
- return _config.scheme.getOutputFields();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java
deleted file mode 100644
index ca83c06..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class StaticBrokerReader implements IBrokerReader {
-
- private Map<String,GlobalPartitionInformation> brokers = new TreeMap<String,GlobalPartitionInformation>();
-
- public StaticBrokerReader(String topic, GlobalPartitionInformation partitionInformation) {
- this.brokers.put(topic, partitionInformation);
- }
-
- @Override
- public GlobalPartitionInformation getBrokerForTopic(String topic) {
- if (brokers.containsKey(topic)) return brokers.get(topic);
- return null;
- }
-
- @Override
- public List<GlobalPartitionInformation> getAllBrokers () {
- List<GlobalPartitionInformation> list = new ArrayList<GlobalPartitionInformation>();
- list.addAll(brokers.values());
- return list;
- }
-
- @Override
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
deleted file mode 100644
index 9feffc8..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import storm.kafka.Partition;
-import storm.trident.spout.IPartitionedTridentSpout;
-
-import java.util.Map;
-import java.util.UUID;
-
-
-public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
-
- TridentKafkaConfig _config;
-
- public TransactionalTridentKafkaSpout(TridentKafkaConfig config) {
- _config = config;
- }
-
-
- @Override
- public IPartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
- return new storm.kafka.trident.Coordinator(conf, _config);
- }
-
- @Override
- public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
- return new TridentKafkaEmitter(conf, context, _config, context
- .getStormId()).asTransactionalEmitter();
- }
-
- @Override
- public Fields getOutputFields() {
- return _config.scheme.getOutputFields();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
deleted file mode 100644
index 3878cc8..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaConfig;
-
-
-public class TridentKafkaConfig extends KafkaConfig {
-
-
- public final IBatchCoordinator coordinator = new DefaultCoordinator();
-
- public TridentKafkaConfig(BrokerHosts hosts, String topic) {
- super(hosts, topic);
- }
-
- public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
- super(hosts, topic, clientId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
deleted file mode 100644
index a97d2cb..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.MeanReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.task.TopologyContext;
-import com.google.common.collect.ImmutableMap;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-import storm.kafka.TopicOffsetOutOfRangeException;
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.IOpaquePartitionedTridentSpout;
-import storm.trident.spout.IPartitionedTridentSpout;
-import storm.trident.topology.TransactionAttempt;
-
-import java.util.*;
-
-public class TridentKafkaEmitter {
-
- public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
-
- private DynamicPartitionConnections _connections;
- private String _topologyName;
- private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
- private ReducedMetric _kafkaMeanFetchLatencyMetric;
- private CombinedMetric _kafkaMaxFetchLatencyMetric;
- private TridentKafkaConfig _config;
- private String _topologyInstanceId;
-
- public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
- _config = config;
- _topologyInstanceId = topologyInstanceId;
- _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
- _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
- _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
- context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs);
- _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs);
- _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs);
- }
-
-
- private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
- SimpleConsumer consumer = _connections.register(partition);
- Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
- _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
- return ret;
- }
-
- private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
- try {
- return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
- } catch (FailedFetchException e) {
- LOG.warn("Failed to fetch from partition " + partition);
- if (lastMeta == null) {
- return null;
- } else {
- Map ret = new HashMap();
- ret.put("offset", lastMeta.get("nextOffset"));
- ret.put("nextOffset", lastMeta.get("nextOffset"));
- ret.put("partition", partition.partition);
- ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
- ret.put("topic", partition.topic);
- ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
- return ret;
- }
- }
- }
-
- private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) {
- long offset;
- if (lastMeta != null) {
- String lastInstanceId = null;
- Map lastTopoMeta = (Map) lastMeta.get("topology");
- if (lastTopoMeta != null) {
- lastInstanceId = (String) lastTopoMeta.get("id");
- }
- if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
- offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime);
- } else {
- offset = (Long) lastMeta.get("nextOffset");
- }
- } else {
- offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config);
- }
-
- ByteBufferMessageSet msgs = null;
- try {
- msgs = fetchMessages(consumer, partition, offset);
- } catch (TopicOffsetOutOfRangeException e) {
- long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
- LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
- offset = newOffset;
- msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
- }
-
- long endoffset = offset;
- for (MessageAndOffset msg : msgs) {
- emit(collector, msg.message(), partition, msg.offset());
- endoffset = msg.nextOffset();
- }
- Map newMeta = new HashMap();
- newMeta.put("offset", offset);
- newMeta.put("nextOffset", endoffset);
- newMeta.put("instanceId", _topologyInstanceId);
- newMeta.put("partition", partition.partition);
- newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
- newMeta.put("topic", partition.topic);
- newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
- return newMeta;
- }
-
- private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
- long start = System.nanoTime();
- ByteBufferMessageSet msgs = null;
- msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
- long end = System.nanoTime();
- long millis = (end - start) / 1000000;
- _kafkaMeanFetchLatencyMetric.update(millis);
- _kafkaMaxFetchLatencyMetric.update(millis);
- return msgs;
- }
-
- /**
- * re-emit the batch described by the meta data provided
- *
- * @param attempt
- * @param collector
- * @param partition
- * @param meta
- */
- private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
- LOG.info("re-emitting batch, attempt " + attempt);
- String instanceId = (String) meta.get("instanceId");
- if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
- SimpleConsumer consumer = _connections.register(partition);
- long offset = (Long) meta.get("offset");
- long nextOffset = (Long) meta.get("nextOffset");
- ByteBufferMessageSet msgs = null;
- msgs = fetchMessages(consumer, partition, offset);
-
- if(msgs != null) {
- for (MessageAndOffset msg : msgs) {
- if (offset == nextOffset) {
- break;
- }
- if (offset > nextOffset) {
- throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
- }
- emit(collector, msg.message(), partition, msg.offset());
- offset = msg.nextOffset();
- }
- }
- }
- }
-
- private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
- Iterable<List<Object>> values;
- if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
- values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
- } else {
- values = KafkaUtils.generateTuples(_config, msg, partition.topic);
- }
-
- if (values != null) {
- for (List<Object> value : values) {
- collector.emit(value);
- }
- }
- }
-
- private void clear() {
- _connections.clear();
- }
-
- private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) {
- List<Partition> part = new ArrayList<Partition>();
- for (GlobalPartitionInformation globalPartitionInformation : partitions)
- part.addAll(globalPartitionInformation.getOrderedPartitions());
- return part;
- }
-
- private void refresh(List<Partition> list) {
- _connections.clear();
- _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
- }
-
-
- public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() {
-
- return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
-
- /**
- * Emit a batch of tuples for a partition/transaction.
- *
- * Return the metadata describing this batch that will be used as lastPartitionMeta
- * for defining the parameters of the next batch.
- */
- @Override
- public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
- return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- @Override
- public void refreshPartitions(List<Partition> partitions) {
- refresh(partitions);
- }
-
- @Override
- public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
- return orderPartitions(partitionInformation);
- }
-
- @Override
- public void close() {
- clear();
- }
- };
- }
-
- public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
- return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
-
- /**
- * Emit a batch of tuples for a partition/transaction that's never been emitted before.
- * Return the metadata that can be used to reconstruct this partition/batch in the future.
- */
- @Override
- public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
- return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- /**
- * Emit a batch of tuples for a partition/transaction that has been emitted before, using
- * the metadata created when it was first emitted.
- */
- @Override
- public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
- reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- /**
- * This method is called when this task is responsible for a new set of partitions. Should be used
- * to manage things like connections to brokers.
- */
- @Override
- public void refreshPartitions(List<Partition> partitions) {
- refresh(partitions);
- }
-
- @Override
- public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
- return orderPartitions(partitionInformation);
- }
-
- @Override
- public void close() {
- clear();
- }
- };
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
deleted file mode 100644
index 84b6a6a..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.FailedException;
-import org.apache.commons.lang.Validate;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import storm.kafka.trident.selector.KafkaTopicSelector;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.State;
-import storm.trident.tuple.TridentTuple;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-public class TridentKafkaState implements State {
- private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
-
- private KafkaProducer producer;
- private OutputCollector collector;
-
- private TridentTupleToKafkaMapper mapper;
- private KafkaTopicSelector topicSelector;
-
- public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
- this.mapper = mapper;
- return this;
- }
-
- public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
- this.topicSelector = selector;
- return this;
- }
-
- @Override
- public void beginCommit(Long txid) {
- LOG.debug("beginCommit is Noop.");
- }
-
- @Override
- public void commit(Long txid) {
- LOG.debug("commit is Noop.");
- }
-
- public void prepare(Properties options) {
- Validate.notNull(mapper, "mapper can not be null");
- Validate.notNull(topicSelector, "topicSelector can not be null");
- producer = new KafkaProducer(options);
- }
-
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- String topic = null;
- for (TridentTuple tuple : tuples) {
- try {
- topic = topicSelector.getTopic(tuple);
-
- if(topic != null) {
- Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
- mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
- try {
- result.get();
- } catch (ExecutionException e) {
- String errorMsg = "Could not retrieve result for message with key = "
- + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
- LOG.error(errorMsg, e);
- throw new FailedException(errorMsg, e);
- }
- } else {
- LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
- }
- } catch (Exception ex) {
- String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
- + " to topic = " + topic;
- LOG.warn(errorMsg, ex);
- throw new FailedException(errorMsg, ex);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
deleted file mode 100644
index a5d9d42..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.task.IMetricsContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import storm.kafka.trident.selector.KafkaTopicSelector;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class TridentKafkaStateFactory implements StateFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
-
- private TridentTupleToKafkaMapper mapper;
- private KafkaTopicSelector topicSelector;
- private Properties producerProperties = new Properties();
-
- public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
- this.mapper = mapper;
- return this;
- }
-
- public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
- this.topicSelector = selector;
- return this;
- }
-
- public TridentKafkaStateFactory withProducerProperties(Properties props) {
- this.producerProperties = props;
- return this;
- }
-
- @Override
- public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
- TridentKafkaState state = new TridentKafkaState()
- .withKafkaTopicSelector(this.topicSelector)
- .withTridentTupleToKafkaMapper(this.mapper);
- state.prepare(producerProperties);
- return state;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java
deleted file mode 100644
index 6639b36..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
-
-import java.util.List;
-
-public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> {
- @Override
- public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) {
- state.updateState(tuples, collector);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
deleted file mode 100644
index b480bdd..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.DynamicBrokersReader;
-import storm.kafka.ZkHosts;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class ZkBrokerReader implements IBrokerReader {
-
- public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
-
- List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>();
- DynamicBrokersReader reader;
- long lastRefreshTimeMs;
-
-
- long refreshMillis;
-
- public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
- try {
- reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
- cachedBrokers = reader.getBrokerInfo();
- lastRefreshTimeMs = System.currentTimeMillis();
- refreshMillis = hosts.refreshFreqSecs * 1000L;
- } catch (java.net.SocketTimeoutException e) {
- LOG.warn("Failed to update brokers", e);
- }
-
- }
-
- private void refresh() {
- long currTime = System.currentTimeMillis();
- if (currTime > lastRefreshTimeMs + refreshMillis) {
- try {
- LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
- cachedBrokers = reader.getBrokerInfo();
- lastRefreshTimeMs = currTime;
- } catch (java.net.SocketTimeoutException e) {
- LOG.warn("Failed to update brokers", e);
- }
- }
- }
- @Override
- public GlobalPartitionInformation getBrokerForTopic(String topic) {
- refresh();
- for(GlobalPartitionInformation partitionInformation : cachedBrokers) {
- if (partitionInformation.topic.equals(topic)) return partitionInformation;
- }
- return null;
- }
-
- @Override
- public List<GlobalPartitionInformation> getAllBrokers() {
- refresh();
- return cachedBrokers;
- }
-
- @Override
- public void close() {
- reader.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
deleted file mode 100644
index 29a49d1..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident.mapper;
-
-import storm.trident.tuple.TridentTuple;
-
-public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper {
-
- public final String keyFieldName;
- public final String msgFieldName;
-
- public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
- this.keyFieldName = keyFieldName;
- this.msgFieldName = msgFieldName;
- }
-
- @Override
- public K getKeyFromTuple(TridentTuple tuple) {
- return (K) tuple.getValueByField(keyFieldName);
- }
-
- @Override
- public V getMessageFromTuple(TridentTuple tuple) {
- return (V) tuple.getValueByField(msgFieldName);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
deleted file mode 100644
index 9759ba3..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident.mapper;
-
-import backtype.storm.tuple.Tuple;
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-
-public interface TridentTupleToKafkaMapper<K,V> extends Serializable {
- K getKeyFromTuple(TridentTuple tuple);
- V getMessageFromTuple(TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java
deleted file mode 100644
index 473a38d..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident.selector;
-
-import storm.trident.tuple.TridentTuple;
-
-public class DefaultTopicSelector implements KafkaTopicSelector {
-
- private final String topicName;
-
- public DefaultTopicSelector(final String topicName) {
- this.topicName = topicName;
- }
-
- @Override
- public String getTopic(TridentTuple tuple) {
- return topicName;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java
deleted file mode 100644
index f6c5d82..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident.selector;
-
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-
-public interface KafkaTopicSelector extends Serializable {
- String getTopic(TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
new file mode 100644
index 0000000..3363252
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.ZKPaths;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Date: 16/05/2013
+ * Time: 20:35
+ */
+public class DynamicBrokersReaderTest {
+ private DynamicBrokersReader dynamicBrokersReader, wildCardBrokerReader;
+ private String masterPath = "/brokers";
+ private String topic = "testing1";
+ private String secondTopic = "testing2";
+ private String thirdTopic = "testing3";
+
+ private CuratorFramework zookeeper;
+ private TestingServer server;
+
+ @Before
+ public void setUp() throws Exception {
+ server = new TestingServer();
+ String connectionString = server.getConnectString();
+ Map conf = new HashMap();
+ conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
+ conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
+
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+ dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
+
+ Map conf2 = new HashMap();
+ conf2.putAll(conf);
+ conf2.put("kafka.topic.wildcard.match",true);
+
+ wildCardBrokerReader = new DynamicBrokersReader(conf2, connectionString, masterPath, "^test.*$");
+ zookeeper.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dynamicBrokersReader.close();
+ zookeeper.close();
+ server.close();
+ }
+
+ private void addPartition(int id, String host, int port, String topic) throws Exception {
+ writePartitionId(id, topic);
+ writeLeader(id, 0, topic);
+ writeLeaderDetails(0, host, port);
+ }
+
+ private void addPartition(int id, int leader, String host, int port, String topic) throws Exception {
+ writePartitionId(id, topic);
+ writeLeader(id, leader, topic);
+ writeLeaderDetails(leader, host, port);
+ }
+
+ private void writePartitionId(int id, String topic) throws Exception {
+ String path = dynamicBrokersReader.partitionPath(topic);
+ writeDataToPath(path, ("" + id));
+ }
+
+ private void writeDataToPath(String path, String data) throws Exception {
+ ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
+ zookeeper.setData().forPath(path, data.getBytes());
+ }
+
+ private void writeLeader(int id, int leaderId, String topic) throws Exception {
+ String path = dynamicBrokersReader.partitionPath(topic) + "/" + id + "/state";
+ String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }";
+ writeDataToPath(path, value);
+ }
+
+ private void writeLeaderDetails(int leaderId, String host, int port) throws Exception {
+ String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
+ String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
+ writeDataToPath(path, value);
+ }
+
+
+ private GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic){
+ for(GlobalPartitionInformation partitionInformation : partitions) {
+ if (partitionInformation.topic.equals(topic)) return partitionInformation;
+ }
+ return null;
+ }
+
+ @Test
+ public void testGetBrokerInfo() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int partition = 0;
+ addPartition(partition, host, port, topic);
+ List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
+
+ GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+ assertNotNull(brokerInfo);
+ assertEquals(1, brokerInfo.getOrderedPartitions().size());
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+ }
+
+ @Test
+ public void testGetBrokerInfoWildcardMatch() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int partition = 0;
+ addPartition(partition, host, port, topic);
+ addPartition(partition, host, port, secondTopic);
+
+ List<GlobalPartitionInformation> partitions = wildCardBrokerReader.getBrokerInfo();
+
+ GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+ assertNotNull(brokerInfo);
+ assertEquals(1, brokerInfo.getOrderedPartitions().size());
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+ brokerInfo = getByTopic(partitions, secondTopic);
+ assertNotNull(brokerInfo);
+ assertEquals(1, brokerInfo.getOrderedPartitions().size());
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+ addPartition(partition, host, port, thirdTopic);
+ // A topic registered after the first read must show up on the next read.
+ partitions = wildCardBrokerReader.getBrokerInfo();
+ assertNotNull(getByTopic(partitions, topic));
+ assertNotNull(getByTopic(partitions, secondTopic));
+ assertNotNull(getByTopic(partitions, thirdTopic));
+ }
+
+
+ @Test
+ public void testMultiplePartitionsOnDifferentHosts() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int secondPort = 9093;
+ int partition = 0;
+ int secondPartition = partition + 1;
+ addPartition(partition, 0, host, port, topic);
+ addPartition(secondPartition, 1, host, secondPort, topic);
+
+ List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
+
+ GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+ assertNotNull(brokerInfo);
+ assertEquals(2, brokerInfo.getOrderedPartitions().size());
+
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+ assertEquals(secondPort, brokerInfo.getBrokerFor(secondPartition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
+ }
+
+
+ @Test
+ public void testMultiplePartitionsOnSameHost() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int partition = 0;
+ int secondPartition = partition + 1;
+ addPartition(partition, 0, host, port, topic);
+ addPartition(secondPartition, 0, host, port, topic);
+
+ List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
+
+ GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+ assertNotNull(brokerInfo);
+ assertEquals(2, brokerInfo.getOrderedPartitions().size());
+
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+ assertEquals(port, brokerInfo.getBrokerFor(secondPartition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
+ }
+
+ @Test
+ public void testSwitchHostForPartition() throws Exception {
+ String host = "localhost";
+ int port = 9092;
+ int partition = 0;
+ addPartition(partition, host, port, topic);
+ List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
+
+ GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+ assertNotNull(brokerInfo);
+ assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+ String newHost = host + "switch";
+ int newPort = port + 1;
+ addPartition(partition, newHost, newPort, topic);
+ partitions = dynamicBrokersReader.getBrokerInfo();
+
+ brokerInfo = getByTopic(partitions, topic);
+ assertNotNull(brokerInfo);
+ assertEquals(newPort, brokerInfo.getBrokerFor(partition).port);
+ assertEquals(newHost, brokerInfo.getBrokerFor(partition).host);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testErrorLogsWhenConfigIsMissing() throws Exception {
+ String connectionString = server.getConnectString();
+ Map conf = new HashMap();
+ conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
+// conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
+ conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
+
+ DynamicBrokersReader dynamicBrokersReader1 = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
new file mode 100644
index 0000000..8fa6564
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ExponentialBackoffMsgRetryManager}.
+ *
+ * <p>Several tests sleep for real wall-clock time and then check whether an offset has become
+ * eligible for retry, so they may be sensitive to scheduling delays on a loaded machine.
+ */
+public class ExponentialBackoffMsgRetryManagerTest {
+
+    private static final Long TEST_OFFSET = 101L;
+    private static final Long TEST_OFFSET2 = 102L;
+    private static final Long TEST_OFFSET3 = 105L;
+    private static final Long TEST_NEW_OFFSET = 103L;
+
+    /** With all delays set to zero, a failed offset is expected to be retryable at once, every time. */
+    @Test
+    public void testImmediateRetry() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.retryStarted(TEST_OFFSET);
+
+        manager.failed(TEST_OFFSET);
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    /** A failed offset is expected to be held back before the initial delay and offered after it. */
+    @Test
+    public void testSingleDelay() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000);
+        manager.failed(TEST_OFFSET);
+        Thread.sleep(5);
+        Long next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready for retry yet", next);
+        assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+
+        Thread.sleep(100);
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    /** Checks three consecutive failures with expected delays of 10, 20 and 40 ms. */
+    @Test
+    public void testExponentialBackoff() throws Exception {
+        final long initial = 10;
+        final double mult = 2d;
+        // The cap (initial * 10) is above every delay used in the three rounds.
+        assertBackoffSchedule(initial, mult, initial * 10, 3);
+    }
+
+    /**
+     * Checks which offset is offered first when one offset is on its second failure and another
+     * on its first.
+     */
+    @Test
+    public void testRetryOrder() throws Exception {
+        final long initial = 10;
+        final double mult = 2d;
+        final long max = 20;
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+
+        manager.failed(TEST_OFFSET);
+        Thread.sleep(initial);
+
+        manager.retryStarted(TEST_OFFSET);
+        manager.failed(TEST_OFFSET);
+        manager.failed(TEST_OFFSET2);
+
+        // although TEST_OFFSET failed first, its retry delay time is longer b/c this is the second retry
+        // so TEST_OFFSET2 should come first
+
+        Thread.sleep(initial * 2);
+        assertTrue("message " + TEST_OFFSET + " should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message " + TEST_OFFSET2 + " should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect first message to retry is " + TEST_OFFSET2, TEST_OFFSET2, next);
+
+        Thread.sleep(initial);
+
+        // haven't retried yet, so first should still be TEST_OFFSET2
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect first message to retry is " + TEST_OFFSET2, TEST_OFFSET2, next);
+        manager.retryStarted(next);
+
+        // now it should be TEST_OFFSET
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect message to retry is now " + TEST_OFFSET, TEST_OFFSET, next);
+        manager.retryStarted(next);
+
+        // now none left
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message to retry now", next);
+    }
+
+    /** Once a retry has started, the offset is expected to no longer be reported as retryable. */
+    @Test
+    public void testQueriesAfterRetriedAlready() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.retryStarted(TEST_OFFSET);
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after retried", next);
+        assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    /** Starting a retry for an offset that never failed is expected to throw. */
+    @Test(expected = IllegalStateException.class)
+    public void testRetryWithoutFail() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.retryStarted(TEST_OFFSET);
+    }
+
+    /**
+     * The first {@code retryStarted} after a failure must succeed; a second one without another
+     * failure in between is expected to throw {@link IllegalStateException}.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testFailRetryRetry() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        try {
+            manager.retryStarted(TEST_OFFSET);
+        } catch (IllegalStateException ise) {
+            // Turn an early exception into a failure so it cannot satisfy "expected" above.
+            fail("IllegalStateException unexpected here: " + ise);
+        }
+
+        assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        manager.retryStarted(TEST_OFFSET);
+    }
+
+    /**
+     * Checks four consecutive failures with an initial delay of 100 ms, multiplier 2 and cap 2000 ms.
+     *
+     * <p>NOTE(review): with these parameters the expected delays are 100, 200, 400 and 800 ms, so
+     * the 2000 ms cap is never actually reached within the four rounds.
+     */
+    @Test
+    public void testMaxBackoff() throws Exception {
+        final long initial = 100;
+        final double mult = 2d;
+        final long max = 2000;
+        assertBackoffSchedule(initial, mult, max, 4);
+    }
+
+    /** Acking a failed offset is expected to remove it from the retry candidates. */
+    @Test
+    public void testFailThenAck() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.acked(TEST_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after acked", next);
+        assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    /** An ack received before a failure is expected not to prevent the later failure from being retried. */
+    @Test
+    public void testAckThenFail() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.acked(TEST_OFFSET);
+        assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.failed(TEST_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    /**
+     * After {@code clearInvalidMessages(TEST_NEW_OFFSET)}, of the three failed offsets only
+     * {@code TEST_OFFSET3} is expected to still be offered for retry.
+     */
+    @Test
+    public void testClearInvalidMessages() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        manager.failed(TEST_OFFSET2);
+        manager.failed(TEST_OFFSET3);
+
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+
+        manager.clearInvalidMessages(TEST_NEW_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
+
+        manager.acked(TEST_OFFSET3);
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after acked", next);
+    }
+
+    /**
+     * Fails {@link #TEST_OFFSET} {@code rounds} times against a fresh manager. In each round it
+     * asserts that the offset is not retryable after about half the expected delay and is
+     * retryable after about the full delay. The expected delay starts at {@code initial} and is
+     * multiplied by {@code mult} after every round, never exceeding {@code max}.
+     *
+     * @param initial initial retry delay passed to the manager, in the unit used by {@code Thread.sleep}
+     * @param mult    delay multiplier passed to the manager
+     * @param max     maximum delay passed to the manager
+     * @param rounds  number of fail/retry cycles to run
+     * @throws InterruptedException if a sleep is interrupted
+     */
+    private static void assertBackoffSchedule(long initial, double mult, long max, int rounds)
+            throws InterruptedException {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+
+        long expectedWaitTime = initial;
+        for (int round = 0; round < rounds; ++round) {
+            manager.failed(TEST_OFFSET);
+
+            // Half of the expected delay, rounded up, so two sleeps cover the whole delay.
+            final long halfWait = (expectedWaitTime + 1L) / 2L;
+
+            Thread.sleep(halfWait);
+            assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+
+            Thread.sleep(halfWait);
+            Long next = manager.nextFailedMessageToRetry();
+            assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+            assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+
+            manager.retryStarted(TEST_OFFSET);
+            expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
new file mode 100644
index 0000000..e38bc1e
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Checks the values returned by {@code KafkaError.getError} for a handful of numeric codes.
+ *
+ * Date: 12/01/2014
+ * Time: 18:09
+ */
+public class KafkaErrorTest {
+
+    @Test
+    public void getError() {
+        expectMapping(0, KafkaError.NO_ERROR);
+    }
+
+    @Test
+    public void offsetMetaDataTooLarge() {
+        expectMapping(12, KafkaError.OFFSET_METADATA_TOO_LARGE);
+    }
+
+    @Test
+    public void unknownNegative() {
+        expectMapping(-1, KafkaError.UNKNOWN);
+    }
+
+    @Test
+    public void unknownPositive() {
+        expectMapping(75, KafkaError.UNKNOWN);
+    }
+
+    @Test
+    public void unknown() {
+        expectMapping(13, KafkaError.UNKNOWN);
+    }
+
+    /** Asserts that {@code KafkaError.getError(code)} is equal to {@code expected}. */
+    private static void expectMapping(int code, KafkaError expected) {
+        assertThat(KafkaError.getError(code), is(equalTo(expected)));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
new file mode 100644
index 0000000..e2fb60f
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Date: 11/01/2014
+ * Time: 13:15
+ */
+public class KafkaTestBroker {
+
+ private int port;
+ private KafkaServerStartable kafka;
+ private TestingServer server;
+ private CuratorFramework zookeeper;
+ private File logDir;
+
+ public KafkaTestBroker() {
+ try {
+ server = new TestingServer();
+ String zookeeperConnectionString = server.getConnectString();
+ ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+ zookeeper.start();
+ port = InstanceSpec.getRandomPort();
+ logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port);
+ KafkaConfig config = buildKafkaConfig(zookeeperConnectionString);
+ kafka = new KafkaServerStartable(config);
+ kafka.startup();
+ } catch (Exception ex) {
+ throw new RuntimeException("Could not start test broker", ex);
+ }
+ }
+
+ private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString) {
+ Properties p = new Properties();
+ p.setProperty("zookeeper.connect", zookeeperConnectionString);
+ p.setProperty("broker.id", "0");
+ p.setProperty("port", "" + port);
+ p.setProperty("log.dirs", logDir.getAbsolutePath());
+ return new KafkaConfig(p);
+ }
+
+ public String getBrokerConnectionString() {
+ return "localhost:" + port;
+ }
+
+ public int getPort() {
+ return port;
+ }
+ public void shutdown() {
+ kafka.shutdown();
+ if (zookeeper.getState().equals(CuratorFrameworkState.STARTED)) {
+ zookeeper.close();
+ }
+ try {
+ server.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ FileUtils.deleteQuietly(logDir);
+ }
+}