You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/19 03:34:59 UTC

[3/7] storm git commit: STORM-2953: Remove storm-kafka

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
deleted file mode 100644
index 7276ef6..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.mapper;
-
-import org.apache.storm.tuple.Tuple;
-
-public class FieldNameBasedTupleToKafkaMapper<K, V> implements TupleToKafkaMapper<K, V> {
-
-    public static final String BOLT_KEY = "key";
-    public static final String BOLT_MESSAGE = "message";
-    public String boltKeyField;
-    public String boltMessageField;
-
-    public FieldNameBasedTupleToKafkaMapper() {
-        this(BOLT_KEY, BOLT_MESSAGE);
-    }
-
-    public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) {
-        this.boltKeyField = boltKeyField;
-        this.boltMessageField = boltMessageField;
-    }
-
-    @Override
-    public K getKeyFromTuple(Tuple tuple) {
-        //for backward compatibility, we return null when key is not present.
-        return tuple.contains(boltKeyField) ? (K) tuple.getValueByField(boltKeyField) : null;
-    }
-
-    @Override
-    public V getMessageFromTuple(Tuple tuple) {
-        return (V) tuple.getValueByField(boltMessageField);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
deleted file mode 100644
index 7012e6b..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.mapper;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * as the really verbose name suggests this interface mapps a storm tuple to kafka key and message.
- * @param <K> type of key.
- * @param <V> type of value.
- */
-public interface TupleToKafkaMapper<K, V> extends Serializable {
-    K getKeyFromTuple(Tuple tuple);
-
-    V getMessageFromTuple(Tuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
deleted file mode 100644
index d1784b0..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.selector;
-
-import org.apache.storm.tuple.Tuple;
-
-public class DefaultTopicSelector implements KafkaTopicSelector {
-
-    private final String topicName;
-
-    public DefaultTopicSelector(final String topicName) {
-        this.topicName = topicName;
-    }
-
-    @Override
-    public String getTopic(Tuple tuple) {
-        return topicName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
deleted file mode 100644
index 50c5c1f..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.selector;
-
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Uses field index to select topic name from tuple .
- */
-public class FieldIndexTopicSelector implements KafkaTopicSelector {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FieldIndexTopicSelector.class);
-
-    private final int fieldIndex;
-    private final String defaultTopicName;
-
-    public FieldIndexTopicSelector(int fieldIndex, String defaultTopicName) {
-        this.fieldIndex = fieldIndex;
-        this.defaultTopicName = defaultTopicName;
-    }
-
-    @Override
-    public String getTopic(Tuple tuple) {
-        if (fieldIndex < tuple.size()) {
-            return tuple.getString(fieldIndex);
-        } else {
-            LOG.warn("Field Index " + fieldIndex + " Out of bound . Using default topic " + defaultTopicName);
-            return defaultTopicName;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
deleted file mode 100644
index d3c304a..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.selector;
-
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Uses field name to select topic name from tuple .
- */
-public class FieldNameTopicSelector implements KafkaTopicSelector {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FieldNameTopicSelector.class);
-
-    private final String fieldName;
-    private final String defaultTopicName;
-
-
-    public FieldNameTopicSelector(String fieldName, String defaultTopicName) {
-        this.fieldName = fieldName;
-        this.defaultTopicName = defaultTopicName;
-    }
-
-    @Override
-    public String getTopic(Tuple tuple) {
-        if (tuple.contains(fieldName)) {
-            return tuple.getStringByField(fieldName);
-        } else {
-            LOG.warn("Field " + fieldName + " Not Found . Using default topic " + defaultTopicName);
-            return defaultTopicName;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
deleted file mode 100644
index 4045df7..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.selector;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.Tuple;
-
-public interface KafkaTopicSelector extends Serializable {
-    String getTopic(Tuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
deleted file mode 100644
index 0e8dba1..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.kafka.KafkaUtils;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-
-class Coordinator implements IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>,
-                             IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> {
-
-    private IBrokerReader reader;
-    private TridentKafkaConfig config;
-
-    public Coordinator(Map<String, Object> conf, TridentKafkaConfig tridentKafkaConfig) {
-        config = tridentKafkaConfig;
-        reader = KafkaUtils.makeBrokerReader(conf, config);
-    }
-
-    @Override
-    public void close() {
-        config.coordinator.close();
-    }
-
-    @Override
-    public boolean isReady(long txid) {
-        return config.coordinator.isReady(txid);
-    }
-
-    @Override
-    public List<GlobalPartitionInformation> getPartitionsForBatch() {
-        return reader.getAllBrokers();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
deleted file mode 100644
index 575e235..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-public class DefaultCoordinator implements IBatchCoordinator {
-
-    @Override
-    public boolean isReady(long txid) {
-        return true;
-    }
-
-    @Override
-    public void close() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
deleted file mode 100644
index b26dc7f..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import com.google.common.base.Objects;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.storm.kafka.Broker;
-import org.apache.storm.kafka.Partition;
-
-
-public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
-
-    public String topic;
-    private Map<Integer, Broker> partitionMap;
-    //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition
-    private Boolean bUseTopicNameForPartitionPathId;
-
-    public GlobalPartitionInformation(String topic, Boolean bUseTopicNameForPartitionPathId) {
-        this.topic = topic;
-        this.partitionMap = new TreeMap<Integer, Broker>();
-        this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
-    }
-
-    public GlobalPartitionInformation(String topic) {
-        this.topic = topic;
-        this.partitionMap = new TreeMap<Integer, Broker>();
-        this.bUseTopicNameForPartitionPathId = false;
-    }
-
-    public void addPartition(int partitionId, Broker broker) {
-        partitionMap.put(partitionId, broker);
-    }
-
-    public Boolean getbUseTopicNameForPartitionPathId() {
-        return bUseTopicNameForPartitionPathId;
-    }
-
-    @Override
-    public String toString() {
-        return "GlobalPartitionInformation{" +
-               "topic=" + topic +
-               ", partitionMap=" + partitionMap +
-               '}';
-    }
-
-    public Broker getBrokerFor(Integer partitionId) {
-        return partitionMap.get(partitionId);
-    }
-
-    public List<Partition> getOrderedPartitions() {
-        List<Partition> partitions = new LinkedList<Partition>();
-        for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {
-            partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId));
-        }
-        return partitions;
-    }
-
-    @Override
-    public Iterator<Partition> iterator() {
-        final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator();
-        final String topic = this.topic;
-        final Boolean bUseTopicNameForPartitionPathId = this.bUseTopicNameForPartitionPathId;
-        return new Iterator<Partition>() {
-            @Override
-            public boolean hasNext() {
-                return iterator.hasNext();
-            }
-
-            @Override
-            public Partition next() {
-                Map.Entry<Integer, Broker> next = iterator.next();
-                return new Partition(next.getValue(), topic, next.getKey(), bUseTopicNameForPartitionPathId);
-            }
-
-            @Override
-            public void remove() {
-                iterator.remove();
-            }
-        };
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(partitionMap);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final GlobalPartitionInformation other = (GlobalPartitionInformation) obj;
-        return Objects.equal(this.partitionMap, other.partitionMap);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
deleted file mode 100644
index 4c6c404..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.io.Serializable;
-
-public interface IBatchCoordinator extends Serializable {
-    boolean isReady(long txid);
-
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
deleted file mode 100644
index c5cf8b2..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.List;
-
-public interface IBrokerReader {
-
-    GlobalPartitionInformation getBrokerForTopic(String topic);
-
-    List<GlobalPartitionInformation> getAllBrokers();
-
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
deleted file mode 100644
index 14324ed..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-
-import org.apache.storm.metric.api.ICombiner;
-
-public class MaxMetric implements ICombiner<Long> {
-    @Override
-    public Long identity() {
-        return null;
-    }
-
-    @Override
-    public Long combine(Long l1, Long l2) {
-        if (l1 == null) {
-            return l2;
-        }
-        if (l2 == null) {
-            return l1;
-        }
-        return Math.max(l1, l2);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
deleted file mode 100644
index c98be42..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.kafka.Partition;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.tuple.Fields;
-
-
-public class OpaqueTridentKafkaSpout
-    implements IOpaquePartitionedTridentSpout<
-    List<GlobalPartitionInformation>,
-    Partition,
-    Map<String, Object>> {
-
-
-    TridentKafkaConfig _config;
-
-    public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
-        _config = config;
-    }
-
-    @Override
-    public Emitter<List<GlobalPartitionInformation>,
-        Partition,
-        Map<String, Object>> getEmitter(Map<String, Object> conf,
-                                        TopologyContext context) {
-        return new TridentKafkaEmitter(conf, context, _config, context
-            .getStormId()).asOpaqueEmitter();
-    }
-
-    @Override
-    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(
-        Map<String, Object> conf,
-        TopologyContext tc) {
-        return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return _config.scheme.getOutputFields();
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
deleted file mode 100644
index 3c5cc09..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class StaticBrokerReader implements IBrokerReader {
-
-    private Map<String, GlobalPartitionInformation> brokers = new TreeMap<String, GlobalPartitionInformation>();
-
-    public StaticBrokerReader(String topic, GlobalPartitionInformation partitionInformation) {
-        this.brokers.put(topic, partitionInformation);
-    }
-
-    @Override
-    public GlobalPartitionInformation getBrokerForTopic(String topic) {
-        if (brokers.containsKey(topic)) return brokers.get(topic);
-        return null;
-    }
-
-    @Override
-    public List<GlobalPartitionInformation> getAllBrokers() {
-        List<GlobalPartitionInformation> list = new ArrayList<GlobalPartitionInformation>();
-        list.addAll(brokers.values());
-        return list;
-    }
-
-    @Override
-    public void close() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
deleted file mode 100644
index 7b1d4dd..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.Map;
-import org.apache.storm.kafka.Partition;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.tuple.Fields;
-
-
-public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
-
-    TridentKafkaConfig _config;
-
-    public TransactionalTridentKafkaSpout(TridentKafkaConfig config) {
-        _config = config;
-    }
-
-
-    @Override
-    public IPartitionedTridentSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context) {
-        return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
-    }
-
-    @Override
-    public IPartitionedTridentSpout.Emitter getEmitter(Map<String, Object> conf, TopologyContext context) {
-        return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asTransactionalEmitter();
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return _config.scheme.getOutputFields();
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
deleted file mode 100644
index 3dac221..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import org.apache.storm.kafka.BrokerHosts;
-import org.apache.storm.kafka.KafkaConfig;
-
-
-public class TridentKafkaConfig extends KafkaConfig {
-
-
-    public final IBatchCoordinator coordinator = new DefaultCoordinator();
-
-    public TridentKafkaConfig(BrokerHosts hosts, String topic) {
-        super(hosts, topic);
-    }
-
-    public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
-        super(hosts, topic, clientId);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
deleted file mode 100644
index cb00579..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import org.apache.storm.Config;
-import org.apache.storm.kafka.DynamicPartitionConnections;
-import org.apache.storm.kafka.FailedFetchException;
-import org.apache.storm.kafka.KafkaUtils;
-import org.apache.storm.kafka.MessageMetadataSchemeAsMultiScheme;
-import org.apache.storm.kafka.Partition;
-import org.apache.storm.kafka.PartitionManager;
-import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
-import org.apache.storm.metric.api.CombinedMetric;
-import org.apache.storm.metric.api.MeanReducer;
-import org.apache.storm.metric.api.ReducedMetric;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.trident.topology.TransactionAttempt;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TridentKafkaEmitter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
-
-    private DynamicPartitionConnections _connections;
-    private String _topologyName;
-    private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
-    private ReducedMetric _kafkaMeanFetchLatencyMetric;
-    private CombinedMetric _kafkaMaxFetchLatencyMetric;
-    private TridentKafkaConfig _config;
-    private String _topologyInstanceId;
-
-    public TridentKafkaEmitter(Map<String, Object> conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
-        _config = config;
-        _topologyInstanceId = topologyInstanceId;
-        _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
-        _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
-        _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
-        context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs);
-        _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs);
-        _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs);
-    }
-
-
-    private Map<String, Object> failFastEmitNewPartitionBatch(
-        final TransactionAttempt attempt,
-        TridentCollector collector,
-        Partition partition,
-        Map<String, Object> lastMeta) {
-        SimpleConsumer consumer = _connections.register(partition);
-        Map<String, Object> ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt);
-        Long offset = (Long) ret.get("offset");
-        Long endOffset = (Long) ret.get("nextOffset");
-        _kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(endOffset, offset));
-        return ret;
-    }
-
-    private Map<String, Object> emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition,
-                                                      Map<String, Object> lastMeta) {
-        try {
-            return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
-        } catch (FailedFetchException e) {
-            LOG.warn("Failed to fetch from partition " + partition);
-            if (lastMeta == null) {
-                return null;
-            } else {
-                Map<String, Object> ret = new HashMap<>();
-                ret.put("offset", lastMeta.get("nextOffset"));
-                ret.put("nextOffset", lastMeta.get("nextOffset"));
-                ret.put("partition", partition.partition);
-                ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
-                ret.put("topic", partition.topic);
-                ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
-                return ret;
-            }
-        }
-    }
-
-    private Map<String, Object> doEmitNewPartitionBatch(SimpleConsumer consumer,
-                                                        Partition partition,
-                                                        TridentCollector collector,
-                                                        Map<String, Object> lastMeta,
-                                                        TransactionAttempt attempt) {
-        LOG.debug("Emitting new partition batch - [transaction = {}], [lastMeta = {}]", attempt, lastMeta);
-        long offset;
-        if (lastMeta != null) {
-            String lastInstanceId = null;
-            Map<String, Object> lastTopoMeta = (Map<String, Object>)
-                lastMeta.get("topology");
-            if (lastTopoMeta != null) {
-                lastInstanceId = (String) lastTopoMeta.get("id");
-            }
-            if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
-                offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime);
-            } else {
-                offset = (Long) lastMeta.get("nextOffset");
-            }
-        } else {
-            offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config);
-        }
-        LOG.debug("[transaction = {}], [OFFSET = {}]", attempt, offset);
-
-        ByteBufferMessageSet msgs = null;
-        try {
-            msgs = fetchMessages(consumer, partition, offset);
-        } catch (TopicOffsetOutOfRangeException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
-            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
-            offset = newOffset;
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        }
-
-        long endoffset = offset;
-        for (MessageAndOffset msg : msgs) {
-            emit(collector, msg.message(), partition, msg.offset(), attempt);
-            endoffset = msg.nextOffset();
-        }
-        Map<String, Object> newMeta = new HashMap<>();
-        newMeta.put("offset", offset);
-        newMeta.put("nextOffset", endoffset);
-        newMeta.put("instanceId", _topologyInstanceId);
-        newMeta.put("partition", partition.partition);
-        newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
-        newMeta.put("topic", partition.topic);
-        newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
-        LOG.debug("[transaction = {}], [newMeta = {}]", attempt, newMeta);
-        return newMeta;
-    }
-
-    private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
-        long start = System.currentTimeMillis();
-        ByteBufferMessageSet msgs = null;
-        msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        long millis = System.currentTimeMillis() - start;
-        _kafkaMeanFetchLatencyMetric.update(millis);
-        _kafkaMaxFetchLatencyMetric.update(millis);
-        return msgs;
-    }
-
-    /**
-     * re-emit the batch described by the meta data provided
-     */
-    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition,
-                                      Map<String, Object> meta) {
-        LOG.info("re-emitting batch, attempt " + attempt);
-        String instanceId = (String) meta.get("instanceId");
-        if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
-            SimpleConsumer consumer = _connections.register(partition);
-            long offset = (Long) meta.get("offset");
-            long nextOffset = (Long) meta.get("nextOffset");
-            ByteBufferMessageSet msgs = null;
-            msgs = fetchMessages(consumer, partition, offset);
-
-            if (msgs != null) {
-                for (MessageAndOffset msg : msgs) {
-                    if (offset == nextOffset) {
-                        break;
-                    }
-                    if (offset > nextOffset) {
-                        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
-                    }
-                    emit(collector, msg.message(), partition, msg.offset(), attempt);
-                    offset = msg.nextOffset();
-                }
-            }
-        }
-    }
-
-    private void emit(TridentCollector collector, Message msg, Partition partition, long offset, TransactionAttempt attempt) {
-        Iterable<List<Object>> values;
-        if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
-            values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
-        } else {
-            values = KafkaUtils.generateTuples(_config, msg, partition.topic);
-        }
-
-        if (values != null) {
-            for (List<Object> value : values) {
-                LOG.debug("Emitting: [Transaction: {}], [Data: {}]", attempt, value);
-                collector.emit(value);
-            }
-        } else {
-            LOG.debug("NOT Emitting NULL data. [Transaction: {}]", attempt);
-        }
-    }
-
-    private void clear() {
-        _connections.clear();
-    }
-
-    private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) {
-        List<Partition> part = new ArrayList<Partition>();
-        for (GlobalPartitionInformation globalPartitionInformation : partitions)
-            part.addAll(globalPartitionInformation.getOrderedPartitions());
-        return part;
-    }
-
-    private void refresh(List<Partition> list) {
-        _connections.clear();
-        _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
-    }
-
-
-    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>> asOpaqueEmitter() {
-
-        return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>>() {
-
-            /**
-             * Emit a batch of tuples for a partition/transaction.
-             *
-             * Return the metadata describing this batch that will be used as lastPartitionMeta
-             * for defining the parameters of the next batch.
-             */
-            @Override
-            public Map<String, Object> emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector,
-                                                          Partition partition, Map<String, Object> map) {
-                return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-            }
-
-            @Override
-            public void refreshPartitions(List<Partition> partitions) {
-                refresh(partitions);
-            }
-
-            @Override
-            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
-                return orderPartitions(partitionInformation);
-            }
-
-            @Override
-            public void close() {
-                clear();
-            }
-        };
-    }
-
-    public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
-        return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>>() {
-
-            /**
-             * Emit a batch of tuples for a partition/transaction that's never been emitted before.
-             * Return the metadata that can be used to reconstruct this partition/batch in the future.
-             */
-            @Override
-            public Map<String, Object> emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector,
-                                                             Partition partition, Map<String, Object> map) {
-                return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-            }
-
-            /**
-             * Emit a batch of tuples for a partition/transaction that has been emitted before, using
-             * the metadata created when it was first emitted.
-             */
-            @Override
-            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition,
-                                           Map<String, Object> map) {
-                reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-            }
-
-            /**
-             * This method is called when this task is responsible for a new set of partitions. Should be used
-             * to manage things like connections to brokers.
-             */
-            @Override
-            public void refreshPartitions(List<Partition> partitions) {
-                refresh(partitions);
-            }
-
-            @Override
-            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
-                return orderPartitions(partitionInformation);
-            }
-
-            @Override
-            public void close() {
-                clear();
-            }
-        };
-
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
deleted file mode 100644
index 71b2cb1..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.commons.lang.Validate;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TridentKafkaState implements State {
-    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
-
-    private KafkaProducer producer;
-    private OutputCollector collector;
-
-    private TridentTupleToKafkaMapper mapper;
-    private KafkaTopicSelector topicSelector;
-
-    public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
-        this.mapper = mapper;
-        return this;
-    }
-
-    public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
-        this.topicSelector = selector;
-        return this;
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        LOG.debug("beginCommit is Noop.");
-    }
-
-    @Override
-    public void commit(Long txid) {
-        LOG.debug("commit is Noop.");
-    }
-
-    public void prepare(Properties options) {
-        Validate.notNull(mapper, "mapper can not be null");
-        Validate.notNull(topicSelector, "topicSelector can not be null");
-        producer = new KafkaProducer(options);
-    }
-
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-        String topic = null;
-        try {
-            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
-            for (TridentTuple tuple : tuples) {
-                topic = topicSelector.getTopic(tuple);
-
-                if (topic != null) {
-                    Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
-                                                                                     mapper.getKeyFromTuple(tuple),
-                                                                                     mapper.getMessageFromTuple(tuple)));
-                    futures.add(result);
-                } else {
-                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
-                }
-            }
-
-            List<ExecutionException> exceptions = new ArrayList<>(futures.size());
-            for (Future<RecordMetadata> future : futures) {
-                try {
-                    future.get();
-                } catch (ExecutionException e) {
-                    exceptions.add(e);
-                }
-            }
-
-            if (exceptions.size() > 0) {
-                String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic
-                                  + " because of the following exceptions: \n";
-                for (ExecutionException exception : exceptions) {
-                    errorMsg = errorMsg + exception.getMessage() + "\n";
-                }
-                LOG.error(errorMsg);
-                throw new FailedException(errorMsg);
-            }
-        } catch (Exception ex) {
-            String errorMsg = "Could not send messages " + tuples + " to topic = " + topic;
-            LOG.warn(errorMsg, ex);
-            throw new FailedException(errorMsg, ex);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
deleted file mode 100644
index 5b66fd8..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.Map;
-import java.util.Properties;
-import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TridentKafkaStateFactory implements StateFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
-
-    private TridentTupleToKafkaMapper mapper;
-    private KafkaTopicSelector topicSelector;
-    private Properties producerProperties = new Properties();
-
-    public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
-        this.mapper = mapper;
-        return this;
-    }
-
-    public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
-        this.topicSelector = selector;
-        return this;
-    }
-
-    public TridentKafkaStateFactory withProducerProperties(Properties props) {
-        this.producerProperties = props;
-        return this;
-    }
-
-    @Override
-    public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-        LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
-        TridentKafkaState state = new TridentKafkaState()
-            .withKafkaTopicSelector(this.topicSelector)
-            .withTridentTupleToKafkaMapper(this.mapper);
-        state.prepare(producerProperties);
-        return state;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
deleted file mode 100644
index 1100b66..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.List;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> {
-    @Override
-    public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) {
-        state.updateState(tuples, collector);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
deleted file mode 100644
index d40256e..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.kafka.DynamicBrokersReader;
-import org.apache.storm.kafka.ZkHosts;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ZkBrokerReader implements IBrokerReader {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
-
-    List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>();
-    DynamicBrokersReader reader;
-    long lastRefreshTimeMs;
-
-
-    long refreshMillis;
-
-    public ZkBrokerReader(Map<String, Object> conf, String topic, ZkHosts hosts) {
-        try {
-            reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
-            cachedBrokers = reader.getBrokerInfo();
-            lastRefreshTimeMs = System.currentTimeMillis();
-            refreshMillis = hosts.refreshFreqSecs * 1000L;
-        } catch (java.net.SocketTimeoutException e) {
-            LOG.warn("Failed to update brokers", e);
-        }
-
-    }
-
-    private void refresh() {
-        long currTime = System.currentTimeMillis();
-        if (currTime > lastRefreshTimeMs + refreshMillis) {
-            try {
-                LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
-                cachedBrokers = reader.getBrokerInfo();
-                lastRefreshTimeMs = currTime;
-            } catch (java.net.SocketTimeoutException e) {
-                LOG.warn("Failed to update brokers", e);
-            }
-        }
-    }
-
-    @Override
-    public GlobalPartitionInformation getBrokerForTopic(String topic) {
-        refresh();
-        for (GlobalPartitionInformation partitionInformation : cachedBrokers) {
-            if (partitionInformation.topic.equals(topic)) return partitionInformation;
-        }
-        return null;
-    }
-
-    @Override
-    public List<GlobalPartitionInformation> getAllBrokers() {
-        refresh();
-        return cachedBrokers;
-    }
-
-    @Override
-    public void close() {
-        reader.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
deleted file mode 100644
index 01e3eca..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident.mapper;
-
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * {@link TridentTupleToKafkaMapper} that reads the key and the message from two named fields of the tuple.
- *
- * @param <K> type the key field's value is cast to
- * @param <V> type the message field's value is cast to
- */
-public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
-
-    /** Name of the tuple field that holds the key. */
-    public final String keyFieldName;
-    /** Name of the tuple field that holds the message. */
-    public final String msgFieldName;
-
-    /**
-     * Creates a mapper bound to the given field names.
-     *
-     * @param keyFieldName name of the tuple field to read the key from
-     * @param msgFieldName name of the tuple field to read the message from
-     */
-    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
-        this.keyFieldName = keyFieldName;
-        this.msgFieldName = msgFieldName;
-    }
-
-    /**
-     * Returns the value of the key field, cast to {@code K}.
-     *
-     * @param tuple the tuple to read from
-     * @return the value stored under {@link #keyFieldName}
-     */
-    @Override
-    @SuppressWarnings("unchecked") // The field value is untyped; the caller chooses K to match what the field holds.
-    public K getKeyFromTuple(TridentTuple tuple) {
-        return (K) tuple.getValueByField(keyFieldName);
-    }
-
-    /**
-     * Returns the value of the message field, cast to {@code V}.
-     *
-     * @param tuple the tuple to read from
-     * @return the value stored under {@link #msgFieldName}
-     */
-    @Override
-    @SuppressWarnings("unchecked") // The field value is untyped; the caller chooses V to match what the field holds.
-    public V getMessageFromTuple(TridentTuple tuple) {
-        return (V) tuple.getValueByField(msgFieldName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
deleted file mode 100644
index 4a522d6..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident.mapper;
-
-import java.io.Serializable;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * Serializable strategy for extracting a key and a message from a {@link TridentTuple}.
- *
- * @param <K> type of the extracted key
- * @param <V> type of the extracted message
- */
-public interface TridentTupleToKafkaMapper<K, V> extends Serializable {
-    /**
-     * Extracts the key from the given tuple.
-     *
-     * @param tuple the tuple to read from
-     * @return the key for this tuple
-     */
-    K getKeyFromTuple(TridentTuple tuple);
-
-    /**
-     * Extracts the message from the given tuple.
-     *
-     * @param tuple the tuple to read from
-     * @return the message for this tuple
-     */
-    V getMessageFromTuple(TridentTuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
deleted file mode 100644
index 93b5566..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident.selector;
-
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * {@link KafkaTopicSelector} that returns the same, fixed topic name for every tuple.
- */
-public class DefaultTopicSelector implements KafkaTopicSelector {
-
-    // The topic returned for every tuple; set once in the constructor.
-    private final String topicName;
-
-    /**
-     * Creates a selector that always answers with {@code topicName}.
-     *
-     * @param topicName the topic to return from {@link #getTopic(TridentTuple)}; stored as given, without validation
-     */
-    public DefaultTopicSelector(final String topicName) {
-        this.topicName = topicName;
-    }
-
-    /**
-     * Returns the topic name given at construction time; the tuple is not inspected.
-     *
-     * @param tuple ignored
-     * @return the configured topic name
-     */
-    @Override
-    public String getTopic(TridentTuple tuple) {
-        return topicName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
deleted file mode 100644
index 6de3921..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident.selector;
-
-import java.io.Serializable;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-/**
- * Serializable strategy that chooses a topic name for a {@link TridentTuple}.
- */
-public interface KafkaTopicSelector extends Serializable {
-    /**
-     * Returns the topic name selected for the given tuple.
-     *
-     * @param tuple the tuple a topic is being chosen for
-     * @return the topic name
-     */
-    String getTopic(TridentTuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
deleted file mode 100644
index a6bb61c..0000000
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.storm.Config;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Tests for {@link DynamicBrokersReader} that run against an in-process ZooKeeper ({@link TestingServer}).
- *
- * <p>Each test writes partition, leader-state and broker nodes through a Curator client and then checks what
- * {@link DynamicBrokersReader#getBrokerInfo()} returns for them.
- */
-public class DynamicBrokersReaderTest {
-    private final String masterPath = "/brokers";
-    private final String topic = "testing1";
-    private final String secondTopic = "testing2";
-    private final String thirdTopic = "testing3";
-
-    /** Reader created for the single topic {@link #topic}. */
-    private DynamicBrokersReader dynamicBrokersReader;
-    /** Reader created with {@code kafka.topic.wildcard.match} enabled and the pattern {@code ^test.*$}. */
-    private DynamicBrokersReader wildCardBrokerReader;
-
-    private CuratorFramework zookeeper;
-    private TestingServer server;
-
-    /**
-     * Starts a fresh test ZooKeeper server, creates both readers against it and starts the Curator client that the
-     * helper methods use to write test data.
-     */
-    @Before
-    public void setUp() throws Exception {
-        server = new TestingServer();
-        String connectionString = server.getConnectString();
-        Map<String, Object> conf = new HashMap<>();
-        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
-        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
-
-        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
-        dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
-
-        // Same settings as above, plus wildcard topic matching.
-        Map<String, Object> wildcardConf = new HashMap<>(conf);
-        wildcardConf.put("kafka.topic.wildcard.match", true);
-
-        wildCardBrokerReader = new DynamicBrokersReader(wildcardConf, connectionString, masterPath, "^test.*$");
-        zookeeper.start();
-    }
-
-    /**
-     * Releases everything {@link #setUp()} created: both readers, the Curator client and the test server.
-     */
-    @After
-    public void tearDown() throws Exception {
-        dynamicBrokersReader.close();
-        // The wildcard reader is created for every test as well, so it has to be closed for every test.
-        wildCardBrokerReader.close();
-        zookeeper.close();
-        server.close();
-    }
-
-    /**
-     * Registers a partition of {@code topic} whose leader is broker 0 at {@code host:port}.
-     */
-    private void addPartition(int id, String host, int port, String topic) throws Exception {
-        addPartition(id, 0, host, port, topic);
-    }
-
-    /**
-     * Registers a partition of {@code topic} whose leader is broker {@code leader} at {@code host:port}.
-     */
-    private void addPartition(int id, int leader, String host, int port, String topic) throws Exception {
-        writePartitionId(id, topic);
-        writeLeader(id, leader, topic);
-        writeLeaderDetails(leader, host, port);
-    }
-
-    /** Writes the partition id as the data of the topic's partition path. */
-    private void writePartitionId(int id, String topic) throws Exception {
-        String path = dynamicBrokersReader.partitionPath(topic);
-        writeDataToPath(path, String.valueOf(id));
-    }
-
-    /** Creates {@code path} (including missing parents) and stores {@code data} in it. */
-    private void writeDataToPath(String path, String data) throws Exception {
-        ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
-        // Encode explicitly so the bytes written do not depend on the platform default charset.
-        zookeeper.setData().forPath(path, data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
-    }
-
-    /** Writes the JSON state node of a partition, naming {@code leaderId} as its leader. */
-    private void writeLeader(int id, int leaderId, String topic) throws Exception {
-        String path = dynamicBrokersReader.partitionPath(topic) + "/" + id + "/state";
-        String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }";
-        writeDataToPath(path, value);
-    }
-
-    /** Writes the JSON broker node describing the host and port of broker {@code leaderId}. */
-    private void writeLeaderDetails(int leaderId, String host, int port) throws Exception {
-        String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
-        String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
-        writeDataToPath(path, value);
-    }
-
-    /** Returns the first entry of {@code partitions} for {@code topic}, or {@code null} if there is none. */
-    private static GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic) {
-        for (GlobalPartitionInformation partitionInformation : partitions) {
-            if (partitionInformation.topic.equals(topic)) {
-                return partitionInformation;
-            }
-        }
-        return null;
-    }
-
-    /** Asserts that {@code partition} is served by the broker at {@code expectedHost:expectedPort}. */
-    private static void assertBroker(GlobalPartitionInformation brokerInfo, int partition, String expectedHost, int expectedPort) {
-        assertEquals(expectedPort, brokerInfo.getBrokerFor(partition).port);
-        assertEquals(expectedHost, brokerInfo.getBrokerFor(partition).host);
-    }
-
-    @Test
-    public void testGetBrokerInfo() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int partition = 0;
-        addPartition(partition, host, port, topic);
-        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(1, brokerInfo.getOrderedPartitions().size());
-        assertBroker(brokerInfo, partition, host, port);
-    }
-
-    @Test
-    public void testGetBrokerInfoWildcardMatch() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int partition = 0;
-        addPartition(partition, host, port, topic);
-        addPartition(partition, host, port, secondTopic);
-
-        List<GlobalPartitionInformation> partitions = wildCardBrokerReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(1, brokerInfo.getOrderedPartitions().size());
-        assertBroker(brokerInfo, partition, host, port);
-
-        brokerInfo = getByTopic(partitions, secondTopic);
-        assertNotNull(brokerInfo);
-        assertEquals(1, brokerInfo.getOrderedPartitions().size());
-        assertBroker(brokerInfo, partition, host, port);
-
-        addPartition(partition, host, port, thirdTopic);
-        // Discover the newly added topic: all three topics must now be reported.
-        partitions = wildCardBrokerReader.getBrokerInfo();
-        assertNotNull(getByTopic(partitions, topic));
-        assertNotNull(getByTopic(partitions, secondTopic));
-        assertNotNull(getByTopic(partitions, thirdTopic));
-    }
-
-    @Test
-    public void testMultiplePartitionsOnDifferentHosts() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int secondPort = 9093;
-        int partition = 0;
-        int secondPartition = partition + 1;
-        addPartition(partition, 0, host, port, topic);
-        addPartition(secondPartition, 1, host, secondPort, topic);
-
-        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(2, brokerInfo.getOrderedPartitions().size());
-
-        assertBroker(brokerInfo, partition, host, port);
-        assertBroker(brokerInfo, secondPartition, host, secondPort);
-    }
-
-    @Test
-    public void testMultiplePartitionsOnSameHost() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int partition = 0;
-        int secondPartition = partition + 1;
-        addPartition(partition, 0, host, port, topic);
-        addPartition(secondPartition, 0, host, port, topic);
-
-        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(2, brokerInfo.getOrderedPartitions().size());
-
-        assertBroker(brokerInfo, partition, host, port);
-        assertBroker(brokerInfo, secondPartition, host, port);
-    }
-
-    @Test
-    public void testSwitchHostForPartition() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int partition = 0;
-        addPartition(partition, host, port, topic);
-        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertBroker(brokerInfo, partition, host, port);
-
-        // Re-register the same partition with different broker details and read again.
-        String newHost = host + "switch";
-        int newPort = port + 1;
-        addPartition(partition, newHost, newPort, topic);
-        partitions = dynamicBrokersReader.getBrokerInfo();
-
-        brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertBroker(brokerInfo, partition, newHost, newPort);
-    }
-
-    /**
-     * Constructing a reader from a configuration without {@code Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT} is
-     * expected to fail with a {@link NullPointerException}.
-     */
-    @Test(expected = NullPointerException.class)
-    public void testErrorLogsWhenConfigIsMissing() throws Exception {
-        String connectionString = server.getConnectString();
-        Map<String, Object> conf = new HashMap<>();
-        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
-        // Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT is deliberately left out.
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
-
-        new DynamicBrokersReader(conf, connectionString, masterPath, topic);
-    }
-}