Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:03 UTC

[04/50] [abbrv] git commit: use consistent formatting

use consistent formatting
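
The change is mechanical and no behavioral change is intended: one formatting convention is applied across the storm-kafka module. Tabs become four-space indentation; control-flow keywords (if, for, while, catch) gain a space before the opening parenthesis; binary operators are spaced (e.g. 1024 * 1024); single-statement bodies gain braces; trailing whitespace is stripped; and wildcard imports are replaced with explicit ones. As a representative before/after, taken from the getConnection method in DynamicPartitionConnections below:

Before:

    if(info != null) return info.consumer;

After:

    if (info != null) {
        return info.consumer;
    }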


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e8f54d63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e8f54d63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e8f54d63

Branch: refs/heads/master
Commit: e8f54d63094806a2a1364adb5e209ab2ca10f0f0
Parents: d35a6ee
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sun Dec 22 15:58:01 2013 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sun Dec 22 15:58:01 2013 +0000

----------------------------------------------------------------------
 src/jvm/storm/kafka/DynamicBrokersReader.java   | 158 +++----
 .../kafka/DynamicPartitionConnections.java      |  34 +-
 src/jvm/storm/kafka/HostPort.java               |  50 +-
 src/jvm/storm/kafka/KafkaConfig.java            |  16 +-
 src/jvm/storm/kafka/KafkaSpout.java             |  45 +-
 src/jvm/storm/kafka/Partition.java              |  50 +-
 src/jvm/storm/kafka/PartitionCoordinator.java   |   1 +
 src/jvm/storm/kafka/PartitionManager.java       | 100 ++--
 src/jvm/storm/kafka/StaticCoordinator.java      |  12 +-
 src/jvm/storm/kafka/StaticHosts.java            |  17 +-
 .../storm/kafka/StaticPartitionConnections.java |   8 +-
 src/jvm/storm/kafka/ZkCoordinator.java          |  48 +-
 src/jvm/storm/kafka/ZkHosts.java                |  22 +-
 src/jvm/storm/kafka/ZkState.java                |  72 +--
 src/jvm/storm/kafka/trident/Coordinator.java    |  44 +-
 .../storm/kafka/trident/DefaultCoordinator.java |   2 +-
 .../trident/GlobalPartitionInformation.java     |  86 ++--
 .../storm/kafka/trident/IBatchCoordinator.java  |   1 +
 src/jvm/storm/kafka/trident/IBrokerReader.java  |   1 +
 src/jvm/storm/kafka/trident/KafkaUtils.java     |  74 +--
 src/jvm/storm/kafka/trident/MaxMetric.java      |   8 +-
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |  14 +-
 .../storm/kafka/trident/StaticBrokerReader.java |   6 +-
 .../trident/TransactionalTridentKafkaSpout.java |   8 +-
 .../storm/kafka/trident/TridentKafkaConfig.java |   6 +-
 .../kafka/trident/TridentKafkaEmitter.java      | 466 ++++++++++---------
 src/jvm/storm/kafka/trident/ZkBrokerReader.java |  12 +-
 .../storm/kafka/DynamicBrokersReaderTest.java   | 258 +++++-----
 28 files changed, 821 insertions(+), 798 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/DynamicBrokersReader.java b/src/jvm/storm/kafka/DynamicBrokersReader.java
index ae15534..c802baf 100644
--- a/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ b/src/jvm/storm/kafka/DynamicBrokersReader.java
@@ -12,117 +12,117 @@ import storm.kafka.trident.GlobalPartitionInformation;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.*;
+import java.util.List;
+import java.util.Map;
 
 public class DynamicBrokersReader {
 
-	public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
+    public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
 
     private CuratorFramework _curator;
     private String _zkPath;
     private String _topic;
-    
+
     public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
-		_zkPath = zkPath;
-		_topic = topic;
-		try {
-			_curator = CuratorFrameworkFactory.newClient(
-				zkStr,
-				Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
-				15000,
-				new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
-				Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
-			_curator.start();
-		} catch (IOException ex)  {
-			LOG.error("can't connect to zookeeper");
-		}
+        _zkPath = zkPath;
+        _topic = topic;
+        try {
+            _curator = CuratorFrameworkFactory.newClient(
+                    zkStr,
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                    15000,
+                    new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+                            Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+            _curator.start();
+        } catch (IOException ex) {
+            LOG.error("can't connect to zookeeper");
+        }
     }
-    
+
     /**
-	 * Get all partitions with their current leaders
+     * Get all partitions with their current leaders
      */
     public GlobalPartitionInformation getBrokerInfo() {
-		GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
         try {
-			int numPartitionsForTopic = getNumPartitions();
-			String brokerInfoPath = brokerPath();
-			for (int partition = 0; partition < numPartitionsForTopic; partition++) {
-				int leader = getLeaderFor(partition);
-				String path = brokerInfoPath + "/" + leader;
-				try {
-					byte[] hostPortData = _curator.getData().forPath(path);
-					HostPort hp = getBrokerHost(hostPortData);
-					globalPartitionInformation.addPartition(partition, hp);
-				} catch(org.apache.zookeeper.KeeperException.NoNodeException e) {
-					LOG.error("Node {} does not exist ", path);
-				}
-			}
-        } catch(Exception e) {
+            int numPartitionsForTopic = getNumPartitions();
+            String brokerInfoPath = brokerPath();
+            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
+                int leader = getLeaderFor(partition);
+                String path = brokerInfoPath + "/" + leader;
+                try {
+                    byte[] hostPortData = _curator.getData().forPath(path);
+                    HostPort hp = getBrokerHost(hostPortData);
+                    globalPartitionInformation.addPartition(partition, hp);
+                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
+                    LOG.error("Node {} does not exist ", path);
+                }
+            }
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
-		LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
+        LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
         return globalPartitionInformation;
     }
 
 
+    private int getNumPartitions() {
+        try {
+            String topicBrokersPath = partitionPath();
+            List<String> children = _curator.getChildren().forPath(topicBrokersPath);
+            return children.size();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 
-	private int getNumPartitions() {
-		try {
-			String topicBrokersPath = partitionPath();
-			List<String> children = _curator.getChildren().forPath(topicBrokersPath);
-			return children.size();
-		} catch(Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public String partitionPath() {
-		return _zkPath + "/topics/" + _topic + "/partitions";
-	}
+    public String partitionPath() {
+        return _zkPath + "/topics/" + _topic + "/partitions";
+    }
 
-	public String brokerPath() {
-		return _zkPath + "/ids";
-	}
+    public String brokerPath() {
+        return _zkPath + "/ids";
+    }
 
-	/**
-	 * get /brokers/topics/distributedTopic/partitions/1/state
-	 * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
-	 * @param partition
-	 * @return
-	 */
-	private int getLeaderFor(long partition) {
-		try {
-			String topicBrokersPath = partitionPath();
-			byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state" );
-			Map<Object, Object> value = (Map<Object,Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
-			Integer leader = ((Number) value.get("leader")).intValue();
-			return leader;
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
+    /**
+     * get /brokers/topics/distributedTopic/partitions/1/state
+     * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
+     *
+     * @param partition
+     * @return
+     */
+    private int getLeaderFor(long partition) {
+        try {
+            String topicBrokersPath = partitionPath();
+            byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
+            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
+            Integer leader = ((Number) value.get("leader")).intValue();
+            return leader;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 
     public void close() {
         _curator.close();
     }
 
-	/**
-	 *
-	 * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
-	 * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
-	 *
-	 * @param contents
-	 * @return
-	 */
+    /**
+     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
+     * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
+     *
+     * @param contents
+     * @return
+     */
     private HostPort getBrokerHost(byte[] contents) {
         try {
-			Map<Object, Object> value = (Map<Object,Object>) JSONValue.parse(new String(contents, "UTF-8"));
-			String host = (String) value.get("host");
-			Integer port = ((Long) value.get("port")).intValue();
+            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
+            String host = (String) value.get("host");
+            Integer port = ((Long) value.get("port")).intValue();
             return new HostPort(host, port);
         } catch (UnsupportedEncodingException e) {
             throw new RuntimeException(e);
         }
-    }  
+    }
 
 }
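
For readers unfamiliar with the ZooKeeper layout documented in the javadoc above: getBrokerHost parses the broker registration JSON stored under /brokers/ids/<id>. A minimal, self-contained sketch of that parsing step (a hypothetical demo class, not part of the patch; it assumes json-simple on the classpath, the same JSONValue library this class imports):

    import org.json.simple.JSONValue;

    import java.util.Map;

    public class BrokerJsonDemo {
        public static void main(String[] args) {
            // Broker registration data as stored under /brokers/ids/0 (see javadoc above)
            String json = "{\"host\":\"localhost\", \"jmx_port\":9999, \"port\":9092, \"version\":1}";
            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(json);
            String host = (String) value.get("host");
            // json-simple parses JSON integers as Long, hence the narrowing to int
            int port = ((Long) value.get("port")).intValue();
            System.out.println(host + ":" + port); // prints localhost:9092
        }
    }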

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/DynamicPartitionConnections.java b/src/jvm/storm/kafka/DynamicPartitionConnections.java
index 409ec37..7a799a0 100644
--- a/src/jvm/storm/kafka/DynamicPartitionConnections.java
+++ b/src/jvm/storm/kafka/DynamicPartitionConnections.java
@@ -13,33 +13,33 @@ import java.util.Set;
 
 public class DynamicPartitionConnections {
 
-	public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
+    public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
 
     static class ConnectionInfo {
         SimpleConsumer consumer;
         Set<Integer> partitions = new HashSet();
-        
+
         public ConnectionInfo(SimpleConsumer consumer) {
             this.consumer = consumer;
         }
     }
-    
+
     Map<HostPort, ConnectionInfo> _connections = new HashMap();
     KafkaConfig _config;
-	IBrokerReader _reader;
-    
+    IBrokerReader _reader;
+
     public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
         _config = config;
-		_reader = brokerReader;
+        _reader = brokerReader;
     }
-    
+
     public SimpleConsumer register(Partition partition) {
-		HostPort hostPort = _reader.getCurrentBrokers().getHostFor(partition.partition);
-		return register(hostPort, partition.partition);
+        HostPort hostPort = _reader.getCurrentBrokers().getHostFor(partition.partition);
+        return register(hostPort, partition.partition);
     }
-    
+
     public SimpleConsumer register(HostPort host, int partition) {
-        if(!_connections.containsKey(host)) {
+        if (!_connections.containsKey(host)) {
             _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
         }
         ConnectionInfo info = _connections.get(host);
@@ -49,14 +49,16 @@ public class DynamicPartitionConnections {
 
     public SimpleConsumer getConnection(Partition partition) {
         ConnectionInfo info = _connections.get(partition.host);
-        if(info != null) return info.consumer;
+        if (info != null) {
+            return info.consumer;
+        }
         return null;
     }
-    
+
     public void unregister(HostPort port, int partition) {
         ConnectionInfo info = _connections.get(port);
         info.partitions.remove(partition);
-        if(info.partitions.isEmpty()) {
+        if (info.partitions.isEmpty()) {
             info.consumer.close();
             _connections.remove(port);
         }
@@ -65,9 +67,9 @@ public class DynamicPartitionConnections {
     public void unregister(Partition partition) {
         unregister(partition.host, partition.partition);
     }
-    
+
     public void clear() {
-        for(ConnectionInfo info: _connections.values()) {
+        for (ConnectionInfo info : _connections.values()) {
             info.consumer.close();
         }
         _connections.clear();

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/HostPort.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/HostPort.java b/src/jvm/storm/kafka/HostPort.java
index afb5da5..5369858 100644
--- a/src/jvm/storm/kafka/HostPort.java
+++ b/src/jvm/storm/kafka/HostPort.java
@@ -1,18 +1,16 @@
 package storm.kafka;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 
 public class HostPort implements Serializable, Comparable<HostPort> {
     public String host;
     public int port;
-    
+
     public HostPort(String host, int port) {
         this.host = host;
         this.port = port;
     }
-    
+
     public HostPort(String host) {
         this(host, 9092);
     }
@@ -33,26 +31,26 @@ public class HostPort implements Serializable, Comparable<HostPort> {
         return host + ":" + port;
     }
 
-	public static HostPort fromString(String host) {
-		HostPort hp;
-		String[] spec = host.split(":");
-		if (spec.length == 1) {
-			hp = new HostPort(spec[0]);
-		} else if (spec.length == 2) {
-			hp = new HostPort(spec[0], Integer.parseInt(spec[1]));
-		} else {
-			throw new IllegalArgumentException("Invalid host specification: " + host);
-		}
-		return hp;
-	}
-
-
-	@Override
-	public int compareTo(HostPort o) {
-		if ( this.host.equals(o.host)) {
-			return this.port - o.port;
-		} else {
-			return this.host.compareTo(o.host);
-		}
-	}
+    public static HostPort fromString(String host) {
+        HostPort hp;
+        String[] spec = host.split(":");
+        if (spec.length == 1) {
+            hp = new HostPort(spec[0]);
+        } else if (spec.length == 2) {
+            hp = new HostPort(spec[0], Integer.parseInt(spec[1]));
+        } else {
+            throw new IllegalArgumentException("Invalid host specification: " + host);
+        }
+        return hp;
+    }
+
+
+    @Override
+    public int compareTo(HostPort o) {
+        if (this.host.equals(o.host)) {
+            return this.port - o.port;
+        } else {
+            return this.host.compareTo(o.host);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaConfig.java b/src/jvm/storm/kafka/KafkaConfig.java
index 457eeb4..e241978 100644
--- a/src/jvm/storm/kafka/KafkaConfig.java
+++ b/src/jvm/storm/kafka/KafkaConfig.java
@@ -9,11 +9,11 @@ public class KafkaConfig implements Serializable {
 
     public final BrokerHosts hosts;
     public final String topic;
-	public final String clientId;
+    public final String clientId;
 
-    public int fetchSizeBytes = 1024*1024;
+    public int fetchSizeBytes = 1024 * 1024;
     public int socketTimeoutMs = 10000;
-    public int bufferSizeBytes = 1024*1024;
+    public int bufferSizeBytes = 1024 * 1024;
     public MultiScheme scheme = new RawMultiScheme();
     public boolean forceFromStart = false;
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
@@ -22,11 +22,11 @@ public class KafkaConfig implements Serializable {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
     }
 
-	public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
-		this.hosts = hosts;
-		this.topic = topic;
-		this.clientId = clientId;
-	}
+    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
+        this.hosts = hosts;
+        this.topic = topic;
+        this.clientId = clientId;
+    }
 
     public void forceStartOffsetTime(long millis) {
         startOffsetTime = millis;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaSpout.java b/src/jvm/storm/kafka/KafkaSpout.java
index 781e6ce..cf407ad 100644
--- a/src/jvm/storm/kafka/KafkaSpout.java
+++ b/src/jvm/storm/kafka/KafkaSpout.java
@@ -54,21 +54,25 @@ public class KafkaSpout extends BaseRichSpout {
     public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
         _collector = collector;
 
-	Map stateConf = new HashMap(conf);
+        Map stateConf = new HashMap(conf);
         List<String> zkServers = _spoutConfig.zkServers;
-        if(zkServers==null) zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        if (zkServers == null) {
+            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        }
         Integer zkPort = _spoutConfig.zkPort;
-        if(zkPort==null) zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+        if (zkPort == null) {
+            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+        }
         stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
         stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
         stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
-	    _state = new ZkState(stateConf);
+        _state = new ZkState(stateConf);
 
         _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
 
         // using TransactionalState like this is a hack
         int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
-        if(_spoutConfig.hosts instanceof StaticHosts) {
+        if (_spoutConfig.hosts instanceof StaticHosts) {
             _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
         } else {
             _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
@@ -76,13 +80,16 @@ public class KafkaSpout extends BaseRichSpout {
 
         context.registerMetric("kafkaOffset", new IMetric() {
             KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
+
             @Override
             public Object getValueAndReset() {
                 List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
                 Set<Partition> latestPartitions = new HashSet();
-                for(PartitionManager pm : pms) { latestPartitions.add(pm.getPartition()); }
+                for (PartitionManager pm : pms) {
+                    latestPartitions.add(pm.getPartition());
+                }
                 _kafkaOffsetMetric.refreshPartitions(latestPartitions);
-                for(PartitionManager pm : pms) {
+                for (PartitionManager pm : pms) {
                     _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
                 }
                 return _kafkaOffsetMetric.getValueAndReset();
@@ -94,7 +101,7 @@ public class KafkaSpout extends BaseRichSpout {
             public Object getValueAndReset() {
                 List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
                 Map concatMetricsDataMaps = new HashMap();
-                for(PartitionManager pm : pms) {
+                for (PartitionManager pm : pms) {
                     concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
                 }
                 return concatMetricsDataMaps;
@@ -104,27 +111,27 @@ public class KafkaSpout extends BaseRichSpout {
 
     @Override
     public void close() {
-	_state.close();
+        _state.close();
     }
 
     @Override
     public void nextTuple() {
         List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
-        for(int i=0; i<managers.size(); i++) {
-            
+        for (int i = 0; i < managers.size(); i++) {
+
             // in case the number of managers decreased
             _currPartitionIndex = _currPartitionIndex % managers.size();
             EmitState state = managers.get(_currPartitionIndex).next(_collector);
-            if(state!=EmitState.EMITTED_MORE_LEFT) {
+            if (state != EmitState.EMITTED_MORE_LEFT) {
                 _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
             }
-            if(state!=EmitState.NO_EMITTED) {
+            if (state != EmitState.NO_EMITTED) {
                 break;
             }
         }
 
         long now = System.currentTimeMillis();
-        if((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
+        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
             commit();
         }
     }
@@ -133,18 +140,18 @@ public class KafkaSpout extends BaseRichSpout {
     public void ack(Object msgId) {
         KafkaMessageId id = (KafkaMessageId) msgId;
         PartitionManager m = _coordinator.getManager(id.partition);
-        if(m!=null) {
+        if (m != null) {
             m.ack(id.offset);
-        }                
+        }
     }
 
     @Override
     public void fail(Object msgId) {
         KafkaMessageId id = (KafkaMessageId) msgId;
         PartitionManager m = _coordinator.getManager(id.partition);
-        if(m!=null) {
+        if (m != null) {
             m.fail(id.offset);
-        } 
+        }
     }
 
     @Override
@@ -159,7 +166,7 @@ public class KafkaSpout extends BaseRichSpout {
 
     private void commit() {
         _lastUpdateMs = System.currentTimeMillis();
-        for(PartitionManager manager: _coordinator.getMyManagedPartitions()) {
+        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
             manager.commit();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/Partition.java b/src/jvm/storm/kafka/Partition.java
index 6de0f6a..87ab7b8 100644
--- a/src/jvm/storm/kafka/Partition.java
+++ b/src/jvm/storm/kafka/Partition.java
@@ -6,7 +6,7 @@ import storm.trident.spout.ISpoutPartition;
 
 public class Partition implements ISpoutPartition {
 
-	public final HostPort host;
+    public final HostPort host;
     public final int partition;
 
     public Partition(HostPort host, int partition) {
@@ -14,30 +14,30 @@ public class Partition implements ISpoutPartition {
         this.partition = partition;
     }
 
-	@Override
-	public int hashCode() {
-		return Objects.hashCode(host, partition);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj) {
-			return true;
-		}
-		if (obj == null || getClass() != obj.getClass()) {
-			return false;
-		}
-		final Partition other = (Partition) obj;
-		return Objects.equal(this.host, other.host) && Objects.equal(this.partition, other.partition);
-	}
-
-	@Override
-	public String toString() {
-		return "Partition{" +
-				"host=" + host +
-				", partition=" + partition +
-				'}';
-	}
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(host, partition);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final Partition other = (Partition) obj;
+        return Objects.equal(this.host, other.host) && Objects.equal(this.partition, other.partition);
+    }
+
+    @Override
+    public String toString() {
+        return "Partition{" +
+                "host=" + host +
+                ", partition=" + partition +
+                '}';
+    }
 
     @Override
     public String getId() {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionCoordinator.java b/src/jvm/storm/kafka/PartitionCoordinator.java
index 2ee2009..d28248d 100644
--- a/src/jvm/storm/kafka/PartitionCoordinator.java
+++ b/src/jvm/storm/kafka/PartitionCoordinator.java
@@ -4,5 +4,6 @@ import java.util.List;
 
 public interface PartitionCoordinator {
     List<PartitionManager> getMyManagedPartitions();
+
     PartitionManager getManager(Partition partition);
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index 6596f96..623bc10 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -58,31 +58,30 @@ public class PartitionManager {
         _spoutConfig = spoutConfig;
         _topologyInstanceId = topologyInstanceId;
         _consumer = connections.register(id.host, id.partition);
-		_state = state;
+        _state = state;
         _stormConf = stormConf;
 
         String jsonTopologyId = null;
         Long jsonOffset = null;
         try {
             Map<Object, Object> json = _state.readJSON(committedPath());
-            if(json != null) {
-                jsonTopologyId = (String)((Map<Object,Object>)json.get("topology")).get("id");
-                jsonOffset = (Long)json.get("offset");
+            if (json != null) {
+                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
+                jsonOffset = (Long) json.get("offset");
             }
-        }
-        catch(Throwable e) {
+        } catch (Throwable e) {
             LOG.warn("Error reading and/or parsing at ZkNode: " + committedPath(), e);
         }
 
-        if(!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
+        if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
             _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
-	    LOG.info("Using startOffsetTime to choose last commit offset.");
-        } else if(jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
-            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition,  kafka.api.OffsetRequest.LatestTime());
-	    LOG.info("Setting last commit offset to HEAD.");
+            LOG.info("Using startOffsetTime to choose last commit offset.");
+        } else if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
+            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, kafka.api.OffsetRequest.LatestTime());
+            LOG.info("Setting last commit offset to HEAD.");
         } else {
             _committedTo = jsonOffset;
-	    LOG.info("Read last commit offset from zookeeper: " + _committedTo);
+            LOG.info("Read last commit offset from zookeeper: " + _committedTo);
         }
 
         LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
@@ -105,22 +104,25 @@ public class PartitionManager {
 
     //returns false if it's reached the end of current batch
     public EmitState next(SpoutOutputCollector collector) {
-        if(_waitingToEmit.isEmpty()) fill();
-        while(true) {
+        if (_waitingToEmit.isEmpty()) {
+            fill();
+        }
+        while (true) {
             MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
-            if(toEmit==null) {
+            if (toEmit == null) {
                 return EmitState.NO_EMITTED;
             }
             Iterable<List<Object>> tups = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.msg.payload()));
-            if(tups!=null) {
-                for(List<Object> tup: tups)
+            if (tups != null) {
+                for (List<Object> tup : tups) {
                     collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
+                }
                 break;
             } else {
                 ack(toEmit.offset);
             }
         }
-        if(!_waitingToEmit.isEmpty()) {
+        if (!_waitingToEmit.isEmpty()) {
             return EmitState.EMITTED_MORE_LEFT;
         } else {
             return EmitState.EMITTED_END;
@@ -132,11 +134,11 @@ public class PartitionManager {
         long start = System.nanoTime();
         ByteBufferMessageSet msgs = _consumer.fetch(
                 new FetchRequestBuilder().addFetch(
-                    _spoutConfig.topic,
-                    _partition.partition,
-                    _emittedToOffset,
-                    _spoutConfig.fetchSizeBytes).build()).messageSet(_spoutConfig.topic,
-				_partition.partition);
+                        _spoutConfig.topic,
+                        _partition.partition,
+                        _emittedToOffset,
+                        _spoutConfig.fetchSizeBytes).build()).messageSet(_spoutConfig.topic,
+                _partition.partition);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _fetchAPILatencyMax.update(millis);
@@ -145,26 +147,26 @@ public class PartitionManager {
         int numMessages = countMessages(msgs);
         _fetchAPIMessageCount.incrBy(numMessages);
 
-        if(numMessages>0) {
-          LOG.info("Fetched " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition);
+        if (numMessages > 0) {
+            LOG.info("Fetched " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition);
         }
-        for(MessageAndOffset msg: msgs) {
+        for (MessageAndOffset msg : msgs) {
             _pending.add(_emittedToOffset);
             _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
             _emittedToOffset = msg.nextOffset();
         }
-        if(numMessages>0) {
-          LOG.info("Added " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition + " to internal buffers");
+        if (numMessages > 0) {
+            LOG.info("Added " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition + " to internal buffers");
         }
     }
 
-	private int countMessages(ByteBufferMessageSet messageSet) {
-		int counter = 0;
-		for (MessageAndOffset messageAndOffset : messageSet) {
-			counter = counter + 1;
-		}
-		return counter;
-	}
+    private int countMessages(ByteBufferMessageSet messageSet) {
+        int counter = 0;
+        for (MessageAndOffset messageAndOffset : messageSet) {
+            counter = counter + 1;
+        }
+        return counter;
+    }
 
     public void ack(Long offset) {
         _pending.remove(offset);
@@ -173,7 +175,7 @@ public class PartitionManager {
     public void fail(Long offset) {
         //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
         // things might get crazy with lots of timeouts
-        if(_emittedToOffset > offset) {
+        if (_emittedToOffset > offset) {
             _emittedToOffset = offset;
             _pending.tailSet(offset).clear();
         }
@@ -182,23 +184,23 @@ public class PartitionManager {
     public void commit() {
         LOG.info("Committing offset for " + _partition);
         long committedTo;
-        if(_pending.isEmpty()) {
+        if (_pending.isEmpty()) {
             committedTo = _emittedToOffset;
         } else {
             committedTo = _pending.first();
         }
-        if(committedTo!=_committedTo) {
+        if (committedTo != _committedTo) {
             LOG.info("Writing committed offset to ZK: " + committedTo);
 
-            Map<Object, Object> data = (Map<Object,Object>)ImmutableMap.builder()
-                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
-						"name", _stormConf.get(Config.TOPOLOGY_NAME)))
-                .put("offset", committedTo)
-                .put("partition", _partition.partition)
-                .put("broker", ImmutableMap.of("host", _partition.host.host,
-						"port", _partition.host.port))
-                .put("topic", _spoutConfig.topic).build();
-	    _state.writeJSON(committedPath(), data);
+            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
+                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
+                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
+                    .put("offset", committedTo)
+                    .put("partition", _partition.partition)
+                    .put("broker", ImmutableMap.of("host", _partition.host.host,
+                            "port", _partition.host.port))
+                    .put("topic", _spoutConfig.topic).build();
+            _state.writeJSON(committedPath(), data);
 
             LOG.info("Wrote committed offset to ZK: " + committedTo);
             _committedTo = committedTo;
@@ -212,7 +214,7 @@ public class PartitionManager {
 
     public long queryPartitionOffsetLatestTime() {
         return KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition,
-				OffsetRequest.LatestTime());
+                OffsetRequest.LatestTime());
     }
 
     public long lastCommittedOffset() {
@@ -220,7 +222,7 @@ public class PartitionManager {
     }
 
     public long lastCompletedOffset() {
-        if(_pending.isEmpty()) {
+        if (_pending.isEmpty()) {
             return _emittedToOffset;
         } else {
             return _pending.first();

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticCoordinator.java b/src/jvm/storm/kafka/StaticCoordinator.java
index 6f97c8b..7415522 100644
--- a/src/jvm/storm/kafka/StaticCoordinator.java
+++ b/src/jvm/storm/kafka/StaticCoordinator.java
@@ -13,22 +13,22 @@ public class StaticCoordinator implements PartitionCoordinator {
     public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
         StaticHosts hosts = (StaticHosts) config.hosts;
         List<Partition> partitions = hosts.getPartitionInformation().getOrderedPartitions();
-        for(int i=taskIndex; i<partitions.size(); i+=totalTasks) {
+        for (int i = taskIndex; i < partitions.size(); i += totalTasks) {
             Partition myPartition = partitions.get(i);
             _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
-            
+
         }
-        
+
         _allManagers = new ArrayList(_managers.values());
     }
-    
+
     @Override
     public List<PartitionManager> getMyManagedPartitions() {
         return _allManagers;
     }
-    
+
     public PartitionManager getManager(Partition partition) {
         return _managers.get(partition);
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticHosts.java b/src/jvm/storm/kafka/StaticHosts.java
index 6ed828d..9ed7193 100644
--- a/src/jvm/storm/kafka/StaticHosts.java
+++ b/src/jvm/storm/kafka/StaticHosts.java
@@ -2,9 +2,6 @@ package storm.kafka;
 
 import storm.kafka.trident.GlobalPartitionInformation;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Date: 11/05/2013
  * Time: 14:43
@@ -12,13 +9,13 @@ import java.util.List;
 public class StaticHosts implements BrokerHosts {
 
 
-	private GlobalPartitionInformation partitionInformation;
+    private GlobalPartitionInformation partitionInformation;
 
-	public StaticHosts(GlobalPartitionInformation partitionInformation) {
-		this.partitionInformation = partitionInformation;
-	}
+    public StaticHosts(GlobalPartitionInformation partitionInformation) {
+        this.partitionInformation = partitionInformation;
+    }
 
-	public GlobalPartitionInformation getPartitionInformation() {
-		return partitionInformation;
-	}
+    public GlobalPartitionInformation getPartitionInformation() {
+        return partitionInformation;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticPartitionConnections.java b/src/jvm/storm/kafka/StaticPartitionConnections.java
index 2d40c8b..4294362 100644
--- a/src/jvm/storm/kafka/StaticPartitionConnections.java
+++ b/src/jvm/storm/kafka/StaticPartitionConnections.java
@@ -9,17 +9,17 @@ public class StaticPartitionConnections {
     Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>();
     KafkaConfig _config;
     StaticHosts hosts;
-    
+
     public StaticPartitionConnections(KafkaConfig conf) {
         _config = conf;
-        if(!(conf.hosts instanceof StaticHosts)) {
+        if (!(conf.hosts instanceof StaticHosts)) {
             throw new RuntimeException("Must configure with static hosts");
         }
         this.hosts = (StaticHosts) conf.hosts;
     }
 
     public SimpleConsumer getConsumer(int partition) {
-		if(!_kafka.containsKey(partition)) {
+        if (!_kafka.containsKey(partition)) {
             HostPort hp = hosts.getPartitionInformation().getHostFor(partition);
             _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
 
@@ -28,7 +28,7 @@ public class StaticPartitionConnections {
     }
 
     public void close() {
-        for(SimpleConsumer consumer: _kafka.values()) {
+        for (SimpleConsumer consumer : _kafka.values()) {
             consumer.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkCoordinator.java b/src/jvm/storm/kafka/ZkCoordinator.java
index d457bdd..98e51a3 100644
--- a/src/jvm/storm/kafka/ZkCoordinator.java
+++ b/src/jvm/storm/kafka/ZkCoordinator.java
@@ -9,7 +9,7 @@ import java.util.*;
 
 public class ZkCoordinator implements PartitionCoordinator {
     public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
-    
+
     SpoutConfig _spoutConfig;
     int _taskIndex;
     int _totalTasks;
@@ -23,7 +23,7 @@ public class ZkCoordinator implements PartitionCoordinator {
     ZkState _state;
     Map _stormConf;
     IMetricsContext _metricsContext;
-    
+
     public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
         _spoutConfig = spoutConfig;
         _connections = connections;
@@ -31,55 +31,55 @@ public class ZkCoordinator implements PartitionCoordinator {
         _totalTasks = totalTasks;
         _topologyInstanceId = topologyInstanceId;
         _stormConf = stormConf;
-		_state = state;
+        _state = state;
 
         ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
         _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
         _reader = new DynamicBrokersReader(stormConf, brokerConf.brokerZkStr, brokerConf.brokerZkPath, spoutConfig.topic);
-        
+
     }
-    
+
     @Override
     public List<PartitionManager> getMyManagedPartitions() {
-        if(_lastRefreshTime==null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
+        if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
             refresh();
             _lastRefreshTime = System.currentTimeMillis();
         }
         return _cachedList;
     }
-    
+
     void refresh() {
         try {
             LOG.info("Refreshing partition manager connections");
-			GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
+            GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
             Set<Partition> mine = new HashSet();
-			for (Partition partitionId: brokerInfo){
-				if(myOwnership(partitionId)) {
-					mine.add(partitionId);
-				}
-			}
+            for (Partition partitionId : brokerInfo) {
+                if (myOwnership(partitionId)) {
+                    mine.add(partitionId);
+                }
+            }
 
             Set<Partition> curr = _managers.keySet();
             Set<Partition> newPartitions = new HashSet<Partition>(mine);
             newPartitions.removeAll(curr);
-            
+
             Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
             deletedPartitions.removeAll(mine);
-            
+
             LOG.info("Deleted partition managers: " + deletedPartitions.toString());
-            
-            for(Partition id: deletedPartitions) {
+
+            for (Partition id : deletedPartitions) {
                 PartitionManager man = _managers.remove(id);
                 man.close();
             }
             LOG.info("New partition managers: " + newPartitions.toString());
-            
-            for(Partition id: newPartitions) {
+
+            for (Partition id : newPartitions) {
                 PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
                 _managers.put(id, man);
             }
-            
-        } catch(Exception e) {
+
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
         _cachedList = new ArrayList<PartitionManager>(_managers.values());
@@ -90,9 +90,9 @@ public class ZkCoordinator implements PartitionCoordinator {
     public PartitionManager getManager(Partition partition) {
         return _managers.get(partition);
     }
-    
+
     private boolean myOwnership(Partition id) {
-        int val = Math.abs(id.host.hashCode() + 23 * id.partition);        
+        int val = Math.abs(id.host.hashCode() + 23 * id.partition);
         return val % _totalTasks == _taskIndex;
-    } 
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkHosts.java b/src/jvm/storm/kafka/ZkHosts.java
index dfe3b4c..f2e0fc2 100644
--- a/src/jvm/storm/kafka/ZkHosts.java
+++ b/src/jvm/storm/kafka/ZkHosts.java
@@ -5,18 +5,18 @@ package storm.kafka;
  * Time: 14:38
  */
 public class ZkHosts implements BrokerHosts {
-	private static final String DEFAULT_ZK_PATH = "/brokers";
+    private static final String DEFAULT_ZK_PATH = "/brokers";
 
-	public String brokerZkStr = null;
-	public String brokerZkPath = null; // e.g., /kafka/brokers
-	public int refreshFreqSecs = 60;
+    public String brokerZkStr = null;
+    public String brokerZkPath = null; // e.g., /kafka/brokers
+    public int refreshFreqSecs = 60;
 
-	public ZkHosts(String brokerZkStr, String brokerZkPath) {
-		this.brokerZkStr = brokerZkStr;
-		this.brokerZkPath = brokerZkPath;
-	}
+    public ZkHosts(String brokerZkStr, String brokerZkPath) {
+        this.brokerZkStr = brokerZkStr;
+        this.brokerZkPath = brokerZkPath;
+    }
 
-	public ZkHosts(String brokerZkStr) {
-		this(brokerZkStr, DEFAULT_ZK_PATH);
-	}
+    public ZkHosts(String brokerZkStr) {
+        this(brokerZkStr, DEFAULT_ZK_PATH);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkState.java b/src/jvm/storm/kafka/ZkState.java
index 17ebb25..d5416af 100644
--- a/src/jvm/storm/kafka/ZkState.java
+++ b/src/jvm/storm/kafka/ZkState.java
@@ -20,78 +20,80 @@ public class ZkState {
     CuratorFramework _curator;
 
     private CuratorFramework newCurator(Map stateConf) throws Exception {
-        Integer port = (Integer)stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
-	String serverPorts = "";
-        for(String server: (List<String>)stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
+        Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
+        String serverPorts = "";
+        for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
             serverPorts = serverPorts + server + ":" + port + ",";
         }
-	return CuratorFrameworkFactory.newClient(serverPorts,
-		Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), 
-		15000, 
-		new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
-				Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+        return CuratorFrameworkFactory.newClient(serverPorts,
+                Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                15000,
+                new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+                        Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
     }
 
     public CuratorFramework getCurator() {
-	assert _curator != null;
+        assert _curator != null;
         return _curator;
     }
 
     public ZkState(Map stateConf) {
-	stateConf = new HashMap(stateConf);
+        stateConf = new HashMap(stateConf);
 
-	try {
-	    _curator = newCurator(stateConf);
-	    _curator.start();
-	} catch(Exception e) {
-	    throw new RuntimeException(e);
-	}
+        try {
+            _curator = newCurator(stateConf);
+            _curator.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
-    public void writeJSON(String path, Map<Object,Object> data) {
-	LOG.info("Writing " + path + " the data " + data.toString());
+    public void writeJSON(String path, Map<Object, Object> data) {
+        LOG.info("Writing " + path + " the data " + data.toString());
         writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
     }
 
     public void writeBytes(String path, byte[] bytes) {
         try {
-            if(_curator.checkExists().forPath(path)==null) {
+            if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
                         .creatingParentsIfNeeded()
                         .withMode(CreateMode.PERSISTENT)
                         .forPath(path, bytes);
             } else {
-		_curator.setData().forPath(path, bytes);
-	    }
-        } catch(Exception e) {
+                _curator.setData().forPath(path, bytes);
+            }
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    public Map<Object,Object> readJSON(String path) {
-	try {
-	    byte[] b = readBytes(path);
-	    if(b==null) return null;
-	    return (Map<Object,Object>)JSONValue.parse(new String(b, "UTF-8"));
-	} catch(Exception e) {
-	    throw new RuntimeException(e);
-	}
+    public Map<Object, Object> readJSON(String path) {
+        try {
+            byte[] b = readBytes(path);
+            if (b == null) {
+                return null;
+            }
+            return (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public byte[] readBytes(String path) {
         try {
-            if(_curator.checkExists().forPath(path)!=null) {
-		return _curator.getData().forPath(path);
+            if (_curator.checkExists().forPath(path) != null) {
+                return _curator.getData().forPath(path);
             } else {
                 return null;
             }
-        } catch(Exception e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
     public void close() {
-	_curator.close();
-	_curator = null;
+        _curator.close();
+        _curator = null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/Coordinator.java b/src/jvm/storm/kafka/trident/Coordinator.java
index b7ddd3f..d97feed 100644
--- a/src/jvm/storm/kafka/trident/Coordinator.java
+++ b/src/jvm/storm/kafka/trident/Coordinator.java
@@ -11,26 +11,26 @@ import java.util.Map;
  */
 class Coordinator implements IPartitionedTridentSpout.Coordinator<GlobalPartitionInformation>, IOpaquePartitionedTridentSpout.Coordinator<GlobalPartitionInformation> {
 
-	private IBrokerReader reader;
-	private TridentKafkaConfig config;
-
-	public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
-		config = tridentKafkaConfig;
-		reader = KafkaUtils.makeBrokerReader(conf, config);
-	}
-
-	@Override
-	public void close() {
-		config.coordinator.close();
-	}
-
-	@Override
-	public boolean isReady(long txid) {
-		return config.coordinator.isReady(txid);
-	}
-
-	@Override
-	public GlobalPartitionInformation getPartitionsForBatch() {
-		return reader.getCurrentBrokers();
-	}
+    private IBrokerReader reader;
+    private TridentKafkaConfig config;
+
+    public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
+        config = tridentKafkaConfig;
+        reader = KafkaUtils.makeBrokerReader(conf, config);
+    }
+
+    @Override
+    public void close() {
+        config.coordinator.close();
+    }
+
+    @Override
+    public boolean isReady(long txid) {
+        return config.coordinator.isReady(txid);
+    }
+
+    @Override
+    public GlobalPartitionInformation getPartitionsForBatch() {
+        return reader.getCurrentBrokers();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/DefaultCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/DefaultCoordinator.java b/src/jvm/storm/kafka/trident/DefaultCoordinator.java
index 3a47706..89cd503 100644
--- a/src/jvm/storm/kafka/trident/DefaultCoordinator.java
+++ b/src/jvm/storm/kafka/trident/DefaultCoordinator.java
@@ -10,5 +10,5 @@ public class DefaultCoordinator implements IBatchCoordinator {
     @Override
     public void close() {
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
index 139a0d7..6b0fdec 100644
--- a/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
+++ b/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
@@ -1,7 +1,7 @@
 package storm.kafka.trident;
 
-import storm.kafka.Partition;
 import storm.kafka.HostPort;
+import storm.kafka.Partition;
 
 import java.io.Serializable;
 import java.util.*;
@@ -12,55 +12,55 @@ import java.util.*;
  */
 public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
 
-	private Map<Integer, HostPort> partitionMap;
+    private Map<Integer, HostPort> partitionMap;
 
-	public GlobalPartitionInformation() {
-		partitionMap = new TreeMap<Integer, HostPort>();
-	}
+    public GlobalPartitionInformation() {
+        partitionMap = new TreeMap<Integer, HostPort>();
+    }
 
-	public void addPartition(int partitionId, HostPort broker) {
-		partitionMap.put(partitionId, broker);
-	}
+    public void addPartition(int partitionId, HostPort broker) {
+        partitionMap.put(partitionId, broker);
+    }
 
-	@Override
-	public String toString() {
-		return "GlobalPartitionInformation{" +
-				"partitionMap=" + partitionMap +
-				'}';
-	}
+    @Override
+    public String toString() {
+        return "GlobalPartitionInformation{" +
+                "partitionMap=" + partitionMap +
+                '}';
+    }
 
-	public HostPort getHostFor(Integer partitionId) {
-		return partitionMap.get(partitionId);
-	}
+    public HostPort getHostFor(Integer partitionId) {
+        return partitionMap.get(partitionId);
+    }
 
-	public List<Partition> getOrderedPartitions(){
-		List<Partition> partitions = new LinkedList<Partition>();
-		for (Map.Entry<Integer, HostPort> partition : partitionMap.entrySet()) {
-			partitions.add(new Partition(partition.getValue(), partition.getKey()));
-		}
-		return partitions;
-	}
+    public List<Partition> getOrderedPartitions() {
+        List<Partition> partitions = new LinkedList<Partition>();
+        for (Map.Entry<Integer, HostPort> partition : partitionMap.entrySet()) {
+            partitions.add(new Partition(partition.getValue(), partition.getKey()));
+        }
+        return partitions;
+    }
 
-	@Override
-	public Iterator<Partition> iterator() {
-		final Iterator<Map.Entry<Integer, HostPort>> iterator = partitionMap.entrySet().iterator();
+    @Override
+    public Iterator<Partition> iterator() {
+        final Iterator<Map.Entry<Integer, HostPort>> iterator = partitionMap.entrySet().iterator();
 
-		return new Iterator<Partition>() {
-			@Override
-			public boolean hasNext() {
-				return iterator.hasNext();
-			}
+        return new Iterator<Partition>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
 
-			@Override
-			public Partition next() {
-				Map.Entry<Integer, HostPort> next = iterator.next();
-				return new Partition(next.getValue(), next.getKey());
-			}
+            @Override
+            public Partition next() {
+                Map.Entry<Integer, HostPort> next = iterator.next();
+                return new Partition(next.getValue(), next.getKey());
+            }
 
-			@Override
-			public void remove() {
-				iterator.remove();
-			}
-		};
-	}
+            @Override
+            public void remove() {
+                iterator.remove();
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/IBatchCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/IBatchCoordinator.java b/src/jvm/storm/kafka/trident/IBatchCoordinator.java
index 9199a8d..1b8a8ce 100644
--- a/src/jvm/storm/kafka/trident/IBatchCoordinator.java
+++ b/src/jvm/storm/kafka/trident/IBatchCoordinator.java
@@ -4,5 +4,6 @@ import java.io.Serializable;
 
 public interface IBatchCoordinator extends Serializable {
     boolean isReady(long txid);
+
     void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/IBrokerReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/IBrokerReader.java b/src/jvm/storm/kafka/trident/IBrokerReader.java
index 4e2421b..73c9738 100644
--- a/src/jvm/storm/kafka/trident/IBrokerReader.java
+++ b/src/jvm/storm/kafka/trident/IBrokerReader.java
@@ -3,5 +3,6 @@ package storm.kafka.trident;
 public interface IBrokerReader {
 
     GlobalPartitionInformation getCurrentBrokers();
+
     void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/KafkaUtils.java b/src/jvm/storm/kafka/trident/KafkaUtils.java
index efe4fef..18dd851 100644
--- a/src/jvm/storm/kafka/trident/KafkaUtils.java
+++ b/src/jvm/storm/kafka/trident/KafkaUtils.java
@@ -16,33 +16,33 @@ import java.util.Set;
 
 public class KafkaUtils {
     public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
-	private static final int NO_OFFSET = -5;
+    private static final int NO_OFFSET = -5;
 
 
-	public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
-		if(conf.hosts instanceof StaticHosts) {
-			return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
-		} else {
-			return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
-		}
-	}
+    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
+        if (conf.hosts instanceof StaticHosts) {
+            return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
+        } else {
+            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
+        }
+    }
 
-	public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
-		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
-		OffsetRequest request = new OffsetRequest(
-				requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
+        OffsetRequest request = new OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
 
-		long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
-		if ( offsets.length > 0) {
-			return offsets[0];
-		} else {
-			return NO_OFFSET;
-		}
-	}
+        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
+        if (offsets.length > 0) {
+            return offsets[0];
+        } else {
+            return NO_OFFSET;
+        }
+    }
 
-	public static class KafkaOffsetMetric implements IMetric {
+    public static class KafkaOffsetMetric implements IMetric {
         Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
         Set<Partition> _partitions;
         String _topic;
@@ -64,16 +64,16 @@ public class KafkaUtils {
                 long totalLatestTimeOffset = 0;
                 long totalLatestEmittedOffset = 0;
                 HashMap ret = new HashMap();
-                if(_partitions != null && _partitions.size() == _partitionToOffset.size()) {
-                    for(Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
+                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
+                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
                         Partition partition = e.getKey();
                         SimpleConsumer consumer = _connections.getConnection(partition);
-                        if(consumer == null) {
+                        if (consumer == null) {
                             LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                             return null;
                         }
                         long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
-                        if(latestTimeOffset == 0) {
+                        if (latestTimeOffset == 0) {
                             LOG.warn("No data found in Kafka Partition " + partition.getId());
                             return null;
                         }
@@ -93,18 +93,22 @@ public class KafkaUtils {
                 } else {
                     LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                 }
-            } catch(Throwable t) {
+            } catch (Throwable t) {
                 LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
             }
             return null;
         }
 
-       public void refreshPartitions(Set<Partition> partitions) {
-           _partitions = partitions;
-           Iterator<Partition> it = _partitionToOffset.keySet().iterator();
-           while(it.hasNext()) {
-               if(!partitions.contains(it.next())) it.remove();
-           }
-       }
-    };
+        public void refreshPartitions(Set<Partition> partitions) {
+            _partitions = partitions;
+            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
+            while (it.hasNext()) {
+                if (!partitions.contains(it.next())) {
+                    it.remove();
+                }
+            }
+        }
+    }
 }
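
For context on getOffset above: it issues a single OffsetRequest through the
SimpleConsumer and returns the NO_OFFSET sentinel (-5) when the broker reports
no offsets for the partition. A minimal sketch of calling it directly (the
Kafka 0.8 SimpleConsumer(host, port, soTimeout, bufferSize, clientId)
constructor is assumed here):

    import kafka.javaapi.consumer.SimpleConsumer;
    import storm.kafka.trident.KafkaUtils;

    public class OffsetProbe {
        public static void main(String[] args) {
            // Constructor arguments are assumptions for illustration.
            SimpleConsumer consumer = new SimpleConsumer(
                    "broker1.example.com", 9092, 100000, 64 * 1024, "offset-probe");
            // LatestTime() resolves to the next offset that will be written;
            // kafka.api.OffsetRequest.EarliestTime() would instead give the
            // oldest offset still retained for the partition.
            long latest = KafkaUtils.getOffset(
                    consumer, "mytopic", 0, kafka.api.OffsetRequest.LatestTime());
            System.out.println("latest offset for mytopic/0: " + latest);
            consumer.close();
        }
    }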

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/MaxMetric.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/MaxMetric.java b/src/jvm/storm/kafka/trident/MaxMetric.java
index 087245f..a8f88ba 100644
--- a/src/jvm/storm/kafka/trident/MaxMetric.java
+++ b/src/jvm/storm/kafka/trident/MaxMetric.java
@@ -11,8 +11,12 @@ public class MaxMetric implements ICombiner<Long> {
 
     @Override
     public Long combine(Long l1, Long l2) {
-        if(l1 == null) return l2;
-        if(l2 == null) return l1;
+        if (l1 == null) {
+            return l2;
+        }
+        if (l2 == null) {
+            return l1;
+        }
         return Math.max(l1, l2);
     }
 

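The braces added above make the null handling explicit: a combiner can see a
null on either side when no sample has been recorded yet in the metrics
window, and MaxMetric treats null as "no value" rather than throwing. A small
demonstration:

    import storm.kafka.trident.MaxMetric;

    public class MaxMetricSketch {
        public static void main(String[] args) {
            MaxMetric max = new MaxMetric();
            System.out.println(max.combine(null, 7L));  // 7: null side is ignored
            System.out.println(max.combine(12L, 7L));   // 12: plain max
        }
    }
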
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
index 0f6e6c5..35b7033 100644
--- a/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ b/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
@@ -11,19 +11,19 @@ import java.util.UUID;
 
 public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
 
-    
+
     TridentKafkaConfig _config;
     String _topologyInstanceId = UUID.randomUUID().toString();
-    
+
     public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
         _config = config;
     }
-    
+
     @Override
     public IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map> getEmitter(Map conf, TopologyContext context) {
-		return new TridentKafkaEmitter(conf, context, _config, _topologyInstanceId).asOpaqueEmitter();
+        return new TridentKafkaEmitter(conf, context, _config, _topologyInstanceId).asOpaqueEmitter();
     }
-    
+
     @Override
     public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {
         return new storm.kafka.trident.Coordinator(conf, _config);
@@ -32,8 +32,8 @@ public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<G
     @Override
     public Fields getOutputFields() {
         return _config.scheme.getOutputFields();
-    }    
-    
+    }
+
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return null;

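For context, wiring this spout into a topology is a one-liner once the config
exists. A minimal sketch (the ZkHosts(String brokerZkStr) constructor is an
assumption; it is not shown in this diff):

    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;
    import storm.trident.TridentTopology;

    public class OpaqueSpoutWiring {
        public static TridentTopology build() {
            // ZkHosts constructor signature is assumed for illustration.
            TridentKafkaConfig config = new TridentKafkaConfig(
                    new ZkHosts("zkhost:2181"), "mytopic", "my-client");
            TridentTopology topology = new TridentTopology();
            // Opaque semantics: a replayed batch need not be identical to the
            // original, which tolerates broker changes between attempts.
            topology.newStream("kafka", new OpaqueTridentKafkaSpout(config));
            return topology;
        }
    }
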
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/StaticBrokerReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/StaticBrokerReader.java b/src/jvm/storm/kafka/trident/StaticBrokerReader.java
index dabbd5e..98a8f53 100644
--- a/src/jvm/storm/kafka/trident/StaticBrokerReader.java
+++ b/src/jvm/storm/kafka/trident/StaticBrokerReader.java
@@ -2,12 +2,12 @@ package storm.kafka.trident;
 
 public class StaticBrokerReader implements IBrokerReader {
 
-	private GlobalPartitionInformation brokers = new GlobalPartitionInformation();
-    
+    private GlobalPartitionInformation brokers = new GlobalPartitionInformation();
+
     public StaticBrokerReader(GlobalPartitionInformation partitionInformation) {
         this.brokers = partitionInformation;
     }
-    
+
     @Override
     public GlobalPartitionInformation getCurrentBrokers() {
         return brokers;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
index 173a98f..b32d301 100644
--- a/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ b/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
@@ -10,7 +10,7 @@ import java.util.UUID;
 
 
 public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
-    
+
     TridentKafkaConfig _config;
     String _topologyInstanceId = UUID.randomUUID().toString();
 
@@ -26,14 +26,14 @@ public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<
 
     @Override
     public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
-		return new TridentKafkaEmitter(conf, context, _config, _topologyInstanceId).asTransactionalEmitter();
+        return new TridentKafkaEmitter(conf, context, _config, _topologyInstanceId).asTransactionalEmitter();
     }
 
     @Override
     public Fields getOutputFields() {
-		return _config.scheme.getOutputFields();
+        return _config.scheme.getOutputFields();
     }
-        
+
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return null;

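The transactional variant wires up the same way but carries a stricter replay
contract: emitPartitionBatch must rebuild a batch exactly from its stored
metadata, so the Kafka segments holding any in-flight offsets must still be
retained on the brokers. Continuing the sketch from OpaqueTridentKafkaSpout
above (reusing its topology and config):

    // Replays refetch [offset, nextOffset) and must yield the same tuples.
    topology.newStream("kafka-tx", new TransactionalTridentKafkaSpout(config));
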
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaConfig.java b/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
index 7195500..073afa2 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
@@ -13,8 +13,8 @@ public class TridentKafkaConfig extends KafkaConfig {
         super(hosts, topic);
     }
 
-	public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
-		super(hosts, topic, clientId);
-	}
+    public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
+        super(hosts, topic, clientId);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 282e67a..ab4ec63 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -34,237 +34,241 @@ import java.util.Map;
  */
 public class TridentKafkaEmitter {
 
-	public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
-
-	private DynamicPartitionConnections _connections;
-	private String _topologyName;
-	private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
-	private ReducedMetric _kafkaMeanFetchLatencyMetric;
-	private CombinedMetric _kafkaMaxFetchLatencyMetric;
-	private TridentKafkaConfig _config;
-	private String _topologyInstanceId;
-
-	public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
-		_config = config;
-		_topologyInstanceId = topologyInstanceId;
-		_connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
-		_topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
-		_kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_config.topic, _connections);
-		context.registerMetric("kafkaOffset", _kafkaOffsetMetric, 60);
-		_kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), 60);
-		_kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), 60);
-	}
-
-
-	private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
-		SimpleConsumer consumer = _connections.register(partition);
-		Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
-		_kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
-		return ret;
-	}
-
-	private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
-		try {
-			return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
-		} catch (FailedFetchException e) {
-			LOG.warn("Failed to fetch from partition " + partition);
-			if (lastMeta == null) {
-				return null;
-			} else {
-				Map ret = new HashMap();
-				ret.put("offset", lastMeta.get("nextOffset"));
-				ret.put("nextOffset", lastMeta.get("nextOffset"));
-				ret.put("partition", partition.partition);
-				ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
-				ret.put("topic", _config.topic);
-				ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
-				return ret;
-			}
-		}
-	}
-
-	private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) {
-		long offset;
-		if (lastMeta != null) {
-			String lastInstanceId = null;
-			Map lastTopoMeta = (Map) lastMeta.get("topology");
-			if (lastTopoMeta != null) {
-				lastInstanceId = (String) lastTopoMeta.get("id");
-			}
-			if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) {
-				offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
-			} else {
-				offset = (Long) lastMeta.get("nextOffset");
-			}
-		} else {
-			long startTime = kafka.api.OffsetRequest.LatestTime();
-			if (_config.forceFromStart) startTime = _config.startOffsetTime;
-			offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
-		}
-		ByteBufferMessageSet msgs;
-		try {
-			msgs = fetchMessages(consumer, partition, offset);
-		} catch (Exception e) {
-			if (e instanceof ConnectException) {
-				throw new FailedFetchException(e);
-			} else {
-				throw new RuntimeException(e);
-			}
-		}
-		long endoffset = offset;
-		for (MessageAndOffset msg : msgs) {
-			emit(collector, msg.message());
-			endoffset = msg.nextOffset();
-		}
-		Map newMeta = new HashMap();
-		newMeta.put("offset", offset);
-		newMeta.put("nextOffset", endoffset);
-		newMeta.put("instanceId", _topologyInstanceId);
-		newMeta.put("partition", partition.partition);
-		newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
-		newMeta.put("topic", _config.topic);
-		newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
-		return newMeta;
-	}
-
-	private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
-		ByteBufferMessageSet msgs;
-		long start = System.nanoTime();
-		FetchRequestBuilder builder = new FetchRequestBuilder();
-		FetchRequest fetchRequest = builder.addFetch(_config.topic, partition.partition, offset, _config.fetchSizeBytes).clientId(_config.clientId).build();
-		msgs = consumer.fetch(fetchRequest).messageSet(_config.topic, partition.partition);
-		long end = System.nanoTime();
-		long millis = (end - start) / 1000000;
-		_kafkaMeanFetchLatencyMetric.update(millis);
-		_kafkaMaxFetchLatencyMetric.update(millis);
-		return msgs;
-	}
-
-	/**
-	 * re-emit the batch described by the meta data provided
-	 *
-	 * @param attempt
-	 * @param collector
-	 * @param partition
-	 * @param meta
-	 */
-	private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
-		LOG.info("re-emitting batch, attempt " + attempt);
-		String instanceId = (String) meta.get("instanceId");
-		if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
-			SimpleConsumer consumer = _connections.register(partition);
-			long offset = (Long) meta.get("offset");
-			long nextOffset = (Long) meta.get("nextOffset");
-			ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
-			for (MessageAndOffset msg : msgs) {
-				if (offset == nextOffset) break;
-				if (offset > nextOffset) {
-					throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
-				}
-				emit(collector, msg.message());
-				offset = msg.nextOffset();
-			}
-		}
-	}
-
-	private void emit(TridentCollector collector, Message msg) {
-		Iterable<List<Object>> values =
-				_config.scheme.deserialize(Utils.toByteArray(msg.payload()));
-		if (values != null) {
-			for (List<Object> value : values) {
-				collector.emit(value);
-			}
-		}
-	}
-
-	private void clear() {
-		_connections.clear();
-	}
-
-	private List<Partition> orderPartitions(GlobalPartitionInformation partitions) {
-		return partitions.getOrderedPartitions();
-	}
-
-	private void refresh(List<Partition> list) {
-		_connections.clear();
-		_kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
-	}
-
-
-	public IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map> asOpaqueEmitter() {
-
-		return new IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map>() {
-
-			/**
-			 * Emit a batch of tuples for a partition/transaction.
-			 *
-			 * Return the metadata describing this batch that will be used as lastPartitionMeta
-			 * for defining the parameters of the next batch.
-			 */
-			@Override
-			public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
-				return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-			}
-
-			@Override
-			public void refreshPartitions(List<Partition> partitions) {
-				refresh(partitions);
-			}
-
-			@Override
-			public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
-				return orderPartitions(partitionInformation);
-			}
-
-			@Override
-			public void close() {
-				clear();
-			}
-		};
-	}
-
-	public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
-		return new IPartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map>() {
-
-			/**
-			 * Emit a batch of tuples for a partition/transaction that's never been emitted before.
-			 * Return the metadata that can be used to reconstruct this partition/batch in the future.
-			 */
-			@Override
-			public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
-				return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-			}
-
-			/**
-			 * Emit a batch of tuples for a partition/transaction that has been emitted before, using
-			 * the metadata created when it was first emitted.
-			 */
-			@Override
-			public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
-				reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
-			}
-
-			/**
-			 * This method is called when this task is responsible for a new set of partitions. Should be used
-			 * to manage things like connections to brokers.
-			 */
-			@Override
-			public void refreshPartitions(List<Partition> partitions) {
-				refresh(partitions);
-			}
-
-			@Override
-			public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
-				return orderPartitions(partitionInformation);
-			}
-
-			@Override
-			public void close() {
-				clear();
-			}
-		};
-
-	}
+    public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
+
+    private DynamicPartitionConnections _connections;
+    private String _topologyName;
+    private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
+    private ReducedMetric _kafkaMeanFetchLatencyMetric;
+    private CombinedMetric _kafkaMaxFetchLatencyMetric;
+    private TridentKafkaConfig _config;
+    private String _topologyInstanceId;
+
+    public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
+        _config = config;
+        _topologyInstanceId = topologyInstanceId;
+        _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
+        _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
+        _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_config.topic, _connections);
+        context.registerMetric("kafkaOffset", _kafkaOffsetMetric, 60);
+        _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), 60);
+        _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), 60);
+    }
+
+
+    private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+        SimpleConsumer consumer = _connections.register(partition);
+        Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
+        _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
+        return ret;
+    }
+
+    private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+        try {
+            return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
+        } catch (FailedFetchException e) {
+            LOG.warn("Failed to fetch from partition " + partition);
+            if (lastMeta == null) {
+                return null;
+            } else {
+                Map ret = new HashMap();
+                ret.put("offset", lastMeta.get("nextOffset"));
+                ret.put("nextOffset", lastMeta.get("nextOffset"));
+                ret.put("partition", partition.partition);
+                ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
+                ret.put("topic", _config.topic);
+                ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
+                return ret;
+            }
+        }
+    }
+
+    private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) {
+        long offset;
+        if (lastMeta != null) {
+            String lastInstanceId = null;
+            Map lastTopoMeta = (Map) lastMeta.get("topology");
+            if (lastTopoMeta != null) {
+                lastInstanceId = (String) lastTopoMeta.get("id");
+            }
+            if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) {
+                offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
+            } else {
+                offset = (Long) lastMeta.get("nextOffset");
+            }
+        } else {
+            long startTime = kafka.api.OffsetRequest.LatestTime();
+            if (_config.forceFromStart) {
+                startTime = _config.startOffsetTime;
+            }
+            offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
+        }
+        ByteBufferMessageSet msgs;
+        try {
+            msgs = fetchMessages(consumer, partition, offset);
+        } catch (Exception e) {
+            if (e instanceof ConnectException) {
+                throw new FailedFetchException(e);
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+        long endoffset = offset;
+        for (MessageAndOffset msg : msgs) {
+            emit(collector, msg.message());
+            endoffset = msg.nextOffset();
+        }
+        Map newMeta = new HashMap();
+        newMeta.put("offset", offset);
+        newMeta.put("nextOffset", endoffset);
+        newMeta.put("instanceId", _topologyInstanceId);
+        newMeta.put("partition", partition.partition);
+        newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
+        newMeta.put("topic", _config.topic);
+        newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
+        return newMeta;
+    }
+
+    private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
+        ByteBufferMessageSet msgs;
+        long start = System.nanoTime();
+        FetchRequestBuilder builder = new FetchRequestBuilder();
+        FetchRequest fetchRequest = builder.addFetch(_config.topic, partition.partition, offset, _config.fetchSizeBytes).clientId(_config.clientId).build();
+        msgs = consumer.fetch(fetchRequest).messageSet(_config.topic, partition.partition);
+        long end = System.nanoTime();
+        long millis = (end - start) / 1000000;
+        _kafkaMeanFetchLatencyMetric.update(millis);
+        _kafkaMaxFetchLatencyMetric.update(millis);
+        return msgs;
+    }
+
+    /**
+     * Re-emit the batch described by the metadata provided.
+     *
+     * @param attempt   the transaction attempt being replayed
+     * @param collector the collector to emit tuples to
+     * @param partition the Kafka partition the batch was read from
+     * @param meta      the metadata recorded when the batch was first emitted
+     */
+    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
+        LOG.info("re-emitting batch, attempt " + attempt);
+        String instanceId = (String) meta.get("instanceId");
+        if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
+            SimpleConsumer consumer = _connections.register(partition);
+            long offset = (Long) meta.get("offset");
+            long nextOffset = (Long) meta.get("nextOffset");
+            ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
+            for (MessageAndOffset msg : msgs) {
+                if (offset == nextOffset) {
+                    break;
+                }
+                if (offset > nextOffset) {
+                    throw new RuntimeException("Error when re-emitting batch: overshot the end offset");
+                }
+                emit(collector, msg.message());
+                offset = msg.nextOffset();
+            }
+        }
+    }
+
+    private void emit(TridentCollector collector, Message msg) {
+        Iterable<List<Object>> values =
+                _config.scheme.deserialize(Utils.toByteArray(msg.payload()));
+        if (values != null) {
+            for (List<Object> value : values) {
+                collector.emit(value);
+            }
+        }
+    }
+
+    private void clear() {
+        _connections.clear();
+    }
+
+    private List<Partition> orderPartitions(GlobalPartitionInformation partitions) {
+        return partitions.getOrderedPartitions();
+    }
+
+    private void refresh(List<Partition> list) {
+        _connections.clear();
+        _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
+    }
+
+
+    public IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map> asOpaqueEmitter() {
+
+        return new IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map>() {
+
+            /**
+             * Emit a batch of tuples for a partition/transaction.
+             *
+             * Return the metadata describing this batch that will be used as lastPartitionMeta
+             * for defining the parameters of the next batch.
+             */
+            @Override
+            public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            @Override
+            public void refreshPartitions(List<Partition> partitions) {
+                refresh(partitions);
+            }
+
+            @Override
+            public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
+                return orderPartitions(partitionInformation);
+            }
+
+            @Override
+            public void close() {
+                clear();
+            }
+        };
+    }
+
+    public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
+        return new IPartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map>() {
+
+            /**
+             * Emit a batch of tuples for a partition/transaction that's never been emitted before.
+             * Return the metadata that can be used to reconstruct this partition/batch in the future.
+             */
+            @Override
+            public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            /**
+             * Emit a batch of tuples for a partition/transaction that has been emitted before, using
+             * the metadata created when it was first emitted.
+             */
+            @Override
+            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            /**
+             * This method is called when this task is responsible for a new set of partitions. Should be used
+             * to manage things like connections to brokers.
+             */
+            @Override
+            public void refreshPartitions(List<Partition> partitions) {
+                refresh(partitions);
+            }
+
+            @Override
+            public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
+                return orderPartitions(partitionInformation);
+            }
+
+            @Override
+            public void close() {
+                clear();
+            }
+        };
+
+    }
 
 
 }
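
The Map returned by doEmitNewPartitionBatch (and consumed by
reEmitPartitionBatch) is the contract between successive batches: the next
batch starts at "nextOffset", and the "instanceId"/"topology" entries let a
redeployment with forceFromStart recognize old metadata as stale and restart
from startOffsetTime. A sketch of its shape, with illustrative values only:

    import com.google.common.collect.ImmutableMap;
    import java.util.HashMap;
    import java.util.Map;

    public class BatchMetaSketch {
        public static Map sketch() {
            Map meta = new HashMap();
            meta.put("offset", 100L);      // first offset emitted in this batch
            meta.put("nextOffset", 150L);  // where the next batch will start
            meta.put("instanceId", "example-topology-instance-id");
            meta.put("partition", 0);
            meta.put("broker", ImmutableMap.of("host", "broker1.example.com", "port", 9092));
            meta.put("topic", "mytopic");
            meta.put("topology", ImmutableMap.of(
                    "name", "my-topology", "id", "example-topology-instance-id"));
            return meta;
        }
    }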