Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:32 UTC

[36/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java
deleted file mode 100644
index 04e1396..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/DefaultCoordinator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-public class DefaultCoordinator implements IBatchCoordinator {
-
-    @Override
-    public boolean isReady(long txid) {
-        return true;
-    }
-
-    @Override
-    public void close() {
-    }
-
-}
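
For reference, IBatchCoordinator (defined two files below) gates whether Trident
may start a new batch for a given transaction id; this default implementation
always says yes. A minimal sketch of a custom coordinator that instead throttles
batch emission; the class name and the one-second interval are illustrative,
not part of this commit:

    package storm.kafka.trident;

    // Hypothetical example: refuse to start a new batch until at least
    // one second has passed since the last batch was allowed.
    public class ThrottledCoordinator implements IBatchCoordinator {

        private long lastReadyMs = 0;

        @Override
        public boolean isReady(long txid) {
            long now = System.currentTimeMillis();
            if (now - lastReadyMs >= 1000L) {
                lastReadyMs = now;
                return true;
            }
            return false;
        }

        @Override
        public void close() {
            // nothing to release
        }
    }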

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
deleted file mode 100644
index b0d97fc..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import com.google.common.base.Objects;
-import storm.kafka.Broker;
-import storm.kafka.Partition;
-
-import java.io.Serializable;
-import java.util.*;
-
-
-public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
-
-    private Map<Integer, Broker> partitionMap;
-    public String topic;
-
-    // Flag to keep the partition path id backward compatible with the old implementation, where Partition.getId() == "partition_" + partition
-    private Boolean bUseTopicNameForPartitionPathId;
-
-    public GlobalPartitionInformation(String topic, Boolean bUseTopicNameForPartitionPathId) {
-        this.topic = topic;
-        this.partitionMap = new TreeMap<Integer, Broker>();
-        this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
-    }
-
-    public GlobalPartitionInformation(String topic) {
-        this.topic = topic;
-        this.partitionMap = new TreeMap<Integer, Broker>();
-        this.bUseTopicNameForPartitionPathId = false;
-    }
-
-    public void addPartition(int partitionId, Broker broker) {
-        partitionMap.put(partitionId, broker);
-    }
-
-    @Override
-    public String toString() {
-        return "GlobalPartitionInformation{" +
-                "topic=" + topic +
-                ", partitionMap=" + partitionMap +
-                '}';
-    }
-
-    public Broker getBrokerFor(Integer partitionId) {
-        return partitionMap.get(partitionId);
-    }
-
-    public List<Partition> getOrderedPartitions() {
-        List<Partition> partitions = new LinkedList<Partition>();
-        for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {
-            partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId));
-        }
-        return partitions;
-    }
-
-    @Override
-    public Iterator<Partition> iterator() {
-        final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator();
-        final String topic = this.topic;
-        final Boolean bUseTopicNameForPartitionPathId = this.bUseTopicNameForPartitionPathId;
-        return new Iterator<Partition>() {
-            @Override
-            public boolean hasNext() {
-                return iterator.hasNext();
-            }
-
-            @Override
-            public Partition next() {
-                Map.Entry<Integer, Broker> next = iterator.next();
-                return new Partition(next.getValue(), topic, next.getKey(), bUseTopicNameForPartitionPathId);
-            }
-
-            @Override
-            public void remove() {
-                iterator.remove();
-            }
-        };
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(partitionMap);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final GlobalPartitionInformation other = (GlobalPartitionInformation) obj;
-        return Objects.equal(this.partitionMap, other.partitionMap);
-    }
-}
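
In practice this class is an ordered partition-id-to-broker map for a single
topic. A short usage sketch, assuming the (host, port) constructor that
storm.kafka.Broker provides elsewhere in this module; hosts and topic are
placeholders:

    import storm.kafka.Broker;
    import storm.kafka.Partition;

    GlobalPartitionInformation info = new GlobalPartitionInformation("clicks");
    info.addPartition(0, new Broker("kafka-1.example.com", 9092));
    info.addPartition(1, new Broker("kafka-2.example.com", 9092));

    Broker leader = info.getBrokerFor(1);        // kafka-2.example.com:9092
    for (Partition p : info.getOrderedPartitions()) {
        // partitions come back in ascending id order (backed by a TreeMap)
        System.out.println(p.getId());
    }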

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java
deleted file mode 100644
index 04231f4..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/IBatchCoordinator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import java.io.Serializable;
-
-public interface IBatchCoordinator extends Serializable {
-    boolean isReady(long txid);
-
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java
deleted file mode 100644
index afba659..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/IBrokerReader.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import java.util.List;
-import java.util.Map;
-
-public interface IBrokerReader {
-
-    GlobalPartitionInformation getBrokerForTopic(String topic);
-
-    List<GlobalPartitionInformation> getAllBrokers();
-
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java b/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java
deleted file mode 100644
index 60d7c7b..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-
-import backtype.storm.metric.api.ICombiner;
-
-public class MaxMetric implements ICombiner<Long> {
-    @Override
-    public Long identity() {
-        return null;
-    }
-
-    @Override
-    public Long combine(Long l1, Long l2) {
-        if (l1 == null) {
-            return l2;
-        }
-        if (l2 == null) {
-            return l1;
-        }
-        return Math.max(l1, l2);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
deleted file mode 100644
index fbd1d7a..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import storm.kafka.Partition;
-import storm.trident.spout.IOpaquePartitionedTridentSpout;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-
-public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<List<GlobalPartitionInformation>, Partition, Map> {
-
-
-    TridentKafkaConfig _config;
-
-    public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
-        _config = config;
-    }
-
-    @Override
-    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {
-        return new TridentKafkaEmitter(conf, context, _config, context
-                .getStormId()).asOpaqueEmitter();
-    }
-
-    @Override
-    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {
-        return new storm.kafka.trident.Coordinator(conf, _config);
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return _config.scheme.getOutputFields();
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-}
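
A typical wiring for this spout, sketched against the old package names being
deleted here (after this commit the equivalent classes live under
org.apache.storm.kafka.trident); the ZooKeeper connect string, topic, and
stream name are placeholders:

    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;
    import storm.trident.TridentTopology;
    import storm.trident.operation.builtin.Debug;

    TridentKafkaConfig conf = new TridentKafkaConfig(
            new ZkHosts("zk1.example.com:2181"), "clicks");
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(conf);

    TridentTopology topology = new TridentTopology();
    topology.newStream("kafka-clicks", spout)
            .each(spout.getOutputFields(), new Debug());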

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java
deleted file mode 100644
index ca83c06..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/StaticBrokerReader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class StaticBrokerReader implements IBrokerReader {
-
-    private Map<String,GlobalPartitionInformation> brokers = new TreeMap<String,GlobalPartitionInformation>();
-
-    public StaticBrokerReader(String topic, GlobalPartitionInformation partitionInformation) {
-        this.brokers.put(topic, partitionInformation);
-    }
-
-    @Override
-    public GlobalPartitionInformation getBrokerForTopic(String topic) {
-        if (brokers.containsKey(topic)) return brokers.get(topic);
-        return null;
-    }
-
-    @Override
-    public List<GlobalPartitionInformation> getAllBrokers() {
-        List<GlobalPartitionInformation> list = new ArrayList<GlobalPartitionInformation>();
-        list.addAll(brokers.values());
-        return list;
-    }
-
-    @Override
-    public void close() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
deleted file mode 100644
index 9feffc8..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import storm.kafka.Partition;
-import storm.trident.spout.IPartitionedTridentSpout;
-
-import java.util.Map;
-import java.util.UUID;
-
-
-public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
-
-    TridentKafkaConfig _config;
-
-    public TransactionalTridentKafkaSpout(TridentKafkaConfig config) {
-        _config = config;
-    }
-
-
-    @Override
-    public IPartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
-        return new storm.kafka.trident.Coordinator(conf, _config);
-    }
-
-    @Override
-    public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
-        return new TridentKafkaEmitter(conf, context, _config, context
-                .getStormId()).asTransactionalEmitter();
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return _config.scheme.getOutputFields();
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
deleted file mode 100644
index 3878cc8..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaConfig;
-
-
-public class TridentKafkaConfig extends KafkaConfig {
-
-
-    public final IBatchCoordinator coordinator = new DefaultCoordinator();
-
-    public TridentKafkaConfig(BrokerHosts hosts, String topic) {
-        super(hosts, topic);
-    }
-
-    public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
-        super(hosts, topic, clientId);
-    }
-
-}
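
Since this class only adds a batch coordinator on top of KafkaConfig,
configuration happens through the inherited public fields; the ones below
(scheme, startOffsetTime, ignoreZkOffsets) are the fields the emitter in this
same package reads. A brief sketch with placeholder values:

    import backtype.storm.spout.SchemeAsMultiScheme;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    TridentKafkaConfig conf = new TridentKafkaConfig(
            new ZkHosts("zk1.example.com:2181"), "clicks", "trident-client-1");
    conf.scheme = new SchemeAsMultiScheme(new StringScheme());
    conf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    conf.ignoreZkOffsets = false;   // honour offsets recorded by a previous run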

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
deleted file mode 100644
index a97d2cb..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.MeanReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.task.TopologyContext;
-import com.google.common.collect.ImmutableMap;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-import storm.kafka.TopicOffsetOutOfRangeException;
-import storm.trident.operation.TridentCollector;
-import storm.trident.spout.IOpaquePartitionedTridentSpout;
-import storm.trident.spout.IPartitionedTridentSpout;
-import storm.trident.topology.TransactionAttempt;
-
-import java.util.*;
-
-public class TridentKafkaEmitter {
-
-    public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
-
-    private DynamicPartitionConnections _connections;
-    private String _topologyName;
-    private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
-    private ReducedMetric _kafkaMeanFetchLatencyMetric;
-    private CombinedMetric _kafkaMaxFetchLatencyMetric;
-    private TridentKafkaConfig _config;
-    private String _topologyInstanceId;
-
-    public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
-        _config = config;
-        _topologyInstanceId = topologyInstanceId;
-        _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
-        _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
-        _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
-        context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs);
-        _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs);
-        _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs);
-    }
-
-
-    private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
-        SimpleConsumer consumer = _connections.register(partition);
-        Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
-        _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
-        return ret;
-    }
-
-    private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
-        try {
-            return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
-        } catch (FailedFetchException e) {
-            LOG.warn("Failed to fetch from partition " + partition);
-            if (lastMeta == null) {
-                return null;
-            } else {
-                Map ret = new HashMap();
-                ret.put("offset", lastMeta.get("nextOffset"));
-                ret.put("nextOffset", lastMeta.get("nextOffset"));
-                ret.put("partition", partition.partition);
-                ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
-                ret.put("topic", partition.topic);
-                ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
-                return ret;
-            }
-        }
-    }
-
-    private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) {
-        long offset;
-        if (lastMeta != null) {
-            String lastInstanceId = null;
-            Map lastTopoMeta = (Map) lastMeta.get("topology");
-            if (lastTopoMeta != null) {
-                lastInstanceId = (String) lastTopoMeta.get("id");
-            }
-            if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
-                offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime);
-            } else {
-                offset = (Long) lastMeta.get("nextOffset");
-            }
-        } else {
-            offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config);
-        }
-
-        ByteBufferMessageSet msgs = null;
-        try {
-            msgs = fetchMessages(consumer, partition, offset);
-        } catch (TopicOffsetOutOfRangeException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
-            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
-            offset = newOffset;
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        }
-
-        long endoffset = offset;
-        for (MessageAndOffset msg : msgs) {
-            emit(collector, msg.message(), partition, msg.offset());
-            endoffset = msg.nextOffset();
-        }
-        Map newMeta = new HashMap();
-        newMeta.put("offset", offset);
-        newMeta.put("nextOffset", endoffset);
-        newMeta.put("instanceId", _topologyInstanceId);
-        newMeta.put("partition", partition.partition);
-        newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
-        newMeta.put("topic", partition.topic);
-        newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
-        return newMeta;
-    }
-
-    private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
-        long start = System.nanoTime();
-        ByteBufferMessageSet msgs = null;
-        msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        long end = System.nanoTime();
-        long millis = (end - start) / 1000000;
-        _kafkaMeanFetchLatencyMetric.update(millis);
-        _kafkaMaxFetchLatencyMetric.update(millis);
-        return msgs;
-    }
-
-    /**
-     * Re-emit the batch described by the metadata provided.
-     *
-     * @param attempt
-     * @param collector
-     * @param partition
-     * @param meta
-     */
-    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
-        LOG.info("re-emitting batch, attempt " + attempt);
-        String instanceId = (String) meta.get("instanceId");
-        if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
-            SimpleConsumer consumer = _connections.register(partition);
-            long offset = (Long) meta.get("offset");
-            long nextOffset = (Long) meta.get("nextOffset");
-            ByteBufferMessageSet msgs = null;
-            msgs = fetchMessages(consumer, partition, offset);
-
-            if(msgs != null) {
-                for (MessageAndOffset msg : msgs) {
-                    if (offset == nextOffset) {
-                        break;
-                    }
-                    if (offset > nextOffset) {
-                        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
-                    }
-                    emit(collector, msg.message(), partition, msg.offset());
-                    offset = msg.nextOffset();
-                }
-            }
-        }
-    }
-
-    private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
-        Iterable<List<Object>> values;
-        if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
-            values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
-        } else {
-            values = KafkaUtils.generateTuples(_config, msg, partition.topic);
-        }
-
-        if (values != null) {
-            for (List<Object> value : values) {
-                collector.emit(value);
-            }
-        }
-    }
-
-    private void clear() {
-        _connections.clear();
-    }
-
-    private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) {
-        List<Partition> part = new ArrayList<Partition>();
-        for (GlobalPartitionInformation globalPartitionInformation : partitions)
-            part.addAll(globalPartitionInformation.getOrderedPartitions());
-        return part;
-    }
-
-    private void refresh(List<Partition> list) {
-        _connections.clear();
-        _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
-    }
-
-
-    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() {
-
-        return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
-
-            /**
-             * Emit a batch of tuples for a partition/transaction.
-             *
-             * Return the metadata describing this batch that will be used as lastPartitionMeta
-             * for defining the parameters of the next batch.
-             */
-            @Override
-            public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
-                return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-            }
-
-            @Override
-            public void refreshPartitions(List<Partition> partitions) {
-                refresh(partitions);
-            }
-
-            @Override
-            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
-                return orderPartitions(partitionInformation);
-            }
-
-            @Override
-            public void close() {
-                clear();
-            }
-        };
-    }
-
-    public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
-        return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
-
-            /**
-             * Emit a batch of tuples for a partition/transaction that's never been emitted before.
-             * Return the metadata that can be used to reconstruct this partition/batch in the future.
-             */
-            @Override
-            public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
-                return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-            }
-
-            /**
-             * Emit a batch of tuples for a partition/transaction that has been emitted before, using
-             * the metadata created when it was first emitted.
-             */
-            @Override
-            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
-                reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-            }
-
-            /**
-             * This method is called when this task is responsible for a new set of partitions. Should be used
-             * to manage things like connections to brokers.
-             */
-            @Override
-            public void refreshPartitions(List<Partition> partitions) {
-                refresh(partitions);
-            }
-
-            @Override
-            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
-                return orderPartitions(partitionInformation);
-            }
-
-            @Override
-            public void close() {
-                clear();
-            }
-        };
-
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
deleted file mode 100644
index 84b6a6a..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.FailedException;
-import org.apache.commons.lang.Validate;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import storm.kafka.trident.selector.KafkaTopicSelector;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.State;
-import storm.trident.tuple.TridentTuple;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-public class TridentKafkaState implements State {
-    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
-
-    private KafkaProducer producer;
-    private OutputCollector collector;
-
-    private TridentTupleToKafkaMapper mapper;
-    private KafkaTopicSelector topicSelector;
-
-    public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
-        this.mapper = mapper;
-        return this;
-    }
-
-    public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
-        this.topicSelector = selector;
-        return this;
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        LOG.debug("beginCommit is Noop.");
-    }
-
-    @Override
-    public void commit(Long txid) {
-        LOG.debug("commit is Noop.");
-    }
-
-    public void prepare(Properties options) {
-        Validate.notNull(mapper, "mapper can not be null");
-        Validate.notNull(topicSelector, "topicSelector can not be null");
-        producer = new KafkaProducer(options);
-    }
-
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-        String topic = null;
-        for (TridentTuple tuple : tuples) {
-            try {
-                topic = topicSelector.getTopic(tuple);
-
-                if(topic != null) {
-                    Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
-                            mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
-                    try {
-                        result.get();
-                    } catch (ExecutionException e) {
-                        String errorMsg = "Could not retrieve result for message with key = "
-                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
-                        LOG.error(errorMsg, e);
-                        throw new FailedException(errorMsg, e);
-                    }
-                } else {
-                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
-                }
-            } catch (Exception ex) {
-                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
-                        + " to topic = " + topic;
-                LOG.warn(errorMsg, ex);
-                throw new FailedException(errorMsg, ex);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
deleted file mode 100644
index a5d9d42..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import backtype.storm.task.IMetricsContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import storm.kafka.trident.selector.KafkaTopicSelector;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class TridentKafkaStateFactory implements StateFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
-
-    private TridentTupleToKafkaMapper mapper;
-    private KafkaTopicSelector topicSelector;
-    private Properties producerProperties = new Properties();
-
-    public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
-        this.mapper = mapper;
-        return this;
-    }
-
-    public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
-        this.topicSelector = selector;
-        return this;
-    }
-
-    public TridentKafkaStateFactory withProducerProperties(Properties props) {
-        this.producerProperties = props;
-        return this;
-    }
-
-    @Override
-    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-        LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
-        TridentKafkaState state = new TridentKafkaState()
-                .withKafkaTopicSelector(this.topicSelector)
-                .withTridentTupleToKafkaMapper(this.mapper);
-        state.prepare(producerProperties);
-        return state;
-    }
-}
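
Putting the write side together: this factory is normally handed to
Stream.partitionPersist along with the TridentKafkaUpdater defined in the next
file. A sketch, given an existing Trident Stream named stream; the topic,
field names, and broker address are placeholders:

    import backtype.storm.tuple.Fields;
    import java.util.Properties;
    import storm.kafka.trident.TridentKafkaStateFactory;
    import storm.kafka.trident.TridentKafkaUpdater;
    import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
    import storm.kafka.trident.selector.DefaultTopicSelector;

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "kafka-1.example.com:9092");
    producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withKafkaTopicSelector(new DefaultTopicSelector("clicks-out"))
            .withTridentTupleToKafkaMapper(
                    new FieldNameBasedTupleToKafkaMapper("key", "message"))
            .withProducerProperties(producerProps);

    stream.partitionPersist(stateFactory,
            new Fields("key", "message"),
            new TridentKafkaUpdater());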

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java
deleted file mode 100644
index 6639b36..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaUpdater.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
-
-import java.util.List;
-
-public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> {
-    @Override
-    public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) {
-        state.updateState(tuples, collector);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
deleted file mode 100644
index b480bdd..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/ZkBrokerReader.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.DynamicBrokersReader;
-import storm.kafka.ZkHosts;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class ZkBrokerReader implements IBrokerReader {
-
-	public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
-
-	List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>();
-	DynamicBrokersReader reader;
-	long lastRefreshTimeMs;
-
-
-	long refreshMillis;
-
-	public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
-		try {
-			reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
-			cachedBrokers = reader.getBrokerInfo();
-			lastRefreshTimeMs = System.currentTimeMillis();
-			refreshMillis = hosts.refreshFreqSecs * 1000L;
-		} catch (java.net.SocketTimeoutException e) {
-			LOG.warn("Failed to update brokers", e);
-		}
-
-	}
-
-	private void refresh() {
-		long currTime = System.currentTimeMillis();
-		if (currTime > lastRefreshTimeMs + refreshMillis) {
-			try {
-				LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
-				cachedBrokers = reader.getBrokerInfo();
-				lastRefreshTimeMs = currTime;
-			} catch (java.net.SocketTimeoutException e) {
-				LOG.warn("Failed to update brokers", e);
-			}
-		}
-	}
-	@Override
-	public GlobalPartitionInformation getBrokerForTopic(String topic) {
-		refresh();
-        for(GlobalPartitionInformation partitionInformation : cachedBrokers) {
-            if (partitionInformation.topic.equals(topic)) return partitionInformation;
-        }
-		return null;
-	}
-
-	@Override
-	public List<GlobalPartitionInformation> getAllBrokers() {
-		refresh();
-		return cachedBrokers;
-	}
-
-	@Override
-	public void close() {
-		reader.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
deleted file mode 100644
index 29a49d1..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident.mapper;
-
-import storm.trident.tuple.TridentTuple;
-
-public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper {
-
-    public final String keyFieldName;
-    public final String msgFieldName;
-
-    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
-        this.keyFieldName = keyFieldName;
-        this.msgFieldName = msgFieldName;
-    }
-
-    @Override
-    public K getKeyFromTuple(TridentTuple tuple) {
-        return (K) tuple.getValueByField(keyFieldName);
-    }
-
-    @Override
-    public V getMessageFromTuple(TridentTuple tuple) {
-        return (V) tuple.getValueByField(msgFieldName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
deleted file mode 100644
index 9759ba3..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident.mapper;
-
-import backtype.storm.tuple.Tuple;
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-
-public interface TridentTupleToKafkaMapper<K,V>  extends Serializable {
-    K getKeyFromTuple(TridentTuple tuple);
-    V getMessageFromTuple(TridentTuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java
deleted file mode 100644
index 473a38d..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/DefaultTopicSelector.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident.selector;
-
-import storm.trident.tuple.TridentTuple;
-
-public class DefaultTopicSelector implements KafkaTopicSelector {
-
-    private final String topicName;
-
-    public DefaultTopicSelector(final String topicName) {
-        this.topicName = topicName;
-    }
-
-    @Override
-    public String getTopic(TridentTuple tuple) {
-        return topicName;
-    }
-}
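
DefaultTopicSelector always answers with the same topic; per-tuple routing just
means implementing KafkaTopicSelector (defined in the next file) directly. A
hypothetical sketch that reads the destination topic from a tuple field; the
class and field names are illustrative, not part of this commit:

    package storm.kafka.trident.selector;

    import storm.trident.tuple.TridentTuple;

    public class FieldTopicSelector implements KafkaTopicSelector {

        private final String fieldName;

        public FieldTopicSelector(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String getTopic(TridentTuple tuple) {
            // returning null makes TridentKafkaState log a warning and skip the tuple
            return (String) tuple.getValueByField(fieldName);
        }
    }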

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java
deleted file mode 100644
index f6c5d82..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/selector/KafkaTopicSelector.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident.selector;
-
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-
-public interface KafkaTopicSelector extends Serializable {
-    String getTopic(TridentTuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
new file mode 100644
index 0000000..3363252
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.ZKPaths;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Date: 16/05/2013
+ * Time: 20:35
+ */
+public class DynamicBrokersReaderTest {
+    private DynamicBrokersReader dynamicBrokersReader, wildCardBrokerReader;
+    private String masterPath = "/brokers";
+    private String topic = "testing1";
+    private String secondTopic = "testing2";
+    private String thirdTopic = "testing3";
+
+    private CuratorFramework zookeeper;
+    private TestingServer server;
+
+    @Before
+    public void setUp() throws Exception {
+        server = new TestingServer();
+        String connectionString = server.getConnectString();
+        Map conf = new HashMap();
+        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
+        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
+
+        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
+        dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
+
+        Map conf2 = new HashMap();
+        conf2.putAll(conf);
+        conf2.put("kafka.topic.wildcard.match",true);
+
+        wildCardBrokerReader = new DynamicBrokersReader(conf2, connectionString, masterPath, "^test.*$");
+        zookeeper.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        dynamicBrokersReader.close();
+        wildCardBrokerReader.close();
+        zookeeper.close();
+        server.close();
+    }
+
+    private void addPartition(int id, String host, int port, String topic) throws Exception {
+        writePartitionId(id, topic);
+        writeLeader(id, 0, topic);
+        writeLeaderDetails(0, host, port);
+    }
+
+    private void addPartition(int id, int leader, String host, int port, String topic) throws Exception {
+        writePartitionId(id, topic);
+        writeLeader(id, leader, topic);
+        writeLeaderDetails(leader, host, port);
+    }
+
+    private void writePartitionId(int id, String topic) throws Exception {
+        String path = dynamicBrokersReader.partitionPath(topic);
+        writeDataToPath(path, ("" + id));
+    }
+
+    private void writeDataToPath(String path, String data) throws Exception {
+        ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
+        zookeeper.setData().forPath(path, data.getBytes());
+    }
+
+    private void writeLeader(int id, int leaderId, String topic) throws Exception {
+        String path = dynamicBrokersReader.partitionPath(topic) + "/" + id + "/state";
+        String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }";
+        writeDataToPath(path, value);
+    }
+
+    private void writeLeaderDetails(int leaderId, String host, int port) throws Exception {
+        String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
+        String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
+        writeDataToPath(path, value);
+    }
+
+
+    private GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic){
+        for(GlobalPartitionInformation partitionInformation : partitions) {
+            if (partitionInformation.topic.equals(topic)) return partitionInformation;
+        }
+        return null;
+    }
+
+    @Test
+    public void testGetBrokerInfo() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int partition = 0;
+        addPartition(partition, host, port, topic);
+        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
+
+        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+        assertNotNull(brokerInfo);
+        assertEquals(1, brokerInfo.getOrderedPartitions().size());
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+    }
+
+    @Test
+    public void testGetBrokerInfoWildcardMatch() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int partition = 0;
+        addPartition(partition, host, port, topic);
+        addPartition(partition, host, port, secondTopic);
+
+        List<GlobalPartitionInformation> partitions = wildCardBrokerReader.getBrokerInfo();
+
+        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+        assertNotNull(brokerInfo);
+        assertEquals(1, brokerInfo.getOrderedPartitions().size());
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+        brokerInfo = getByTopic(partitions, secondTopic);
+        assertNotNull(brokerInfo);
+        assertEquals(1, brokerInfo.getOrderedPartitions().size());
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+        addPartition(partition, host, port, thirdTopic);
+        // Discover the newly added topic
+        partitions = wildCardBrokerReader.getBrokerInfo();
+        assertNotNull(getByTopic(partitions, topic));
+        assertNotNull(getByTopic(partitions, secondTopic));
+        assertNotNull(getByTopic(partitions, thirdTopic));
+    }
+
+
+    @Test
+    public void testMultiplePartitionsOnDifferentHosts() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int secondPort = 9093;
+        int partition = 0;
+        int secondPartition = partition + 1;
+        addPartition(partition, 0, host, port, topic);
+        addPartition(secondPartition, 1, host, secondPort, topic);
+
+        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
+
+        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+        assertNotNull(brokerInfo);
+        assertEquals(2, brokerInfo.getOrderedPartitions().size());
+
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+        assertEquals(secondPort, brokerInfo.getBrokerFor(secondPartition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
+    }
+
+
+    @Test
+    public void testMultiplePartitionsOnSameHost() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int partition = 0;
+        int secondPartition = partition + 1;
+        addPartition(partition, 0, host, port, topic);
+        addPartition(secondPartition, 0, host, port, topic);
+
+        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
+
+        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+        assertNotNull(brokerInfo);
+        assertEquals(2, brokerInfo.getOrderedPartitions().size());
+
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+        assertEquals(port, brokerInfo.getBrokerFor(secondPartition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
+    }
+
+    @Test
+    public void testSwitchHostForPartition() throws Exception {
+        String host = "localhost";
+        int port = 9092;
+        int partition = 0;
+        addPartition(partition, host, port, topic);
+        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
+
+        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
+        assertNotNull(brokerInfo);
+        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
+
+        String newHost = host + "switch";
+        int newPort = port + 1;
+        addPartition(partition, newHost, newPort, topic);
+        partitions = dynamicBrokersReader.getBrokerInfo();
+
+        brokerInfo = getByTopic(partitions, topic);
+        assertNotNull(brokerInfo);
+        assertEquals(newPort, brokerInfo.getBrokerFor(partition).port);
+        assertEquals(newHost, brokerInfo.getBrokerFor(partition).host);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testErrorLogsWhenConfigIsMissing() throws Exception {
+        String connectionString = server.getConnectString();
+        Map conf = new HashMap();
+        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
+//        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
+
+        new DynamicBrokersReader(conf, connectionString, masterPath, topic);
+
+    }
+}
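
The test above doubles as a usage reference for the relocated reader. A minimal sketch, assuming a reachable ZooKeeper at a placeholder address ("zkhost:2181") and a placeholder topic name ("mytopic"); everything else mirrors the API the test exercises:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.kafka.DynamicBrokersReader;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;

public class BrokerInfoSketch {
    public static void main(String[] args) throws Exception {
        Map conf = new HashMap();
        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);

        // "zkhost:2181" and "mytopic" are placeholders for a real
        // ZooKeeper ensemble and topic name.
        DynamicBrokersReader reader =
                new DynamicBrokersReader(conf, "zkhost:2181", "/brokers", "mytopic");
        try {
            // One GlobalPartitionInformation per matched topic.
            List<GlobalPartitionInformation> topics = reader.getBrokerInfo();
            for (GlobalPartitionInformation info : topics) {
                System.out.println(info.topic + ": partition 0 led by "
                        + info.getBrokerFor(0).host + ":" + info.getBrokerFor(0).port);
            }
        } finally {
            reader.close();
        }
    }
}

As the wildcard test shows, setting "kafka.topic.wildcard.match" to true in the config makes the topic argument a regular expression matched against every topic registered under the broker path, and newly created matching topics are picked up on the next getBrokerInfo() call.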

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
new file mode 100644
index 0000000..8fa6564
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+public class ExponentialBackoffMsgRetryManagerTest {
+
+    private static final Long TEST_OFFSET = 101L;
+    private static final Long TEST_OFFSET2 = 102L;
+    private static final Long TEST_OFFSET3 = 105L;
+    private static final Long TEST_NEW_OFFSET = 103L;
+
+    @Test
+    public void testImmediateRetry() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.retryStarted(TEST_OFFSET);
+
+        manager.failed(TEST_OFFSET);
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    @Test
+    public void testSingleDelay() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000);
+        manager.failed(TEST_OFFSET);
+        Thread.sleep(5);
+        Long next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready for retry yet", next);
+        assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+
+        Thread.sleep(100);
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    @Test
+    public void testExponentialBackoff() throws Exception {
+        final long initial = 10;
+        final double mult = 2d;
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
+
+        long expectedWaitTime = initial;
+        for (long i = 0L; i < 3L; ++i) {
+            manager.failed(TEST_OFFSET);
+
+            Thread.sleep((expectedWaitTime + 1L) / 2L);
+            assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+
+            Thread.sleep((expectedWaitTime + 1L) / 2L);
+            Long next = manager.nextFailedMessageToRetry();
+            assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+            assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+
+            manager.retryStarted(TEST_OFFSET);
+            expectedWaitTime *= mult;
+        }
+    }
+
+    @Test
+    public void testRetryOrder() throws Exception {
+        final long initial = 10;
+        final double mult = 2d;
+        final long max = 20;
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+
+        manager.failed(TEST_OFFSET);
+        Thread.sleep(initial);
+
+        manager.retryStarted(TEST_OFFSET);
+        manager.failed(TEST_OFFSET);
+        manager.failed(TEST_OFFSET2);
+
+        // although TEST_OFFSET failed first, its retry delay is longer because this is its second failure,
+        // so TEST_OFFSET2 should come first
+
+        Thread.sleep(initial * 2);
+        assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next);
+
+        Thread.sleep(initial);
+
+        // haven't retried yet, so first should still be TEST_OFFSET2
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next);
+        manager.retryStarted(next);
+
+        // now it should be TEST_OFFSET
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect message to retry is now "+TEST_OFFSET, TEST_OFFSET, next);
+        manager.retryStarted(next);
+
+        // now none left
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message to retry now", next);
+    }
+
+    @Test
+    public void testQueriesAfterRetriedAlready() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.retryStarted(TEST_OFFSET);
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after retried", next);
+        assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testRetryWithoutFail() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.retryStarted(TEST_OFFSET);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testFailRetryRetry() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        try {
+            manager.retryStarted(TEST_OFFSET);
+        } catch (IllegalStateException ise) {
+            fail("IllegalStateException unexpected here: " + ise);
+        }
+
+        assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        manager.retryStarted(TEST_OFFSET);
+    }
+
+    @Test
+    public void testMaxBackoff() throws Exception {
+        final long initial = 100;
+        final double mult = 2d;
+        final long max = 2000;
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+
+        long expectedWaitTime = initial;
+        for (long i = 0L; i < 4L; ++i) {
+            manager.failed(TEST_OFFSET);
+
+            Thread.sleep((expectedWaitTime + 1L) / 2L);
+            assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+
+            Thread.sleep((expectedWaitTime + 1L) / 2L);
+            Long next = manager.nextFailedMessageToRetry();
+            assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+            assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+
+            manager.retryStarted(TEST_OFFSET);
+            expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
+        }
+    }
+
+    @Test
+    public void testFailThenAck() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.acked(TEST_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after acked", next);
+        assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    @Test
+    public void testAckThenFail() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.acked(TEST_OFFSET);
+        assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.failed(TEST_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+    }
+    
+    @Test
+    public void testClearInvalidMessages() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        manager.failed(TEST_OFFSET2);
+        manager.failed(TEST_OFFSET3);
+        
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+
+        manager.clearInvalidMessages(TEST_NEW_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
+        
+        manager.acked(TEST_OFFSET3);
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after acked", next);
+    }
+
+}
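
Taken together, these tests pin down the manager's lifecycle: failed() schedules an offset with the current backoff, nextFailedMessageToRetry() and shouldRetryMsg() report readiness once the delay elapses, retryStarted() must follow a failed() and grows the next delay, and acked() or clearInvalidMessages() drop offsets for good. A minimal sketch of that lifecycle, assuming illustrative offsets and delays:

import org.apache.storm.kafka.ExponentialBackoffMsgRetryManager;

public class RetryLifecycleSketch {
    public static void main(String[] args) throws Exception {
        // 100 ms initial delay, doubled per retry, capped at 2000 ms
        // (the same constructor argument order the backoff tests use).
        ExponentialBackoffMsgRetryManager manager =
                new ExponentialBackoffMsgRetryManager(100, 2d, 2000);

        Long offset = 101L;      // illustrative Kafka offset
        manager.failed(offset);  // the tuple at this offset failed

        Thread.sleep(150);       // wait out the initial backoff

        Long next = manager.nextFailedMessageToRetry();
        if (next != null && manager.shouldRetryMsg(next)) {
            manager.retryStarted(next); // must follow failed(), else IllegalStateException
            manager.acked(next);        // the retry succeeded; the offset is forgotten
        }
    }
}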

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
new file mode 100644
index 0000000..e38bc1e
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Date: 12/01/2014
+ * Time: 18:09
+ */
+public class KafkaErrorTest {
+
+    @Test
+    public void getError() {
+        assertThat(KafkaError.getError(0), is(equalTo(KafkaError.NO_ERROR)));
+    }
+
+    @Test
+    public void offsetMetaDataTooLarge() {
+        assertThat(KafkaError.getError(12), is(equalTo(KafkaError.OFFSET_METADATA_TOO_LARGE)));
+    }
+
+    @Test
+    public void unknownNegative() {
+        assertThat(KafkaError.getError(-1), is(equalTo(KafkaError.UNKNOWN)));
+    }
+
+    @Test
+    public void unknownPositive() {
+        assertThat(KafkaError.getError(75), is(equalTo(KafkaError.UNKNOWN)));
+    }
+
+    @Test
+    public void unknown() {
+        assertThat(KafkaError.getError(13), is(equalTo(KafkaError.UNKNOWN)));
+    }
+}
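
The contract being pinned down here is small: known short error codes map to their enum constant and everything else maps to UNKNOWN. A minimal sketch, using a code value taken from the test:

import org.apache.storm.kafka.KafkaError;

public class KafkaErrorSketch {
    public static void main(String[] args) {
        // 12 maps to OFFSET_METADATA_TOO_LARGE per the test above;
        // out-of-range codes such as -1 or 75 map to UNKNOWN.
        KafkaError err = KafkaError.getError(12);
        if (err != KafkaError.NO_ERROR) {
            System.out.println("fetch failed: " + err);
        }
    }
}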

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
new file mode 100644
index 0000000..e2fb60f
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Date: 11/01/2014
+ * Time: 13:15
+ */
+public class KafkaTestBroker {
+
+    private int port;
+    private KafkaServerStartable kafka;
+    private TestingServer server;
+    private CuratorFramework zookeeper;
+    private File logDir;
+
+    public KafkaTestBroker() {
+        try {
+            server = new TestingServer();
+            String zookeeperConnectionString = server.getConnectString();
+            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
+            zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
+            zookeeper.start();
+            port = InstanceSpec.getRandomPort();
+            logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port);
+            KafkaConfig config = buildKafkaConfig(zookeeperConnectionString);
+            kafka = new KafkaServerStartable(config);
+            kafka.startup();
+        } catch (Exception ex) {
+            throw new RuntimeException("Could not start test broker", ex);
+        }
+    }
+
+    private KafkaConfig buildKafkaConfig(String zookeeperConnectionString) {
+        Properties p = new Properties();
+        p.setProperty("zookeeper.connect", zookeeperConnectionString);
+        p.setProperty("broker.id", "0");
+        p.setProperty("port", "" + port);
+        p.setProperty("log.dirs", logDir.getAbsolutePath());
+        return new KafkaConfig(p);
+    }
+
+    public String getBrokerConnectionString() {
+        return "localhost:" + port;
+    }
+
+    public int getPort() {
+        return port;
+    }
+    public void shutdown() {
+        kafka.shutdown();
+        if (zookeeper.getState().equals(CuratorFrameworkState.STARTED)) {
+            zookeeper.close();
+        }
+        try {
+            server.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        FileUtils.deleteQuietly(logDir);
+    }
+}
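
Constructing this helper boots an in-process ZooKeeper (Curator's TestingServer) plus a single Kafka broker on a random port, and shutdown() tears both down and deletes the broker's log directory. A minimal sketch of the intended fixture lifecycle, assuming it is driven from another test in the same source tree:

import org.apache.storm.kafka.KafkaTestBroker;

public class BrokerFixtureSketch {
    public static void main(String[] args) {
        KafkaTestBroker broker = new KafkaTestBroker(); // boots ZK + Kafka
        try {
            // Point a producer, consumer, or SpoutConfig at this address.
            System.out.println("broker listening at " + broker.getBrokerConnectionString());
        } finally {
            broker.shutdown(); // stops Kafka and ZK, deletes the log dir
        }
    }
}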