You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/10 03:36:39 UTC

[1/5] storm git commit: STORM-1164. Code cleanup for typos, warnings and conciseness

Repository: storm
Updated Branches:
  refs/heads/master 5a4e1f85b -> e25f28f71


http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java b/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
index 3cb455d..4ecdda6 100644
--- a/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
@@ -28,15 +28,15 @@ import java.util.concurrent.Semaphore;
 public class KeyedRoundRobinQueue<V> {
     private final Object _lock = new Object();
     private Semaphore _size = new Semaphore(0);
-    private Map<Object, Queue<V>> _queues = new HashMap<Object, Queue<V>>();
-    private List<Object> _keyOrder = new ArrayList<Object>();
+    private Map<Object, Queue<V>> _queues = new HashMap<>();
+    private List<Object> _keyOrder = new ArrayList<>();
     private int _currIndex = 0;
 
     public void add(Object key, V val) {
         synchronized(_lock) {
             Queue<V> queue = _queues.get(key);
             if(queue==null) {
-                queue = new LinkedList<V>();
+                queue = new LinkedList<>();
                 _queues.put(key, queue);
                 _keyOrder.add(key);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java b/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java
index 1e091f0..d198a72 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java
@@ -27,7 +27,7 @@ public class ListDelegate implements List<Object> {
     private List<Object> _delegate;
     
     public ListDelegate() {
-    	_delegate = new ArrayList<Object>();
+    	_delegate = new ArrayList<>();
     }
     
     public void setDelegate(List<Object> delegate) {
@@ -84,12 +84,12 @@ public class ListDelegate implements List<Object> {
     }
 
     @Override
-    public boolean addAll(Collection<? extends Object> clctn) {
+    public boolean addAll(Collection<?> clctn) {
         return _delegate.addAll(clctn);
     }
 
     @Override
-    public boolean addAll(int i, Collection<? extends Object> clctn) {
+    public boolean addAll(int i, Collection<?> clctn) {
         return _delegate.addAll(i, clctn);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Monitor.java b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
index 36fedc4..41717a9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Monitor.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
@@ -20,7 +20,6 @@ package backtype.storm.utils;
 import backtype.storm.generated.*;
 
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 public class Monitor {
@@ -98,7 +97,7 @@ public class Monitor {
     }
 
     private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{
-        HashSet<String> components = new HashSet<String>();
+        HashSet<String> components = new HashSet<>();
         ClusterSummary clusterSummary = client.getClusterInfo();
         TopologySummary topologySummary = null;
         for (TopologySummary ts: clusterSummary.get_topologies()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index a715de2..a9306f7 100644
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@ -22,13 +22,8 @@ import backtype.storm.Config;
 import backtype.storm.generated.ClusterSummary;
 import backtype.storm.generated.Nimbus;
 import backtype.storm.generated.NimbusSummary;
-import backtype.storm.nimbus.ILeaderElector;
-import backtype.storm.nimbus.NimbusInfo;
 import backtype.storm.security.auth.ThriftClient;
 import backtype.storm.security.auth.ThriftConnectionType;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -38,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 
 public class NimbusClient extends ThriftClient {
-    public static final String DELIMITER = ":";
     private Nimbus.Client _client;
     private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
 
@@ -56,7 +50,7 @@ public class NimbusClient extends ThriftClient {
             asUser = (String) conf.get(Config.STORM_DO_AS_USER);
         }
 
-        List<String> seeds = null;
+        List<String> seeds;
         if(conf.containsKey(Config.NIMBUS_HOST)) {
             LOG.warn("Using deprecated config {} for backward compatibility. Please update your storm.yaml so it only has config {}",
                     Config.NIMBUS_HOST, Config.NIMBUS_SEEDS);
@@ -67,7 +61,7 @@ public class NimbusClient extends ThriftClient {
 
         for (String host : seeds) {
             int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
-            ClusterSummary clusterInfo = null;
+            ClusterSummary clusterInfo;
             try {
                 NimbusClient client = new NimbusClient(conf, host, port);
                 clusterInfo = client.getClient().getClusterInfo();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java b/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java
index 48053fc..159a7c1 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java
@@ -27,7 +27,7 @@ import java.util.UUID;
  * tuples.
  */
 public class RegisteredGlobalState {
-    private static HashMap<String, Object> _states = new HashMap<String, Object>();
+    private static HashMap<String, Object> _states = new HashMap<>();
     private static final Object _lock = new Object();
     
     public static Object globalLock() {
@@ -50,9 +50,7 @@ public class RegisteredGlobalState {
     
     public static Object getState(String id) {
         synchronized(_lock) {
-            Object ret = _states.get(id);
-            //System.out.println("State: " + ret.toString());
-            return ret;
+            return _states.get(id);
         }        
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
index 2b8d66b..03d1bf8 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
@@ -53,7 +53,7 @@ public class RotatingMap<K, V> {
         if(numBuckets<2) {
             throw new IllegalArgumentException("numBuckets must be >= 2");
         }
-        _buckets = new LinkedList<HashMap<K, V>>();
+        _buckets = new LinkedList<>();
         for(int i=0; i<numBuckets; i++) {
             _buckets.add(new HashMap<K, V>());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java b/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java
index 724bc3e..d5c2eca 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java
@@ -22,7 +22,7 @@ import java.util.UUID;
 
 // this class should be combined with RegisteredGlobalState
 public class ServiceRegistry {
-    private static HashMap<String, Object> _services = new HashMap<String, Object>();
+    private static HashMap<String, Object> _services = new HashMap<>();
     private static final Object _lock = new Object();
     
     public static String registerService(Object service) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java b/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
index 5422c2e..be6f0e9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ b/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -42,8 +42,9 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
     public StormBoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries) {
         super(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
         expRetriesThreshold = 1;
-        while ((1 << (expRetriesThreshold + 1)) < ((maxSleepTimeMs - baseSleepTimeMs) / 2))
+        while ((1 << (expRetriesThreshold + 1)) < ((maxSleepTimeMs - baseSleepTimeMs) / 2)) {
             expRetriesThreshold++;
+        }
         LOG.debug("The baseSleepTimeMs [{}] the maxSleepTimeMs [{}] the maxRetries [{}]", 
 				baseSleepTimeMs, maxSleepTimeMs, maxRetries);
         if (baseSleepTimeMs > maxSleepTimeMs) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java b/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java
index 1740e18..f3586bb 100644
--- a/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java
+++ b/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java
@@ -86,7 +86,7 @@ public class VersionInfo {
   }
 
 
-  private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
+  private static final VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
 
   public static String getVersion() {
     return COMMON_VERSION_INFO._getVersion();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/TridentTopology.java b/storm-core/src/jvm/storm/trident/TridentTopology.java
index 8164717..58c83e4 100644
--- a/storm-core/src/jvm/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/storm/trident/TridentTopology.java
@@ -87,16 +87,17 @@ import storm.trident.util.TridentUtils;
 // all operations have finishBatch and can optionally be committers
 public class TridentTopology {
     
-    //TODO: add a method for drpc stream, needs to know how to automatically do returnresults, etc
+    //TODO: add a method for drpc stream, needs to know how to automatically do return results, etc
     // is it too expensive to do a batch per drpc request?
     
-    DefaultDirectedGraph<Node, IndexedEdge> _graph;
-    Map<String, List<Node>> _colocate = new HashMap();
-    UniqueIdGen _gen;
+    final DefaultDirectedGraph<Node, IndexedEdge> _graph;
+    final Map<String, List<Node>> _colocate;
+    final UniqueIdGen _gen;
     
     public TridentTopology() {
-        _graph = new DefaultDirectedGraph(new ErrorEdgeFactory());
-        _gen = new UniqueIdGen();
+        this(new DefaultDirectedGraph<Node, IndexedEdge>(new ErrorEdgeFactory()),
+                new HashMap<String, List<Node>>(),
+                new UniqueIdGen());
     }
     
     private TridentTopology(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
@@ -195,7 +196,7 @@ public class TridentTopology {
     }    
     
     public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
-        List<String> names = new ArrayList<String>();
+        List<String> names = new ArrayList<>();
         for(Stream s: streams) {
             if(s._name!=null) {
                 names.add(s._name);
@@ -206,9 +207,9 @@ public class TridentTopology {
     }
     
     public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
-        List<Fields> fullInputFields = new ArrayList<Fields>();
-        List<Stream> streams = new ArrayList<Stream>();
-        List<Fields> fullGroupFields = new ArrayList<Fields>();
+        List<Fields> fullInputFields = new ArrayList<>();
+        List<Stream> streams = new ArrayList<>();
+        List<Fields> fullGroupFields = new ArrayList<>();
         for(int i=0; i<groupedStreams.size(); i++) {
             GroupedStream gs = groupedStreams.get(i);
             Fields groupFields = gs.getGroupFields();
@@ -270,10 +271,10 @@ public class TridentTopology {
         
         completeDRPC(graph, _colocate, _gen);
         
-        List<SpoutNode> spoutNodes = new ArrayList<SpoutNode>();
+        List<SpoutNode> spoutNodes = new ArrayList<>();
         
         // can be regular nodes (static state) or processor nodes
-        Set<Node> boltNodes = new HashSet<Node>();
+        Set<Node> boltNodes = new HashSet<>();
         for(Node n: graph.vertexSet()) {
             if(n instanceof SpoutNode) {
                 spoutNodes.add((SpoutNode) n);
@@ -283,7 +284,7 @@ public class TridentTopology {
         }
         
         
-        Set<Group> initialGroups = new HashSet<Group>();
+        Set<Group> initialGroups = new HashSet<>();
         for(List<Node> colocate: _colocate.values()) {
             Group g = new Group(graph, colocate);
             boltNodes.removeAll(colocate);
@@ -301,7 +302,7 @@ public class TridentTopology {
         
         
         // add identity partitions between groups
-        for(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {
+        for(IndexedEdge<Node> e: new HashSet<>(graph.edgeSet())) {
             if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {                
                 Group g1 = grouper.nodeGroup(e.source);
                 Group g2 = grouper.nodeGroup(e.target);
@@ -325,7 +326,7 @@ public class TridentTopology {
         // this is because can't currently merge splitting logic into a spout
         // not the most kosher algorithm here, since the grouper indexes are being trounced via the adding of nodes to random groups, but it 
         // works out
-        List<Node> forNewGroups = new ArrayList<Node>();
+        List<Node> forNewGroups = new ArrayList<>();
         for(Group g: mergedGroups) {
             for(PartitionNode n: extraPartitionInputs(g)) {
                 Node idNode = makeIdentityNode(n.allOutputFields);
@@ -350,7 +351,7 @@ public class TridentTopology {
             }
         }
         // TODO: in the future, want a way to include this logic in the spout itself,
-        // or make it unecessary by having storm include metadata about which grouping a tuple
+        // or make it unnecessary by having storm include metadata about which grouping a tuple
         // came from
         
         for(Node n: forNewGroups) {
@@ -366,8 +367,8 @@ public class TridentTopology {
         mergedGroups = grouper.getAllGroups();
                 
         
-        Map<Node, String> batchGroupMap = new HashMap();
-        List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
+        Map<Node, String> batchGroupMap = new HashMap<>();
+        List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
         for(int i=0; i<connectedComponents.size(); i++) {
             String groupId = "bg" + i;
             for(Node n: connectedComponents.get(i)) {
@@ -413,14 +414,10 @@ public class TridentTopology {
                 Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
                 for(PartitionNode n: inputs) {
                     Node parent = TridentUtils.getParent(graph, n);
-                    String componentId;
-                    if(parent instanceof SpoutNode) {
-                        componentId = spoutIds.get(parent);
-                    } else {
-                        componentId = boltIds.get(grouper.nodeGroup(parent));
-                    }
+                    String componentId = parent instanceof SpoutNode ?
+                            spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                     d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
-                } 
+                }
             }
         }
         
@@ -428,7 +425,7 @@ public class TridentTopology {
     }
     
     private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
-        List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
+        List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
         
         for(Set<Node> g: connectedComponents) {
             checkValidJoins(g);
@@ -496,7 +493,7 @@ public class TridentTopology {
     }
     
     private static Collection<PartitionNode> uniquedSubscriptions(Set<PartitionNode> subscriptions) {
-        Map<String, PartitionNode> ret = new HashMap();
+        Map<String, PartitionNode> ret = new HashMap<>();
         for(PartitionNode n: subscriptions) {
             PartitionNode curr = ret.get(n.streamId);
             if(curr!=null && !curr.thriftGrouping.equals(n.thriftGrouping)) {
@@ -508,7 +505,7 @@ public class TridentTopology {
     }
 
     private static Map<Node, String> genSpoutIds(Collection<SpoutNode> spoutNodes) {
-        Map<Node, String> ret = new HashMap();
+        Map<Node, String> ret = new HashMap<>();
         int ctr = 0;
         for(SpoutNode n: spoutNodes) {
             if (n.type == SpoutNode.SpoutType.BATCH) { // if Batch spout then id contains txId
@@ -525,11 +522,11 @@ public class TridentTopology {
     }
 
     private static Map<Group, String> genBoltIds(Collection<Group> groups) {
-        Map<Group, String> ret = new HashMap();
+        Map<Group, String> ret = new HashMap<>();
         int ctr = 0;
         for(Group g: groups) {
             if(!isSpoutGroup(g)) {
-                List<String> name = new ArrayList();
+                List<String> name = new ArrayList<>();
                 name.add("b");
                 name.add("" + ctr);
                 String groupName = getGroupName(g);
@@ -544,13 +541,13 @@ public class TridentTopology {
     }
     
     private static String getGroupName(Group g) {
-        TreeMap<Integer, String> sortedNames = new TreeMap();
+        TreeMap<Integer, String> sortedNames = new TreeMap<>();
         for(Node n: g.nodes) {
             if(n.name!=null) {
                 sortedNames.put(n.creationIndex, n.name);
             }
         }
-        List<String> names = new ArrayList<String>();
+        List<String> names = new ArrayList<>();
         String prevName = null;
         for(String n: sortedNames.values()) {
             if(prevName==null || !n.equals(prevName)) {
@@ -562,7 +559,7 @@ public class TridentTopology {
     }
     
     private static Map<String, String> getOutputStreamBatchGroups(Group g, Map<Node, String> batchGroupMap) {
-        Map<String, String> ret = new HashMap();
+        Map<String, String> ret = new HashMap<>();
         Set<PartitionNode> externalGroupOutputs = externalGroupOutputs(g);
         for(PartitionNode n: externalGroupOutputs) {
             ret.put(n.streamId, batchGroupMap.get(n));
@@ -571,7 +568,7 @@ public class TridentTopology {
     }
     
     private static Set<String> committerBatches(Group g, Map<Node, String> batchGroupMap) {
-        Set<String> ret = new HashSet();
+        Set<String> ret = new HashSet<>();
         for(Node n: g.nodes) {
            if(n instanceof ProcessorNode) {
                if(((ProcessorNode) n).committer) {
@@ -583,7 +580,7 @@ public class TridentTopology {
     }
     
     private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper, Collection<Group> groups) {
-        UndirectedGraph<Group, Object> equivs = new Pseudograph<Group, Object>(Object.class);
+        UndirectedGraph<Group, Object> equivs = new Pseudograph<>(Object.class);
         for(Group g: groups) {
             equivs.addVertex(g);
         }
@@ -599,8 +596,8 @@ public class TridentTopology {
             }            
         }
         
-        Map<Group, Integer> ret = new HashMap();
-        List<Set<Group>> equivGroups = new ConnectivityInspector<Group, Object>(equivs).connectedSets();
+        Map<Group, Integer> ret = new HashMap<>();
+        List<Set<Group>> equivGroups = new ConnectivityInspector<>(equivs).connectedSets();
         for(Set<Group> equivGroup: equivGroups) {
             Integer fixedP = getFixedParallelism(equivGroup);
             Integer maxP = getMaxParallelism(equivGroup);
@@ -689,9 +686,9 @@ public class TridentTopology {
     }
     
     private static List<PartitionNode> extraPartitionInputs(Group g) {
-        List<PartitionNode> ret = new ArrayList();
+        List<PartitionNode> ret = new ArrayList<>();
         Set<PartitionNode> inputs = externalGroupInputs(g);
-        Map<String, List<PartitionNode>> grouped = new HashMap();
+        Map<String, List<PartitionNode>> grouped = new HashMap<>();
         for(PartitionNode n: inputs) {
             if(!grouped.containsKey(n.streamId)) {
                 grouped.put(n.streamId, new ArrayList());
@@ -711,7 +708,7 @@ public class TridentTopology {
     }
     
     private static Set<PartitionNode> externalGroupInputs(Group g) {
-        Set<PartitionNode> ret = new HashSet();
+        Set<PartitionNode> ret = new HashSet<>();
         for(Node n: g.incomingNodes()) {
             if(n instanceof PartitionNode) {
                 ret.add((PartitionNode) n);
@@ -721,7 +718,7 @@ public class TridentTopology {
     }
     
     private static Set<PartitionNode> externalGroupOutputs(Group g) {
-        Set<PartitionNode> ret = new HashSet();
+        Set<PartitionNode> ret = new HashSet<>();
         for(Node n: g.outgoingNodes()) {
             if(n instanceof PartitionNode) {
                 ret.add((PartitionNode) n);
@@ -788,7 +785,7 @@ public class TridentTopology {
     }       
     
     private static List<Fields> getAllOutputFields(List streams) {
-        List<Fields> ret = new ArrayList<Fields>();
+        List<Fields> ret = new ArrayList<>();
         for(Object o: streams) {
             ret.add(((IAggregatableStream) o).getOutputFields());
         }
@@ -797,7 +794,7 @@ public class TridentTopology {
     
     
     private static List<GroupedStream> groupedStreams(List<Stream> streams, List<Fields> joinFields) {
-        List<GroupedStream> ret = new ArrayList<GroupedStream>();
+        List<GroupedStream> ret = new ArrayList<>();
         for(int i=0; i<streams.size(); i++) {
             ret.add(streams.get(i).groupBy(joinFields.get(i)));
         }
@@ -805,7 +802,7 @@ public class TridentTopology {
     }
     
     private static List<Fields> strippedInputFields(List<Stream> streams, List<Fields> joinFields) {
-        List<Fields> ret = new ArrayList<Fields>();
+        List<Fields> ret = new ArrayList<>();
         for(int i=0; i<streams.size(); i++) {
             ret.add(TridentUtils.fieldsSubtract(streams.get(i).getOutputFields(), joinFields.get(i)));
         }
@@ -813,7 +810,7 @@ public class TridentTopology {
     }
     
     private static List<JoinType> repeat(int n, JoinType type) {
-        List<JoinType> ret = new ArrayList<JoinType>();
+        List<JoinType> ret = new ArrayList<>();
         for(int i=0; i<n; i++) {
             ret.add(type);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java b/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
index 849fb10..515d3a2 100644
--- a/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
+++ b/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
@@ -40,7 +40,7 @@ import storm.trident.tuple.TridentTuple;
 
 public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
     public static class ReturnResultsState {
-        List<TridentTuple> results = new ArrayList<TridentTuple>();
+        List<TridentTuple> results = new ArrayList<>();
         String returnInfo;
 
         @Override
@@ -50,7 +50,7 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
     }
     boolean local;
     Map conf;
-    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
+    Map<List, DRPCInvocationsClient> _clients = new HashMap<>();
     
     
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java b/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
index 8040e8b..976f983 100644
--- a/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
+++ b/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
@@ -59,7 +59,7 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
         }
     }
     
-    List<AggSpec> _aggs = new ArrayList<AggSpec>();
+    List<AggSpec> _aggs = new ArrayList<>();
     IAggregatableStream _stream;
     AggType _type = null;
     GlobalAggregationScheme _globalScheme;
@@ -73,8 +73,8 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
         Fields[] inputFields = new Fields[_aggs.size()];
         Aggregator[] aggs = new Aggregator[_aggs.size()];
         int[] outSizes = new int[_aggs.size()];
-        List<String> allOutFields = new ArrayList<String>();
-        Set<String> allInFields = new HashSet<String>();
+        List<String> allOutFields = new ArrayList<>();
+        Set<String> allInFields = new HashSet<>();
         for(int i=0; i<_aggs.size(); i++) {
             AggSpec spec = _aggs.get(i);
             Fields infields = spec.inFields;
@@ -92,7 +92,7 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
             throw new IllegalArgumentException("Output fields for chained aggregators must be distinct: " + allOutFields.toString());
         }
         
-        Fields inFields = new Fields(new ArrayList<String>(allInFields));
+        Fields inFields = new Fields(new ArrayList<>(allInFields));
         Fields outFields = new Fields(allOutFields);
         Aggregator combined = new ChainedAggregatorImpl(aggs, inputFields, new ComboList.Factory(outSizes));
         

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java b/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java
index 94db077..6f34e8c 100644
--- a/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java
+++ b/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java
@@ -28,14 +28,13 @@ import storm.trident.util.IndexedEdge;
 
 
 public class GraphGrouper {
-    
-    DirectedGraph<Node, IndexedEdge> graph;
-    Set<Group> currGroups;
-    Map<Node, Group> groupIndex = new HashMap();
+    final DirectedGraph<Node, IndexedEdge> graph;
+    final Set<Group> currGroups;
+    final Map<Node, Group> groupIndex = new HashMap<>();
     
     public GraphGrouper(DirectedGraph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {
         this.graph = graph;
-        this.currGroups = new HashSet(initialGroups);
+        this.currGroups = new HashSet<>(initialGroups);
         reindex();      
     }
     
@@ -95,7 +94,7 @@ public class GraphGrouper {
     }
     
     public Collection<Group> outgoingGroups(Group g) {
-        Set<Group> ret = new HashSet();
+        Set<Group> ret = new HashSet<>();
         for(Node n: g.outgoingNodes()) {
             Group other = nodeGroup(n);
             if(other==null || !other.equals(g)) {
@@ -106,7 +105,7 @@ public class GraphGrouper {
     }
     
     public Collection<Group> incomingGroups(Group g) {
-        Set<Group> ret = new HashSet();
+        Set<Group> ret = new HashSet<>();
         for(Node n: g.incomingNodes()) {
             Group other = nodeGroup(n);
             if(other==null || !other.equals(g)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/graph/Group.java b/storm-core/src/jvm/storm/trident/graph/Group.java
index 8ed0023..64bdec6 100644
--- a/storm-core/src/jvm/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/storm/trident/graph/Group.java
@@ -29,14 +29,13 @@ import storm.trident.util.TridentUtils;
 
 
 public class Group {
-    public Set<Node> nodes = new HashSet<Node>();
-    private DirectedGraph<Node, IndexedEdge> graph;
-    private String id;
+    public final Set<Node> nodes = new HashSet<>();
+    private final DirectedGraph<Node, IndexedEdge> graph;
+    private final String id = UUID.randomUUID().toString();
     
     public Group(DirectedGraph graph, List<Node> nodes) {
-        init(graph);
-        this.nodes.addAll(nodes);
         this.graph = graph;
+        this.nodes.addAll(nodes);
     }
     
     public Group(DirectedGraph graph, Node n) {
@@ -44,18 +43,13 @@ public class Group {
     }
     
     public Group(Group g1, Group g2) {
-        init(g1.graph);
+        this.graph = g1.graph;
         nodes.addAll(g1.nodes);
         nodes.addAll(g2.nodes);
     }
     
-    private void init(DirectedGraph graph) {
-        this.graph = graph;
-        this.id = UUID.randomUUID().toString();
-    }
-    
     public Set<Node> outgoingNodes() {
-        Set<Node> ret = new HashSet<Node>();
+        Set<Node> ret = new HashSet<>();
         for(Node n: nodes) {
             ret.addAll(TridentUtils.getChildren(graph, n));
         }
@@ -63,7 +57,7 @@ public class Group {
     }
     
     public Set<Node> incomingNodes() {
-        Set<Node> ret = new HashSet<Node>();
+        Set<Node> ret = new HashSet<>();
         for(Node n: nodes) {
             ret.addAll(TridentUtils.getParents(graph, n));
         }        
@@ -77,6 +71,9 @@ public class Group {
 
     @Override
     public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
         return id.equals(((Group) o).id);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java b/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java
index 6d24ae6..2a44eab 100644
--- a/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java
+++ b/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java
@@ -29,9 +29,9 @@ public class SnapshotGet extends BaseQueryFunction<ReadOnlySnapshottable, Object
 
     @Override
     public List<Object> batchRetrieve(ReadOnlySnapshottable state, List<TridentTuple> args) {
-        List<Object> ret = new ArrayList<Object>(args.size());
+        List<Object> ret = new ArrayList<>(args.size());
         Object snapshot = state.get();
-        for(int i=0; i<args.size(); i++) {
+        for (TridentTuple arg : args) {
             ret.add(snapshot);
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java b/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java
index 52dd633..426b56f 100644
--- a/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java
+++ b/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java
@@ -30,9 +30,9 @@ public class TupleCollectionGet extends BaseQueryFunction<State, Iterator<List<O
 
     @Override
     public List<Iterator<List<Object>>> batchRetrieve(State state, List<TridentTuple> args) {
-        List<Iterator<List<Object>>> ret = new ArrayList(args.size());
-        for(int i=0; i<args.size(); i++) {
-            ret.add(((ITupleCollection)state).getTuples());
+        List<Iterator<List<Object>>> ret = new ArrayList<>(args.size());
+        for (TridentTuple arg : args) {
+            ret.add(((ITupleCollection) state).getTuples());
         }
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java b/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java
index 54fa844..a535eb2 100644
--- a/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java
+++ b/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java
@@ -26,13 +26,11 @@ import java.util.Collections;
 import java.util.List;
 
 public class GlobalGrouping implements CustomStreamGrouping {
-
     List<Integer> target;
     
-    
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targets) {
-        List<Integer> sorted = new ArrayList<Integer>(targets);
+        List<Integer> sorted = new ArrayList<>(targets);
         Collections.sort(sorted);
         target = Arrays.asList(sorted.get(0));
     }
@@ -41,5 +39,4 @@ public class GlobalGrouping implements CustomStreamGrouping {
     public List<Integer> chooseTasks(int i, List<Object> list) {
         return target;
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java b/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java
index 30f48ad..88c2922 100644
--- a/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java
+++ b/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java
@@ -29,18 +29,16 @@ import java.util.Map;
 
 
 public class IdentityGrouping implements CustomStreamGrouping {
-
-    List<Integer> ret = new ArrayList<Integer>();
-    Map<Integer, List<Integer>> _precomputed = new HashMap();
+    final Map<Integer, List<Integer>> _precomputed = new HashMap<>();
     
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> tasks) {
-        List<Integer> sourceTasks = new ArrayList<Integer>(context.getComponentTasks(stream.get_componentId()));
+        List<Integer> sourceTasks = new ArrayList<>(context.getComponentTasks(stream.get_componentId()));
         Collections.sort(sourceTasks);
         if(sourceTasks.size()!=tasks.size()) {
             throw new RuntimeException("Can only do an identity grouping when source and target have same number of tasks");
         }
-        tasks = new ArrayList<Integer>(tasks);
+        tasks = new ArrayList<>(tasks);
         Collections.sort(tasks);
         for(int i=0; i<sourceTasks.size(); i++) {
             int s = sourceTasks.get(i);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/planner/Node.java b/storm-core/src/jvm/storm/trident/planner/Node.java
index 6284cb9..d35e7da 100644
--- a/storm-core/src/jvm/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/storm/trident/planner/Node.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringStyle;
 
 
 public class Node implements Serializable {
-    private static AtomicInteger INDEX = new AtomicInteger(0);
+    private static final AtomicInteger INDEX = new AtomicInteger(0);
     
     private String nodeId;
     
@@ -47,6 +47,9 @@ public class Node implements Serializable {
 
     @Override
     public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
         return nodeId.equals(((Node) o).nodeId);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/planner/PartitionNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/planner/PartitionNode.java b/storm-core/src/jvm/storm/trident/planner/PartitionNode.java
index 4f10c25..4485765 100644
--- a/storm-core/src/jvm/storm/trident/planner/PartitionNode.java
+++ b/storm-core/src/jvm/storm/trident/planner/PartitionNode.java
@@ -22,8 +22,6 @@ import backtype.storm.tuple.Fields;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
 import storm.trident.util.TridentUtils;
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java b/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java
index cdceaf9..6320a21 100644
--- a/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java
+++ b/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java
@@ -23,7 +23,6 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,9 +48,9 @@ import storm.trident.util.TridentUtils;
 public class SubtopologyBolt implements ITridentBatchBolt {
     DirectedGraph _graph;
     Set<Node> _nodes;
-    Map<String, InitialReceiver> _roots = new HashMap();
-    Map<Node, Factory> _outputFactories = new HashMap();
-    Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap();
+    Map<String, InitialReceiver> _roots = new HashMap<>();
+    Map<Node, Factory> _outputFactories = new HashMap<>();
+    Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap<>();
     Map<Node, String> _batchGroups;
     
     //given processornodes and static state nodes
@@ -71,7 +70,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
             }
         }
         DirectedSubgraph<Node, Object> subgraph = new DirectedSubgraph(_graph, _nodes, null);
-        TopologicalOrderIterator it = new TopologicalOrderIterator<Node, Object>(subgraph);
+        TopologicalOrderIterator it = new TopologicalOrderIterator<>(subgraph);
         int stateIndex = 0;
         while(it.hasNext()) {
             Node n = (Node) it.next();
@@ -82,8 +81,8 @@ public class SubtopologyBolt implements ITridentBatchBolt {
                     _myTopologicallyOrdered.put(batchGroup, new ArrayList());
                 }
                 _myTopologicallyOrdered.get(batchGroup).add(pn.processor);
-                List<String> parentStreams = new ArrayList();
-                List<Factory> parentFactories = new ArrayList();
+                List<String> parentStreams = new ArrayList<>();
+                List<Factory> parentFactories = new ArrayList<>();
                 for(Node p: TridentUtils.getParents(_graph, n)) {
                     parentStreams.add(p.streamId);
                     if(_nodes.contains(p)) {
@@ -96,7 +95,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
                         parentFactories.add(_roots.get(p.streamId).getOutputFactory());
                     }
                 }
-                List<TupleReceiver> targets = new ArrayList();
+                List<TupleReceiver> targets = new ArrayList<>();
                 boolean outgoingNode = false;
                 for(Node cn: TridentUtils.getChildren(_graph, n)) {
                     if(_nodes.contains(cn)) {
@@ -185,7 +184,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
     
     
     protected static class InitialReceiver {
-        List<TridentProcessor> _receivers = new ArrayList();
+        List<TridentProcessor> _receivers = new ArrayList<>();
         RootFactory _factory;
         ProjectionFactory _project;
         String _stream;
@@ -195,7 +194,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
             // how to distinguish "batch" streams from non-batch streams?
             _stream = stream;
             _factory = new RootFactory(allFields);
-            List<String> projected = new ArrayList(allFields.toList());
+            List<String> projected = new ArrayList<>(allFields.toList());
             projected.remove(0);
             _project = new ProjectionFactory(_factory, new Fields(projected));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java b/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java
index 6777d2f..4086996 100644
--- a/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java
+++ b/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java
@@ -49,7 +49,7 @@ public class MultiReducerProcessor implements TridentProcessor {
     public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
         List<Factory> parents = tridentContext.getParentTupleFactories();
         _context = tridentContext;
-        _streamToIndex = new HashMap<String, Integer>();
+        _streamToIndex = new HashMap<>();
         List<String> parentStreams = tridentContext.getParentStreams();
         for(int i=0; i<parentStreams.size(); i++) {
             _streamToIndex.put(parentStreams.get(i), i);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
index 63debdb..d43d4e4 100644
--- a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
@@ -39,7 +39,7 @@ import storm.trident.tuple.ConsList;
 public class TridentSpoutExecutor implements ITridentBatchBolt {
     public static final String ID_FIELD = "$tx";
     
-    public static Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);    
+    public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);
 
     AddIdCollector _collector;
     ITridentSpout<Object> _spout;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java b/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java
index 04b554e..761b221 100644
--- a/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java
+++ b/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java
@@ -31,7 +31,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Arrays;
 
-//extends abstractlist so that it can be emitted directly as Storm tuples
+/**
+ * Extends AbstractList so that it can be emitted directly as Storm tuples
+ */
 public class TridentTupleView extends AbstractList<Object> implements TridentTuple {
     ValuePointer[] _index;
     Map<String, ValuePointer> _fieldIndex;
@@ -114,7 +116,7 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
 
         public OperationOutputFactory(Factory parent, Fields selfFields) {
             _parent = parent;
-            _fieldIndex = new HashMap(parent.getFieldIndex());
+            _fieldIndex = new HashMap<>(parent.getFieldIndex());
             int myIndex = parent.numDelegates();
             for(int i=0; i<selfFields.size(); i++) {
                 String field = selfFields.get(i);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
index 2ab0109..dfb78fc 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.dsl.ProducerType;
-import com.lmax.disruptor.InsufficientCapacityException;
 import org.junit.Assert;
 import org.junit.Test;
 import junit.framework.TestCase;
@@ -41,7 +40,7 @@ public class DisruptorQueueTest extends TestCase {
 
         Runnable producer = new IncProducer(queue, i+100);
 
-        final AtomicReference<Object> result = new AtomicReference<Object>();
+        final AtomicReference<Object> result = new AtomicReference<>();
         Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
             private boolean head = true;
 
@@ -130,7 +129,7 @@ public class DisruptorQueueTest extends TestCase {
                 queue.publish(i);
             }
         }
-    };
+    }
 
     private static class Consumer implements Runnable {
         private EventHandler handler;
@@ -151,7 +150,7 @@ public class DisruptorQueueTest extends TestCase {
                 //break
             }
         }
-    };
+    }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
         return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);


[5/5] storm git commit: Added STORM-1164 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1164 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e25f28f7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e25f28f7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e25f28f7

Branch: refs/heads/master
Commit: e25f28f7175757e98c14b5d1a502c611a8b9282b
Parents: 1acadff
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Nov 9 18:24:54 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Nov 9 18:24:54 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e25f28f7/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b914b96..5e23d59 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## 0.11.0
- * STORM-902: Simple Log Search
+ * STORM-1164: Code cleanup for typos, warnings and conciseness.
+ * STORM-902: Simple Log Search.
  * STORM-1052: TridentKafkaState uses new Kafka Producer API.
  * STORM-1182: Removing and wrapping some exceptions in ConfigValidation to make code cleaner
  * STORM-1134. Windows: Fix log4j config.


[4/5] storm git commit: Merge branch 'STORM-1164' of https://github.com/sureshms/incubator-storm into STORM-1164

Posted by sr...@apache.org.
Merge branch 'STORM-1164' of https://github.com/sureshms/incubator-storm into STORM-1164


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1acadff4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1acadff4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1acadff4

Branch: refs/heads/master
Commit: 1acadff444a6660f2fedd3466863736e30eb2f01
Parents: 5a4e1f8 f0e4c6f
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Nov 9 16:57:05 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Nov 9 16:57:05 2015 -0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java   | 36 ++++----
 .../src/jvm/backtype/storm/LogWriter.java       |  2 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |  4 -
 .../storm/drpc/DRPCInvocationsClient.java       |  5 +-
 .../storm/generated/AlreadyAliveException.java  |  7 +-
 .../storm/grouping/PartialKeyGrouping.java      |  5 +-
 .../storm/messaging/ConnectionWithStatus.java   |  4 +-
 .../jvm/backtype/storm/messaging/IContext.java  |  2 +-
 .../storm/messaging/TransportFactory.java       |  2 +-
 .../backtype/storm/messaging/netty/Client.java  |  2 +-
 .../backtype/storm/messaging/netty/Context.java |  8 +-
 .../storm/messaging/netty/ControlMessage.java   |  5 +-
 .../storm/messaging/netty/MessageBatch.java     | 14 ++--
 .../storm/messaging/netty/MessageDecoder.java   |  7 +-
 .../storm/messaging/netty/SaslMessageToken.java | 10 +--
 .../storm/messaging/netty/SaslNettyClient.java  |  6 +-
 .../storm/messaging/netty/SaslNettyServer.java  |  4 -
 .../messaging/netty/SaslStormClientHandler.java |  4 +-
 .../messaging/netty/SaslStormServerHandler.java | 11 +--
 .../storm/messaging/netty/SaslUtils.java        | 11 +--
 .../backtype/storm/messaging/netty/Server.java  | 26 +++---
 .../backtype/storm/metric/EventLoggerBolt.java  |  8 --
 .../storm/metric/FileBasedEventLogger.java      |  2 +-
 .../metric/HttpForwardingMetricsConsumer.java   |  1 -
 .../metric/HttpForwardingMetricsServer.java     |  1 -
 .../jvm/backtype/storm/metric/IEventLogger.java |  8 +-
 .../storm/metric/LoggingMetricsConsumer.java    |  1 -
 .../storm/metric/MetricsConsumerBolt.java       |  1 -
 .../jvm/backtype/storm/metric/SystemBolt.java   |  5 --
 .../backtype/storm/metric/api/CountMetric.java  |  2 -
 .../backtype/storm/metric/api/MeanReducer.java  |  4 +-
 .../storm/metric/api/MultiCountMetric.java      |  2 +-
 .../storm/metric/api/MultiReducedMetric.java    |  2 +-
 .../storm/metric/api/rpc/CountShellMetric.java  |  3 +-
 .../AbstractDNSToSwitchMapping.java             |  2 +-
 .../DefaultRackDNSToSwitchMapping.java          |  4 +-
 .../backtype/storm/nimbus/ILeaderElector.java   |  6 +-
 .../jvm/backtype/storm/nimbus/NimbusInfo.java   |  4 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   |  5 +-
 .../scheduler/SchedulerAssignmentImpl.java      | 15 ++--
 .../storm/scheduler/SupervisorDetails.java      |  6 +-
 .../backtype/storm/scheduler/Topologies.java    |  6 +-
 .../storm/scheduler/TopologyDetails.java        | 30 +++----
 .../scheduler/multitenant/DefaultPool.java      | 22 ++---
 .../storm/scheduler/multitenant/FreePool.java   |  6 +-
 .../scheduler/multitenant/IsolatedPool.java     | 32 ++++---
 .../multitenant/MultitenantScheduler.java       |  6 +-
 .../storm/scheduler/multitenant/Node.java       | 17 ++--
 .../storm/scheduler/multitenant/NodePool.java   | 16 ++--
 .../strategies/ResourceAwareStrategy.java       | 39 +++++----
 .../backtype/storm/security/auth/AuthUtils.java | 27 +++---
 .../auth/DefaultHttpCredentialsPlugin.java      |  6 +-
 .../security/auth/DefaultPrincipalToLocal.java  |  1 -
 .../storm/security/auth/IAuthorizer.java        |  4 +-
 .../security/auth/ICredentialsRenewer.java      |  3 +-
 .../security/auth/IHttpCredentialsPlugin.java   |  2 -
 .../storm/security/auth/IPrincipalToLocal.java  |  2 +-
 .../storm/security/auth/ITransportPlugin.java   |  4 -
 .../security/auth/KerberosPrincipalToLocal.java |  2 +-
 .../storm/security/auth/ReqContext.java         | 11 +--
 .../security/auth/SaslTransportPlugin.java      | 12 +--
 .../security/auth/SimpleTransportPlugin.java    |  6 +-
 .../security/auth/SingleUserPrincipal.java      |  5 +-
 .../storm/security/auth/TBackoffConnect.java    |  1 -
 .../storm/security/auth/ThriftClient.java       | 10 +--
 .../storm/security/auth/ThriftServer.java       |  6 +-
 .../authorizer/DRPCSimpleACLAuthorizer.java     | 11 ++-
 .../auth/authorizer/DenyAuthorizer.java         | 11 +--
 .../authorizer/ImpersonationAuthorizer.java     | 10 +--
 .../auth/authorizer/NoopAuthorizer.java         |  7 +-
 .../auth/authorizer/SimpleACLAuthorizer.java    | 18 ++--
 .../authorizer/SimpleWhitelistAuthorizer.java   |  4 -
 .../auth/digest/ClientCallbackHandler.java      |  2 -
 .../auth/digest/DigestSaslTransportPlugin.java  |  2 -
 .../auth/digest/ServerCallbackHandler.java      |  5 +-
 .../GzipThriftSerializationDelegate.java        |  1 -
 .../storm/serialization/ITupleDeserializer.java |  1 -
 .../serialization/KryoTupleDeserializer.java    |  3 -
 .../serialization/KryoValuesDeserializer.java   |  3 +-
 .../serialization/SerializationFactory.java     | 17 ++--
 .../storm/task/GeneralTopologyContext.java      | 10 +--
 .../backtype/storm/task/TopologyContext.java    |  9 +-
 .../storm/topology/BasicBoltExecutor.java       |  2 +-
 .../storm/topology/OutputFieldsGetter.java      |  2 +-
 .../storm/topology/TopologyBuilder.java         | 16 ++--
 .../storm/topology/base/BaseBatchBolt.java      |  1 -
 .../topology/base/BaseTransactionalSpout.java   |  1 -
 .../TransactionalSpoutCoordinator.java          |  2 +-
 ...uePartitionedTransactionalSpoutExecutor.java |  8 +-
 .../PartitionedTransactionalSpoutExecutor.java  |  2 +-
 .../src/jvm/backtype/storm/tuple/Fields.java    | 10 +--
 .../src/jvm/backtype/storm/tuple/MessageId.java | 10 +--
 .../src/jvm/backtype/storm/tuple/Tuple.java     |  1 -
 .../src/jvm/backtype/storm/tuple/TupleImpl.java |  4 +-
 .../jvm/backtype/storm/utils/DRPCClient.java    |  1 -
 .../backtype/storm/utils/DisruptorQueue.java    |  5 +-
 .../backtype/storm/utils/InprocMessaging.java   |  4 +-
 .../storm/utils/KeyedRoundRobinQueue.java       |  6 +-
 .../jvm/backtype/storm/utils/ListDelegate.java  |  6 +-
 .../src/jvm/backtype/storm/utils/Monitor.java   |  3 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  | 10 +--
 .../storm/utils/RegisteredGlobalState.java      |  6 +-
 .../jvm/backtype/storm/utils/RotatingMap.java   |  2 +-
 .../backtype/storm/utils/ServiceRegistry.java   |  2 +-
 .../StormBoundedExponentialBackoffRetry.java    |  3 +-
 .../jvm/backtype/storm/utils/VersionInfo.java   |  2 +-
 .../src/jvm/storm/trident/TridentTopology.java  | 87 ++++++++++----------
 .../trident/drpc/ReturnResultsReducer.java      |  4 +-
 .../fluent/ChainedAggregatorDeclarer.java       |  8 +-
 .../jvm/storm/trident/graph/GraphGrouper.java   | 13 ++-
 .../src/jvm/storm/trident/graph/Group.java      | 23 +++---
 .../trident/operation/builtin/SnapshotGet.java  |  4 +-
 .../operation/builtin/TupleCollectionGet.java   |  6 +-
 .../storm/trident/partition/GlobalGrouping.java |  5 +-
 .../trident/partition/IdentityGrouping.java     |  8 +-
 .../src/jvm/storm/trident/planner/Node.java     |  5 +-
 .../storm/trident/planner/PartitionNode.java    |  2 -
 .../storm/trident/planner/SubtopologyBolt.java  | 19 ++---
 .../processor/MultiReducerProcessor.java        |  2 +-
 .../trident/spout/TridentSpoutExecutor.java     |  2 +-
 .../storm/trident/tuple/TridentTupleView.java   |  6 +-
 .../storm/utils/DisruptorQueueTest.java         |  7 +-
 122 files changed, 378 insertions(+), 579 deletions(-)
----------------------------------------------------------------------



[3/5] storm git commit: STORM-1164. Code cleanup for typos, warnings and conciseness

Posted by sr...@apache.org.
STORM-1164. Code cleanup for typos, warnings and conciseness


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f0e4c6f2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f0e4c6f2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f0e4c6f2

Branch: refs/heads/master
Commit: f0e4c6f2099196760f0037e7812dda543d332c54
Parents: c12e28c
Author: Suresh Srinivas <su...@yahoo-inc.com>
Authored: Tue Nov 3 16:03:16 2015 -0800
Committer: Suresh Srinivas <su...@yahoo-inc.com>
Committed: Mon Nov 9 11:56:11 2015 -0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java   | 36 ++++----
 .../src/jvm/backtype/storm/LogWriter.java       |  2 +-
 .../src/jvm/backtype/storm/StormSubmitter.java  |  4 -
 .../storm/drpc/DRPCInvocationsClient.java       |  5 +-
 .../storm/generated/AlreadyAliveException.java  |  7 +-
 .../storm/grouping/PartialKeyGrouping.java      |  5 +-
 .../storm/messaging/ConnectionWithStatus.java   |  4 +-
 .../jvm/backtype/storm/messaging/IContext.java  |  2 +-
 .../storm/messaging/TransportFactory.java       |  2 +-
 .../backtype/storm/messaging/netty/Client.java  |  2 +-
 .../backtype/storm/messaging/netty/Context.java |  8 +-
 .../storm/messaging/netty/ControlMessage.java   |  5 +-
 .../storm/messaging/netty/MessageBatch.java     | 14 ++--
 .../storm/messaging/netty/MessageDecoder.java   |  7 +-
 .../storm/messaging/netty/SaslMessageToken.java | 10 +--
 .../storm/messaging/netty/SaslNettyClient.java  |  6 +-
 .../storm/messaging/netty/SaslNettyServer.java  |  4 -
 .../messaging/netty/SaslStormClientHandler.java |  4 +-
 .../messaging/netty/SaslStormServerHandler.java | 11 +--
 .../storm/messaging/netty/SaslUtils.java        | 11 +--
 .../backtype/storm/messaging/netty/Server.java  | 26 +++---
 .../backtype/storm/metric/EventLoggerBolt.java  |  8 --
 .../storm/metric/FileBasedEventLogger.java      |  2 +-
 .../metric/HttpForwardingMetricsConsumer.java   |  1 -
 .../metric/HttpForwardingMetricsServer.java     |  1 -
 .../jvm/backtype/storm/metric/IEventLogger.java |  8 +-
 .../storm/metric/LoggingMetricsConsumer.java    |  1 -
 .../storm/metric/MetricsConsumerBolt.java       |  1 -
 .../jvm/backtype/storm/metric/SystemBolt.java   |  5 --
 .../backtype/storm/metric/api/CountMetric.java  |  2 -
 .../backtype/storm/metric/api/MeanReducer.java  |  4 +-
 .../storm/metric/api/MultiCountMetric.java      |  2 +-
 .../storm/metric/api/MultiReducedMetric.java    |  2 +-
 .../storm/metric/api/rpc/CountShellMetric.java  |  3 +-
 .../AbstractDNSToSwitchMapping.java             |  2 +-
 .../DefaultRackDNSToSwitchMapping.java          |  4 +-
 .../backtype/storm/nimbus/ILeaderElector.java   |  6 +-
 .../jvm/backtype/storm/nimbus/NimbusInfo.java   |  4 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   |  5 +-
 .../scheduler/SchedulerAssignmentImpl.java      | 15 ++--
 .../storm/scheduler/SupervisorDetails.java      |  6 +-
 .../backtype/storm/scheduler/Topologies.java    |  6 +-
 .../storm/scheduler/TopologyDetails.java        | 30 +++----
 .../scheduler/multitenant/DefaultPool.java      | 22 ++---
 .../storm/scheduler/multitenant/FreePool.java   |  6 +-
 .../scheduler/multitenant/IsolatedPool.java     | 32 ++++---
 .../multitenant/MultitenantScheduler.java       |  6 +-
 .../storm/scheduler/multitenant/Node.java       | 17 ++--
 .../storm/scheduler/multitenant/NodePool.java   | 16 ++--
 .../strategies/ResourceAwareStrategy.java       | 39 +++++----
 .../backtype/storm/security/auth/AuthUtils.java | 27 +++---
 .../auth/DefaultHttpCredentialsPlugin.java      |  6 +-
 .../security/auth/DefaultPrincipalToLocal.java  |  1 -
 .../storm/security/auth/IAuthorizer.java        |  4 +-
 .../security/auth/ICredentialsRenewer.java      |  3 +-
 .../security/auth/IHttpCredentialsPlugin.java   |  2 -
 .../storm/security/auth/IPrincipalToLocal.java  |  2 +-
 .../storm/security/auth/ITransportPlugin.java   |  4 -
 .../security/auth/KerberosPrincipalToLocal.java |  2 +-
 .../storm/security/auth/ReqContext.java         | 11 +--
 .../security/auth/SaslTransportPlugin.java      | 12 +--
 .../security/auth/SimpleTransportPlugin.java    |  6 +-
 .../security/auth/SingleUserPrincipal.java      |  5 +-
 .../storm/security/auth/TBackoffConnect.java    |  1 -
 .../storm/security/auth/ThriftClient.java       | 10 +--
 .../storm/security/auth/ThriftServer.java       |  6 +-
 .../authorizer/DRPCSimpleACLAuthorizer.java     | 11 ++-
 .../auth/authorizer/DenyAuthorizer.java         | 11 +--
 .../authorizer/ImpersonationAuthorizer.java     | 10 +--
 .../auth/authorizer/NoopAuthorizer.java         |  7 +-
 .../auth/authorizer/SimpleACLAuthorizer.java    | 18 ++--
 .../authorizer/SimpleWhitelistAuthorizer.java   |  4 -
 .../auth/digest/ClientCallbackHandler.java      |  2 -
 .../auth/digest/DigestSaslTransportPlugin.java  |  2 -
 .../auth/digest/ServerCallbackHandler.java      |  5 +-
 .../GzipThriftSerializationDelegate.java        |  1 -
 .../storm/serialization/ITupleDeserializer.java |  1 -
 .../serialization/KryoTupleDeserializer.java    |  3 -
 .../serialization/KryoValuesDeserializer.java   |  3 +-
 .../serialization/SerializationFactory.java     | 17 ++--
 .../storm/task/GeneralTopologyContext.java      | 10 +--
 .../backtype/storm/task/TopologyContext.java    |  9 +-
 .../storm/topology/BasicBoltExecutor.java       |  2 +-
 .../storm/topology/OutputFieldsGetter.java      |  2 +-
 .../storm/topology/TopologyBuilder.java         | 16 ++--
 .../storm/topology/base/BaseBatchBolt.java      |  1 -
 .../topology/base/BaseTransactionalSpout.java   |  1 -
 .../TransactionalSpoutCoordinator.java          |  2 +-
 ...uePartitionedTransactionalSpoutExecutor.java |  8 +-
 .../PartitionedTransactionalSpoutExecutor.java  |  2 +-
 .../src/jvm/backtype/storm/tuple/Fields.java    | 10 +--
 .../src/jvm/backtype/storm/tuple/MessageId.java | 10 +--
 .../src/jvm/backtype/storm/tuple/Tuple.java     |  1 -
 .../src/jvm/backtype/storm/tuple/TupleImpl.java |  4 +-
 .../jvm/backtype/storm/utils/DRPCClient.java    |  1 -
 .../backtype/storm/utils/DisruptorQueue.java    |  5 +-
 .../backtype/storm/utils/InprocMessaging.java   |  4 +-
 .../storm/utils/KeyedRoundRobinQueue.java       |  6 +-
 .../jvm/backtype/storm/utils/ListDelegate.java  |  6 +-
 .../src/jvm/backtype/storm/utils/Monitor.java   |  3 +-
 .../jvm/backtype/storm/utils/NimbusClient.java  | 10 +--
 .../storm/utils/RegisteredGlobalState.java      |  6 +-
 .../jvm/backtype/storm/utils/RotatingMap.java   |  2 +-
 .../backtype/storm/utils/ServiceRegistry.java   |  2 +-
 .../StormBoundedExponentialBackoffRetry.java    |  3 +-
 .../jvm/backtype/storm/utils/VersionInfo.java   |  2 +-
 .../src/jvm/storm/trident/TridentTopology.java  | 87 ++++++++++----------
 .../trident/drpc/ReturnResultsReducer.java      |  4 +-
 .../fluent/ChainedAggregatorDeclarer.java       |  8 +-
 .../jvm/storm/trident/graph/GraphGrouper.java   | 13 ++-
 .../src/jvm/storm/trident/graph/Group.java      | 23 +++---
 .../trident/operation/builtin/SnapshotGet.java  |  4 +-
 .../operation/builtin/TupleCollectionGet.java   |  6 +-
 .../storm/trident/partition/GlobalGrouping.java |  5 +-
 .../trident/partition/IdentityGrouping.java     |  8 +-
 .../src/jvm/storm/trident/planner/Node.java     |  5 +-
 .../storm/trident/planner/PartitionNode.java    |  2 -
 .../storm/trident/planner/SubtopologyBolt.java  | 19 ++---
 .../processor/MultiReducerProcessor.java        |  2 +-
 .../trident/spout/TridentSpoutExecutor.java     |  2 +-
 .../storm/trident/tuple/TridentTupleView.java   |  6 +-
 .../storm/utils/DisruptorQueueTest.java         |  7 +-
 122 files changed, 378 insertions(+), 579 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index d80876e..1485361 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -166,7 +166,7 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_LOG4J2_CONF_DIR = "storm.log4j2.conf.dir";
 
     /**
-     * A global task scheduler used to assign topologies's tasks to supervisors' wokers.
+     * A global task scheduler used to assign topologies' tasks to supervisors' workers.
      *
      * If this is not set, a default system scheduler will be used.
      */
@@ -192,7 +192,7 @@ public class Config extends HashMap<String, Object> {
      * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
      * get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
      *
-     * You should set this config when you dont have a DNS which supervisors/workers
+     * You should set this config when you don't have a DNS which supervisors/workers
      * can utilize to find each other based on hostname got from calls to
      * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
      */
@@ -445,7 +445,7 @@ public class Config extends HashMap<String, Object> {
      * How often nimbus should wake up to check heartbeats and do reassignments. Note
      * that if a machine ever goes down Nimbus will immediately wake up and take action.
      * This parameter is for checking for failures when there's no explicit event like that
-     * occuring.
+     * occurring.
      */
     @isInteger
     @isPositiveNumber
@@ -453,7 +453,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * How often nimbus should wake the cleanup thread to clean the inbox.
-     * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
+     * @see #NIMBUS_INBOX_JAR_EXPIRATION_SECS
      */
     @isInteger
     @isPositiveNumber
@@ -466,7 +466,7 @@ public class Config extends HashMap<String, Object> {
      * Note that the time it takes to delete an inbox jar file is going to be somewhat more than
      * NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS (depending on how often NIMBUS_CLEANUP_FREQ_SECS
      * is set to).
-     * @see NIMBUS_CLEANUP_FREQ_SECS
+     * @see #NIMBUS_CLEANUP_INBOX_FREQ_SECS
      */
     @isInteger
     public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
@@ -622,7 +622,7 @@ public class Config extends HashMap<String, Object> {
     public static final String LOGVIEWER_HTTPS_KEYSTORE_TYPE = "logviewer.https.keystore.type";
 
     /**
-     * Password to the private key in the keystore for settting up HTTPS (SSL).
+     * Password to the private key in the keystore for setting up HTTPS (SSL).
      */
     @isString
     public static final String LOGVIEWER_HTTPS_KEY_PASSWORD = "logviewer.https.key.password";
@@ -647,7 +647,7 @@ public class Config extends HashMap<String, Object> {
     public static final String LOGVIEWER_HTTPS_TRUSTSTORE_TYPE = "logviewer.https.truststore.type";
 
     /**
-     * Password to the truststore used by Storm Logviewer settting up HTTPS (SSL).
+     * Password to the truststore used by Storm Logviewer setting up HTTPS (SSL).
      */
     @isBoolean
     public static final String LOGVIEWER_HTTPS_WANT_CLIENT_AUTH = "logviewer.https.want.client.auth";
@@ -725,19 +725,19 @@ public class Config extends HashMap<String, Object> {
     public static final String UI_HTTPS_KEYSTORE_TYPE = "ui.https.keystore.type";
 
     /**
-     * Password to the private key in the keystore for settting up HTTPS (SSL).
+     * Password to the private key in the keystore for setting up HTTPS (SSL).
      */
     @isString
     public static final String UI_HTTPS_KEY_PASSWORD = "ui.https.key.password";
 
     /**
-     * Path to the truststore used by Storm UI settting up HTTPS (SSL).
+     * Path to the truststore used by Storm UI setting up HTTPS (SSL).
      */
     @isString
     public static final String UI_HTTPS_TRUSTSTORE_PATH = "ui.https.truststore.path";
 
     /**
-     * Password to the truststore used by Storm UI settting up HTTPS (SSL).
+     * Password to the truststore used by Storm UI setting up HTTPS (SSL).
      */
     @isString
     public static final String UI_HTTPS_TRUSTSTORE_PASSWORD = "ui.https.truststore.password";
@@ -750,7 +750,7 @@ public class Config extends HashMap<String, Object> {
     public static final String UI_HTTPS_TRUSTSTORE_TYPE = "ui.https.truststore.type";
 
     /**
-     * Password to the truststore used by Storm DRPC settting up HTTPS (SSL).
+     * Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
      */
     @isBoolean
     public static final String UI_HTTPS_WANT_CLIENT_AUTH = "ui.https.want.client.auth";
@@ -796,19 +796,19 @@ public class Config extends HashMap<String, Object> {
     public static final String DRPC_HTTPS_KEYSTORE_TYPE = "drpc.https.keystore.type";
 
     /**
-     * Password to the private key in the keystore for settting up HTTPS (SSL).
+     * Password to the private key in the keystore for setting up HTTPS (SSL).
      */
     @isString
     public static final String DRPC_HTTPS_KEY_PASSWORD = "drpc.https.key.password";
 
     /**
-     * Path to the truststore used by Storm DRPC settting up HTTPS (SSL).
+     * Path to the truststore used by Storm DRPC setting up HTTPS (SSL).
      */
     @isString
     public static final String DRPC_HTTPS_TRUSTSTORE_PATH = "drpc.https.truststore.path";
 
     /**
-     * Password to the truststore used by Storm DRPC settting up HTTPS (SSL).
+     * Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
      */
     @isString
     public static final String DRPC_HTTPS_TRUSTSTORE_PASSWORD = "drpc.https.truststore.password";
@@ -821,7 +821,7 @@ public class Config extends HashMap<String, Object> {
     public static final String DRPC_HTTPS_TRUSTSTORE_TYPE = "drpc.https.truststore.type";
 
     /**
-     * Password to the truststore used by Storm DRPC settting up HTTPS (SSL).
+     * Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
      */
     @isBoolean
     public static final String DRPC_HTTPS_WANT_CLIENT_AUTH = "drpc.https.want.client.auth";
@@ -850,14 +850,14 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * The Access Control List for the DRPC Authorizer.
-     * @see DRPCSimpleAclAuthorizer
+     * @see backtype.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer
      */
     @isType(type=Map.class)
     public static final String DRPC_AUTHORIZER_ACL = "drpc.authorizer.acl";
 
     /**
      * File name of the DRPC Authorizer ACL.
-     * @see DRPCSimpleAclAuthorizer
+     * @see backtype.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer
      */
     @isString
     public static final String DRPC_AUTHORIZER_ACL_FILENAME = "drpc.authorizer.acl.filename";
@@ -869,7 +869,7 @@ public class Config extends HashMap<String, Object> {
      * permitted, which is appropriate for a development environment. When set
      * to true, explicit ACL entries are required for every DRPC function, and
      * any request for functions will be denied.
-     * @see DRPCSimpleAclAuthorizer
+     * @see backtype.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer
      */
     @isBoolean
     public static final String DRPC_AUTHORIZER_ACL_STRICT = "drpc.authorizer.acl.strict";

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/LogWriter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/LogWriter.java b/storm-core/src/jvm/backtype/storm/LogWriter.java
index b0857e8..849f5ca 100644
--- a/storm-core/src/jvm/backtype/storm/LogWriter.java
+++ b/storm-core/src/jvm/backtype/storm/LogWriter.java
@@ -41,7 +41,7 @@ public class LogWriter extends Thread {
     public void run() {
         Logger logger = this.logger;
         BufferedReader in = this.in;
-        String line = null;
+        String line;
         try {
             while ((line = in.readLine()) != null) {
                 logger.info(line);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 27844f2..1d7a1f3 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -51,10 +51,6 @@ public class StormSubmitter {
     
     private static ILocalCluster localNimbus = null;
 
-    public static void setLocalNimbus(ILocalCluster localNimbusHandler) {
-        StormSubmitter.localNimbus = localNimbusHandler;
-    }
-
     private static String generateZookeeperDigestSecretPayload() {
         return Utils.secureRandomLong() + ":" + Utils.secureRandomLong();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
index 624db3e..78e8d9b 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -31,9 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
-    public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
-    private final AtomicReference<DistributedRPCInvocations.Client> client =
-       new AtomicReference<DistributedRPCInvocations.Client>();
+    public static final Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
+    private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<>();
     private String host;
     private int port;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java b/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java
index fb2eee3..adc71a2 100644
--- a/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java
+++ b/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java
@@ -221,11 +221,10 @@ public class AlreadyAliveException extends TException implements org.apache.thri
 
   @Override
   public boolean equals(Object that) {
-    if (that == null)
+    if (that == null) {
       return false;
-    if (that instanceof AlreadyAliveException)
-      return this.equals((AlreadyAliveException)that);
-    return false;
+    }
+    return (that instanceof AlreadyAliveException) ? this.equals((AlreadyAliveException)that) : false;
   }
 
   public boolean equals(AlreadyAliveException that) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
index 43ad5a0..e92156e 100644
--- a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
+++ b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.task.WorkerTopologyContext;
 import backtype.storm.tuple.Fields;
 
@@ -59,9 +58,9 @@ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
 
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        List<Integer> boltIds = new ArrayList<Integer>(1);
+        List<Integer> boltIds = new ArrayList<>(1);
         if (values.size() > 0) {
-            byte[] raw = null;
+            byte[] raw;
             if (fields != null) {
                 List<Object> selectedFields = outFields.select(fields, values);
                 ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
index 37981ca..b92b0d6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
@@ -39,9 +39,9 @@ public abstract class ConnectionWithStatus implements IConnection {
      * data sending or receiving. All data sending request will be dropped.
      */
     Closed
-  };
+  }
 
-  /**
+    /**
    * whether this connection is available to transfer data
    */
   public abstract Status status();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IContext.java b/storm-core/src/jvm/backtype/storm/messaging/IContext.java
index 8645a6f..42f59bd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IContext.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IContext.java
@@ -56,4 +56,4 @@ public interface IContext {
      * @return client side connection
      */
     public IConnection connect(String storm_id, String host, int port);
-};
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java b/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java
index 656b323..c551e0d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java
@@ -32,7 +32,7 @@ public class TransportFactory {
         String transport_plugin_klassName = (String)storm_conf.get(Config.STORM_MESSAGING_TRANSPORT);
         LOG.info("Storm peer transport plugin:"+transport_plugin_klassName);
 
-        IContext transport = null;
+        IContext transport;
         try {
             //create a factory class
             Class klass = Class.forName(transport_plugin_klassName);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index b9151f3..ccdef41 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -84,7 +84,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     /**
      * The channel used for all write operations from this client to the remote destination.
      */
-    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
+    private final AtomicReference<Channel> channelRef = new AtomicReference<>();
 
     /**
      * Total number of connection attempts.

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 5d27a16..bc978b6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -19,12 +19,8 @@ package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,8 +30,6 @@ import backtype.storm.messaging.IContext;
 import backtype.storm.utils.Utils;
 
 public class Context implements IContext {
-    private static final Logger LOG = LoggerFactory.getLogger(Context.class);
-        
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private Map<String, IConnection> connections;
@@ -49,7 +43,7 @@ public class Context implements IContext {
     @SuppressWarnings("rawtypes")
     public void prepare(Map storm_conf) {
         this.storm_conf = storm_conf;
-        connections = new HashMap<String, IConnection>();
+        connections = new HashMap<>();
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index fb3efe6..e4507f5 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -39,9 +39,8 @@ enum ControlMessage {
     }
 
     /**
-     * Return a control message per an encoded status code
-     * @param encoded
-     * @return
+     * @param encoded status code
+     * @return a control message for the given encoded status code
      */
     static ControlMessage mkMessage(short encoded) {
         for(ControlMessage cm: ControlMessage.values()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index ec0dc0f..fea33fd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -31,7 +31,7 @@ class MessageBatch {
 
     MessageBatch(int buffer_size) {
         this.buffer_size = buffer_size;
-        msgs = new ArrayList<TaskMessage>();
+        msgs = new ArrayList<>();
         encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
     }
 
@@ -54,24 +54,21 @@ class MessageBatch {
     }
 
     /**
-     * Has this batch used up allowed buffer size
-     * @return
+     * @return true if this batch has used up the allowed buffer size
      */
     boolean isFull() {
         return encoded_length >= buffer_size;
     }
 
     /**
-     * true if this batch doesn't have any messages 
-     * @return
+     * @return true if this batch doesn't have any messages
      */
     boolean isEmpty() {
         return msgs.isEmpty();
     }
 
     /**
-     * # of msgs in this batch
-     * @return
+     * @return number of msgs in this batch
      */
     int size() {
         return msgs.size();
@@ -83,8 +80,9 @@ class MessageBatch {
     ChannelBuffer buffer() throws Exception {
         ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
         
-        for (TaskMessage msg : msgs)
+        for (TaskMessage msg : msgs) {
             writeTaskMessage(bout, msg);
+        }
 
         //add a END_OF_BATCH indicator
         ControlMessage.EOB_MESSAGE.write(bout);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 7d8bf54..06d2cde 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -43,7 +43,7 @@ public class MessageDecoder extends FrameDecoder {
             return null;
         }
 
-        List<Object> ret = new ArrayList<Object>();
+        List<Object> ret = new ArrayList<>();
 
         // Use while loop, try to decode as more messages as possible in single call
         while (available >= 2) {
@@ -100,7 +100,6 @@ public class MessageDecoder extends FrameDecoder {
             }
 
             // case 3: task Message
-            short task = code;
 
             // Make sure that we have received at least an integer (length)
             if (available < 4) {
@@ -115,7 +114,7 @@ public class MessageDecoder extends FrameDecoder {
             available -= 4;
 
             if (length <= 0) {
-                ret.add(new TaskMessage(task, null));
+                ret.add(new TaskMessage(code, null));
                 break;
             }
 
@@ -133,7 +132,7 @@ public class MessageDecoder extends FrameDecoder {
 
             // Successfully decoded a frame.
             // Return a TaskMessage object
-            ret.add(new TaskMessage(task, payload.array()));
+            ret.add(new TaskMessage(code, payload.array()));
         }
 
         if (ret.size() == 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
index d0d3ca1..d7a86d1 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@ -20,17 +20,11 @@ package backtype.storm.messaging.netty;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Send and receive SASL tokens.
  */
 public class SaslMessageToken {
-    /** Class logger */
-    private static final Logger LOG = LoggerFactory
-            .getLogger(SaslMessageToken.class);
-
     /** Used for client or server's token to send or receive from each other. */
     private byte[] token;
 
@@ -88,8 +82,8 @@ public class SaslMessageToken {
         if (token != null)
             payload_len = token.length;
 
-        bout.writeShort((short) identifier);
-        bout.writeInt((int) payload_len);
+        bout.writeShort(identifier);
+        bout.writeInt(payload_len);
         if (payload_len > 0) {
             bout.write(token);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
index 023e950..f6ea78f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -81,9 +81,7 @@ public class SaslNettyClient {
      */
     public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
         try {
-            byte[] retval = saslClient.evaluateChallenge(saslTokenMessage
-                    .getSaslToken());
-            return retval;
+            return saslClient.evaluateChallenge(saslTokenMessage.getSaslToken());
         } catch (SaslException e) {
             LOG.error(
                     "saslResponse: Failed to respond to SASL server's token:",
@@ -104,8 +102,6 @@ public class SaslNettyClient {
 
         /**
          * Set private members using topology token.
-         * 
-         * @param topologyToken
          */
         public SaslClientCallbackHandler(String topologyToken, byte[] token) {
             this.userName = SaslUtils

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
index 2cb47d9..f98e5bd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
@@ -18,9 +18,6 @@
 package backtype.storm.messaging.netty;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -33,7 +30,6 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
-import org.apache.commons.codec.binary.Base64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index 2a5ae99..980c0f6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -69,8 +69,6 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
             LOG.error("Failed to authenticate with server " + "due to error: ",
                     e);
         }
-        return;
-
     }
 
     @Override
@@ -131,7 +129,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
             if (!saslNettyClient.isComplete()) {
                 LOG.warn("Generated a null response, "
                         + "but authentication is not complete.");
-                throw new Exception("Server reponse is null, but as far as "
+                throw new Exception("Server response is null, but as far as "
                         + "we can tell, we are not authenticated yet.");
             }
             this.client.channelReady();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index 02448e2..5ce90a3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -55,7 +55,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
         Channel channel = ctx.getChannel();
 
         if (msg instanceof ControlMessage
-                && ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+                && e.getMessage() == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
             // initialize server-side SASL functionality, if we haven't yet
             // (in which case we are looking at the first SASL message from the
             // client).
@@ -85,7 +85,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
             LOG.debug("processToken:  With nettyServer: " + saslNettyServer
                     + " and token length: " + token.length);
 
-            SaslMessageToken saslTokenMessageRequest = null;
+            SaslMessageToken saslTokenMessageRequest;
             saslTokenMessageRequest = new SaslMessageToken(
                     saslNettyServer.response(new byte[0]));
             // Send response to client.
@@ -102,10 +102,8 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
             SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
                     .get(channel);
             if (saslNettyServer == null) {
-                if (saslNettyServer == null) {
-                    throw new Exception("saslNettyServer was unexpectedly "
-                            + "null for channel: " + channel);
-                }
+                throw new Exception("saslNettyServer was unexpectedly "
+                        + "null for channel: " + channel);
             }
             SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
                     saslNettyServer.response(((SaslMessageToken) msg)
@@ -124,7 +122,6 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
                         + "authentication is complete.");
                 ctx.getPipeline().remove(this);
             }
-            return;
         } else {
             // Client should not be sending other-than-SASL messages before
             // SaslServerHandler has removed itself from the pipeline. Such

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index a2d0b26..cde4d9f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm.messaging.netty;
 
-import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -33,7 +32,7 @@ class SaslUtils {
     public static final String DEFAULT_REALM = "default";
 
     static Map<String, String> getSaslProps() {
-        Map<String, String> props = new HashMap<String, String>();
+        Map<String, String> props = new HashMap<>();
         props.put(Sasl.POLICY_NOPLAINTEXT, "true");
         return props;
     }
@@ -62,13 +61,7 @@ class SaslUtils {
     }
 
     static String getSecretKey(Map conf) {
-        if (conf == null || conf.isEmpty())
-            return null;
-
-        String secretPayLoad = (String) conf
-                .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
-
-        return secretPayLoad;
+        return conf == null || conf.isEmpty() ? null : (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 7011b42..32c2bd7 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -56,7 +56,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
-    private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<String, AtomicInteger>();
+    private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
     private final AtomicInteger messagesDequeued = new AtomicInteger(0);
     private final AtomicInteger[] pendingMessages;
     
@@ -84,12 +84,12 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         
         queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1);
         roundRobinQueueId = 0;
-        taskToQueueId = new HashMap<Integer, Integer>();
+        taskToQueueId = new HashMap<>();
     
         message_queue = new LinkedBlockingQueue[queueCount];
         pendingMessages = new AtomicInteger[queueCount];
         for (int i = 0; i < queueCount; i++) {
-            message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+            message_queue[i] = new LinkedBlockingQueue<>();
             pendingMessages[i] = new AtomicInteger(0);
         }
         
@@ -128,8 +128,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
         ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
 
-        for (int i = 0; i < msgs.size(); i++) {
-            TaskMessage message = msgs.get(i);
+        for (TaskMessage message : msgs) {
             int task = message.task();
 
             if (task == -1) {
@@ -140,7 +139,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
             Integer queueId = getMessageQueueId(task);
 
             if (null == messageGroups[queueId]) {
-                messageGroups[queueId] = new ArrayList<TaskMessage>();
+                messageGroups[queueId] = new ArrayList<>();
             }
             messageGroups[queueId].add(message);
         }
@@ -158,7 +157,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
                     if (roundRobinQueueId == queueCount) {
                         roundRobinQueueId = 0;
                     }
-                    HashMap<Integer, Integer> newRef = new HashMap<Integer, Integer>(taskToQueueId);
+                    HashMap<Integer, Integer> newRef = new HashMap<>(taskToQueueId);
                     newRef.put(task, queueId);
                     taskToQueueId = newRef;
                 }
@@ -214,7 +213,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
             return closeMessage.iterator();
         }
 
-        ArrayList<TaskMessage> ret = null;
+        ArrayList<TaskMessage> ret;
         int queueId = receiverId % queueCount;
         if ((flags & 0x01) == 0x01) {
             //non-blocking
@@ -240,15 +239,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
    
     /**
      * register a newly created channel
-     * @param channel
+     * @param channel newly created channel
      */
     protected void addChannel(Channel channel) {
         allChannels.add(channel);
     }
     
     /**
-     * close a channel
-     * @param channel
+     * @param channel channel to close
      */
     public void closeChannel(Channel channel) {
         channel.close().awaitUninterruptibly();
@@ -326,14 +324,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
 
     public Object getState() {
         LOG.debug("Getting metrics for server on port {}", port);
-        HashMap<String, Object> ret = new HashMap<String, Object>();
+        HashMap<String, Object> ret = new HashMap<>();
         ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
-        ArrayList<Integer> pending = new ArrayList<Integer>(pendingMessages.length);
+        ArrayList<Integer> pending = new ArrayList<>(pendingMessages.length);
         for (AtomicInteger p: pendingMessages) {
             pending.add(p.get());
         }
         ret.put("pending", pending);
-        HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
+        HashMap<String, Integer> enqueued = new HashMap<>();
         Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<String, AtomicInteger> ent = it.next();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java b/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java
index 6324237..6d43e65 100644
--- a/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java
+++ b/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java
@@ -20,18 +20,10 @@ package backtype.storm.metric;
 import backtype.storm.task.IBolt;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
 import java.util.Map;
 import static backtype.storm.metric.IEventLogger.EventInfo;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
index 27aa9d9..3003427 100644
--- a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
@@ -35,7 +35,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class FileBasedEventLogger implements IEventLogger {
-    private static Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);
 
     private static final int FLUSH_INTERVAL_MILLIS = 1000;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
index 77fe49c..4f4242a 100644
--- a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
+++ b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
@@ -21,7 +21,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.net.URL;
-import java.net.URLConnection;
 import java.net.HttpURLConnection;
 
 import com.esotericsoftware.kryo.io.Output;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
index 6441a39..7a8f676 100644
--- a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
+++ b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
@@ -18,7 +18,6 @@
 package backtype.storm.metric;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java
index 4f0b34e..5fd5a32 100644
--- a/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java
@@ -17,9 +17,7 @@
  */
 package backtype.storm.metric;
 
-import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
 
 import java.util.Date;
 import java.util.Map;
@@ -54,11 +52,7 @@ public interface IEventLogger {
          */
         @Override
         public String toString() {
-            return new StringBuilder(new Date(Long.parseLong(ts)).toString()).append(",")
-                    .append(component).append(",")
-                    .append(task).append(",")
-                    .append(messageId).append(",")
-                    .append(values).toString();
+            return new Date(Long.parseLong(ts)).toString() + "," + component + "," + task + "," + messageId + "," + values;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java b/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java
index c1c7c0a..98fb527 100644
--- a/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java
+++ b/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import backtype.storm.metric.api.IMetricsConsumer;
 import backtype.storm.task.IErrorReporter;
 import backtype.storm.task.TopologyContext;
-import backtype.storm.utils.Utils;
 
 /*
  * Listens for all metrics, dumps them to log

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
index 5a40c27..e3ec069 100644
--- a/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
@@ -20,7 +20,6 @@ package backtype.storm.metric;
 import backtype.storm.Config;
 import backtype.storm.metric.api.IMetricsConsumer;
 import backtype.storm.task.IBolt;
-import backtype.storm.task.IErrorReporter;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java b/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java
index d0bbe44..5cba7d3 100644
--- a/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java
+++ b/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java
@@ -18,7 +18,6 @@
 package backtype.storm.metric;
 
 import backtype.storm.Config;
-import backtype.storm.metric.api.AssignableMetric;
 import backtype.storm.metric.api.IMetric;
 import backtype.storm.task.IBolt;
 import backtype.storm.task.OutputCollector;
@@ -28,12 +27,9 @@ import backtype.storm.utils.Utils;
 import clojure.lang.AFn;
 import clojure.lang.IFn;
 import clojure.lang.RT;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.lang.management.*;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 
@@ -41,7 +37,6 @@ import java.util.Map;
 // TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
 // This bolt was conceived to export worker stats via metrics api.
 public class SystemBolt implements IBolt {
-    private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
     private static boolean _prepareWasCalled = false;
 
     private static class MemoryUsageMetric implements IMetric {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java
index dd048b8..6b317da 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java
@@ -17,8 +17,6 @@
  */
 package backtype.storm.metric.api;
 
-import backtype.storm.metric.api.IMetric;
-
 public class CountMetric implements IMetric {
     long _value = 0;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java b/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java
index e25e26d..fde7053 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java
@@ -17,8 +17,6 @@
  */
 package backtype.storm.metric.api;
 
-import backtype.storm.metric.api.IReducer;
-
 class MeanReducerState {
     public int count = 0;
     public double sum = 0.0;
@@ -47,7 +45,7 @@ public class MeanReducer implements IReducer<MeanReducerState> {
 
     public Object extractResult(MeanReducerState acc) {
         if(acc.count > 0) {
-            return new Double(acc.sum / (double)acc.count);
+            return acc.sum / (double) acc.count;
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
index c420a16..a6077e6 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class MultiCountMetric implements IMetric {
-    Map<String, CountMetric> _value = new HashMap();
+    Map<String, CountMetric> _value = new HashMap<>();
 
     public MultiCountMetric() {
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
index 530b168..d9d3a02 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class MultiReducedMetric implements IMetric {
-    Map<String, ReducedMetric> _value = new HashMap();
+    Map<String, ReducedMetric> _value = new HashMap<>();
     IReducer _reducer;
 
     public MultiReducedMetric(IReducer reducer) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
index def74c2..eae982b 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -21,8 +21,7 @@ import backtype.storm.metric.api.CountMetric;
 
 public class CountShellMetric extends CountMetric implements IShellMetric {
     /***
-     * @param
-     *  params should be null or long
+     * @param value should be null or long
      *  if value is null, it will call incr()
      *  if value is long, it will call incrBy((long)params)
      * */

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java b/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
index e91e54a..b17ca40 100644
--- a/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
+++ b/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
@@ -75,7 +75,7 @@ public abstract class AbstractDNSToSwitchMapping
         builder.append("Mapping: ").append(toString()).append("\n");
         if (rack != null) {
             builder.append("Map:\n");
-            Set<String> switches = new HashSet<String>();
+            Set<String> switches = new HashSet<>();
             for (Map.Entry<String, String> entry : rack.entrySet()) {
                 builder.append("  ")
                         .append(entry.getKey())

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java b/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
index 8e1e521..18eac60 100644
--- a/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
+++ b/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
@@ -28,12 +28,12 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public final class DefaultRackDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
 
-    private Map<String, String> mappingCache = new ConcurrentHashMap<String, String>();
+    private Map<String, String> mappingCache = new ConcurrentHashMap<>();
 
     @Override
     public Map<String,String> resolve(List<String> names) {
 
-        Map<String, String> m = new HashMap<String, String>();
+        Map<String, String> m = new HashMap<>();
         if (names.isEmpty()) {
             //name list is empty, return an empty map
             return m;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java b/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java
index 1148be2..5b54d46 100644
--- a/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java
+++ b/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java
@@ -17,11 +17,7 @@
  */
 package backtype.storm.nimbus;
 
-import org.apache.curator.framework.CuratorFramework;
-
 import java.io.Closeable;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 
@@ -32,7 +28,7 @@ public interface ILeaderElector extends Closeable {
 
     /**
      * Method guaranteed to be called as part of initialization of leader elector instance.
-     * @param conf
+     * @param conf configuration
      */
     void prepare(Map conf);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java b/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
index 77c0d7a..539df62 100644
--- a/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
+++ b/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
@@ -94,9 +94,7 @@ public class NimbusInfo implements Serializable {
 
         if (isLeader != that.isLeader) return false;
         if (port != that.port) return false;
-        if (!host.equals(that.host)) return false;
-
-        return true;
+        return host.equals(that.host);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index b07576f..b3028e9 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -265,14 +265,13 @@ public class Cluster {
      * @return the number of workers assigned to this topology.
      */
     public int getAssignedNumWorkers(TopologyDetails topology) {
-        SchedulerAssignment assignment = this.getAssignmentById(topology.getId());
-        if (topology == null || assignment == null) {
+        SchedulerAssignment assignment = topology != null ? this.getAssignmentById(topology.getId()) : null;
+        if (assignment == null) {
             return 0;
         }
 
         Set<WorkerSlot> slots = new HashSet<WorkerSlot>();
         slots.addAll(assignment.getExecutorToSlot().values());
-
         return slots.size();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
index 08af4b7..e06abf8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
@@ -38,7 +38,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     
     public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
         this.topologyId = topologyId;
-        this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
+        this.executorToSlot = new HashMap<>(0);
         if (executorToSlots != null) {
             this.executorToSlot.putAll(executorToSlots);
         }
@@ -46,13 +46,11 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
 
     @Override
     public Set<WorkerSlot> getSlots() {
-        return new HashSet(executorToSlot.values());
+        return new HashSet<>(executorToSlot.values());
     }    
     
     /**
      * Assign the slot to executors.
-     * @param slot
-     * @param executors
      */
     public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
         for (ExecutorDetails executor : executors) {
@@ -62,10 +60,9 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     
     /**
      * Release the slot occupied by this assignment.
-     * @param slot
      */
     public void unassignBySlot(WorkerSlot slot) {
-        List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
+        List<ExecutorDetails> executors = new ArrayList<>();
         for (ExecutorDetails executor : this.executorToSlot.keySet()) {
             WorkerSlot ws = this.executorToSlot.get(executor);
             if (ws.equals(slot)) {
@@ -80,9 +77,8 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     }
 
     /**
-     * Does this slot occupied by this assignment?
      * @param slot
-     * @return
+     * @return true if slot is occupied by this assignment
      */
     public boolean isSlotOccupied(WorkerSlot slot) {
         return this.executorToSlot.containsValue(slot);
@@ -101,8 +97,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     }
 
     /**
-     * Return the executors covered by this assignments
-     * @return
+     * @return the executors covered by this assignment
      */
     public Set<ExecutorDetails> getExecutors() {
         return this.executorToSlot.keySet();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
index 65e6e9b..a748e11 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
@@ -44,7 +44,7 @@ public class SupervisorDetails {
      */
     Set<Integer> allPorts;
     /**
-     * Map containing a manifest of resources for the node the superivsor resides
+     * Map containing a manifest of resources for the node the supervisor resides on
      */
     private Map<String, Double> _total_resources;
 
@@ -58,7 +58,7 @@ public class SupervisorDetails {
         if(allPorts!=null) {
             setAllPorts(allPorts);
         } else {
-            this.allPorts = new HashSet();
+            this.allPorts = new HashSet<>();
         }
         this._total_resources = total_resources;
         LOG.debug("Creating a new supervisor ({}-{}) with resources: {}", this.host, this.id, total_resources);
@@ -86,7 +86,7 @@ public class SupervisorDetails {
     }
 
     private void setAllPorts(Collection<Number> allPorts) {
-        this.allPorts = new HashSet<Integer>();
+        this.allPorts = new HashSet<>();
         if(allPorts!=null) {
             for(Number n: allPorts) {
                 this.allPorts.add(n.intValue());

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 443bf3f..0828a73 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -29,10 +29,10 @@ public class Topologies {
     Map<String, Map<String, Component>> _allComponents;
 
     public Topologies(Map<String, TopologyDetails> topologies) {
-        if(topologies==null) topologies = new HashMap();
-        this.topologies = new HashMap<String, TopologyDetails>(topologies.size());
+        if(topologies==null) topologies = new HashMap<>();
+        this.topologies = new HashMap<>(topologies.size());
         this.topologies.putAll(topologies);
-        this.nameToId = new HashMap<String, String>(topologies.size());
+        this.nameToId = new HashMap<>(topologies.size());
         
         for (Map.Entry<String, TopologyDetails> entry : topologies.entrySet()) {
             TopologyDetails topology = entry.getValue();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
index 01fa0df..95aa5c8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -59,7 +59,7 @@ public class TopologyDetails {
 
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
         this(topologyId, topologyConf, topology, numWorkers);
-        this.executorToComponent = new HashMap<ExecutorDetails, String>(0);
+        this.executorToComponent = new HashMap<>(0);
         if (executorToComponents != null) {
             this.executorToComponent.putAll(executorToComponents);
         }
@@ -88,7 +88,7 @@ public class TopologyDetails {
     }
 
     public Map<ExecutorDetails, String> selectExecutorToComponent(Collection<ExecutorDetails> executors) {
-        Map<ExecutorDetails, String> ret = new HashMap<ExecutorDetails, String>(executors.size());
+        Map<ExecutorDetails, String> ret = new HashMap<>(executors.size());
         for (ExecutorDetails executor : executors) {
             String compId = this.executorToComponent.get(executor);
             if (compId != null) {
@@ -104,7 +104,7 @@ public class TopologyDetails {
     }
 
     private void initResourceList() {
-        _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
+        _resourceList = new HashMap<>();
         // Extract bolt memory info
         if (this.topology.get_bolts() != null) {
             for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
@@ -136,7 +136,7 @@ public class TopologyDetails {
         }
         //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
         for(ExecutorDetails exec : this.getExecutors()) {
-            if (_resourceList.containsKey(exec) == false) {
+            if (!_resourceList.containsKey(exec)) {
                 LOG.debug(
                         "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
                         this.getExecutorToComponent().get(exec),
@@ -166,7 +166,7 @@ public class TopologyDetails {
      * @return a map of components
      */
     public Map<String, Component> getComponents() {
-        Map<String, Component> all_comp = new HashMap<String, Component>();
+        Map<String, Component> all_comp = new HashMap<>();
 
         StormTopology storm_topo = this.topology;
         // spouts
@@ -174,7 +174,7 @@ public class TopologyDetails {
             for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
                     .get_spouts().entrySet()) {
                 if (!Utils.isSystemId(spoutEntry.getKey())) {
-                    Component newComp = null;
+                    Component newComp;
                     if (all_comp.containsKey(spoutEntry.getKey())) {
                         newComp = all_comp.get(spoutEntry.getKey());
                         newComp.execs = componentToExecs(newComp.id);
@@ -209,7 +209,7 @@ public class TopologyDetails {
             for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
                     .entrySet()) {
                 if (!Utils.isSystemId(boltEntry.getKey())) {
-                    Component newComp = null;
+                    Component newComp;
                     if (all_comp.containsKey(boltEntry.getKey())) {
                         newComp = all_comp.get(boltEntry.getKey());
                         newComp.execs = componentToExecs(newComp.id);
@@ -297,7 +297,7 @@ public class TopologyDetails {
      *  for all tasks in topology topoId.
      */
     public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
-        Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, Double>();
+        Map<ExecutorDetails, Double> ret = new HashMap<>();
         for (ExecutorDetails exec : _resourceList.keySet()) {
             ret.put(exec, getTotalMemReqTask(exec));
         }
@@ -306,7 +306,6 @@ public class TopologyDetails {
 
     /**
      * Get the total CPU requirement for executor
-     * @param exec
      * @return Double the total about of cpu requirement for executor
      */
     public Double getTotalCpuReqTask(ExecutorDetails exec) {
@@ -386,17 +385,11 @@ public class TopologyDetails {
      * @return Boolean whether or not a certain ExecutorDetail is included in the _resourceList.
      */
     public boolean hasExecInTopo(ExecutorDetails exec) {
-        if (_resourceList != null) { // null is possible if the first constructor of TopologyDetails is used
-            return _resourceList.containsKey(exec);
-        } else {
-            return false;
-        }
+        return _resourceList != null && _resourceList.containsKey(exec);
     }
 
     /**
      * add resource requirements for a executor
-     * @param exec
-     * @param resourceList
      */
     public void addResourcesForExec(ExecutorDetails exec, Map<String, Double> resourceList) {
         if (hasExecInTopo(exec)) {
@@ -408,10 +401,9 @@ public class TopologyDetails {
 
     /**
      * Add default resource requirements for a executor
-     * @param exec
      */
     public void addDefaultResforExec(ExecutorDetails exec) {
-        Map<String, Double> defaultResourceList = new HashMap<String, Double>();
+        Map<String, Double> defaultResourceList = new HashMap<>();
         defaultResourceList.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
                         Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null));
         defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
@@ -427,7 +419,7 @@ public class TopologyDetails {
     }
 
     /**
-     * initalizes the scheduler member variable by extracting what scheduler
+     * initializes the scheduler member variable by extracting what scheduler
      * this topology is going to use from topologyConf
      */
     private void initConfigs() {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java
index 3053b5b..2e418e9 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java
@@ -37,8 +37,8 @@ import backtype.storm.scheduler.WorkerSlot;
  */
 public class DefaultPool extends NodePool {
   private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class);
-  private Set<Node> _nodes = new HashSet<Node>();
-  private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
+  private Set<Node> _nodes = new HashSet<>();
+  private HashMap<String, TopologyDetails> _tds = new HashMap<>();
   
   @Override
   public void addTopology(TopologyDetails td) {
@@ -61,8 +61,8 @@ public class DefaultPool extends NodePool {
 
   @Override
   public Collection<Node> takeNodes(int nodesNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
-    LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+    HashSet<Node> ret = new HashSet<>();
+    LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
     Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
     for (Node n: sortedNodes) {
       if (nodesNeeded <= ret.size()) {
@@ -95,7 +95,7 @@ public class DefaultPool extends NodePool {
   public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
     int nodesFound = 0;
     int slotsFound = 0;
-    LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+    LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
     Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
     for (Node n: sortedNodes) {
       if (slotsNeeded <= 0) {
@@ -113,8 +113,8 @@ public class DefaultPool extends NodePool {
   
   @Override
   public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
-    LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+    HashSet<Node> ret = new HashSet<>();
+    LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
     Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
     for (Node n: sortedNodes) {
       if (slotsNeeded <= 0) {
@@ -148,8 +148,8 @@ public class DefaultPool extends NodePool {
         }
         int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
         int executorsNotRunning = _cluster.getUnassignedExecutors(td).size();
-        LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}", 
-            new Object[] {slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning}); 
+        LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}",
+                slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning);
         if (slotsToUse <= 0) {
           if (executorsNotRunning > 0) {
             _cluster.setStatus(topId,"Not fully scheduled (No free slots in default pool) "+executorsNotRunning+" executors not scheduled");
@@ -180,9 +180,9 @@ public class DefaultPool extends NodePool {
         RoundRobinSlotScheduler slotSched = 
           new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
         
-        LinkedList<Node> nodes = new LinkedList<Node>(_nodes);
+        LinkedList<Node> nodes = new LinkedList<>(_nodes);
         while (true) {
-          Node n = null;
+          Node n;
           do {
             if (nodes.isEmpty()) {
               throw new IllegalStateException("This should not happen, we" +

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java
index c625895..456900b 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java
@@ -35,7 +35,7 @@ import backtype.storm.scheduler.TopologyDetails;
  */
 public class FreePool extends NodePool {
   private static final Logger LOG = LoggerFactory.getLogger(FreePool.class);
-  private Set<Node> _nodes = new HashSet<Node>();
+  private Set<Node> _nodes = new HashSet<>();
   private int _totalSlots = 0;
 
   @Override
@@ -63,7 +63,7 @@ public class FreePool extends NodePool {
   
   @Override
   public Collection<Node> takeNodes(int nodesNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
+    HashSet<Node> ret = new HashSet<>();
     Iterator<Node> it = _nodes.iterator();
     while (it.hasNext() && nodesNeeded > ret.size()) {
       Node n = it.next();
@@ -86,7 +86,7 @@ public class FreePool extends NodePool {
 
   @Override
   public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
+    HashSet<Node> ret = new HashSet<>();
     Iterator<Node> it = _nodes.iterator();
     while (it.hasNext() && slotsNeeded > 0) {
       Node n = it.next();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java
index dc7eded..25a6f25 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java
@@ -41,9 +41,9 @@ import backtype.storm.scheduler.WorkerSlot;
  */
 public class IsolatedPool extends NodePool {
   private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class);
-  private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>();
-  private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
-  private HashSet<String> _isolated = new HashSet<String>();
+  private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<>();
+  private HashMap<String, TopologyDetails> _tds = new HashMap<>();
+  private HashSet<String> _isolated = new HashSet<>();
   private int _maxNodes;
   private int _usedNodes;
 
@@ -57,7 +57,7 @@ public class IsolatedPool extends NodePool {
     String topId = td.getId();
     LOG.debug("Adding in Topology {}", topId);
     SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
-    Set<Node> assignedNodes = new HashSet<Node>();
+    Set<Node> assignedNodes = new HashSet<>();
     if (assignment != null) {
       for (WorkerSlot ws: assignment.getSlots()) {
         Node n = _nodeIdToNode.get(ws.getNodeId());
@@ -96,7 +96,7 @@ public class IsolatedPool extends NodePool {
         LOG.debug("Scheduling topology {}",topId);
         Set<Node> allNodes = _topologyIdToNodes.get(topId);
         Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
-        int slotsToUse = 0;
+        int slotsToUse;
         if (nodesRequested == null) {
           slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
         } else {
@@ -111,7 +111,7 @@ public class IsolatedPool extends NodePool {
         RoundRobinSlotScheduler slotSched = 
           new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
         
-        LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes);
+        LinkedList<Node> sortedNodes = new LinkedList<>(allNodes);
         Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
 
         LOG.debug("Nodes sorted by free space {}", sortedNodes);
@@ -157,9 +157,9 @@ public class IsolatedPool extends NodePool {
     int nodesUsed = _topologyIdToNodes.get(topId).size();
     int nodesNeeded = nodesRequested - nodesUsed;
     LOG.debug("Nodes... requested {} used {} available from us {} " +
-        "avail from other {} needed {}", new Object[] {nodesRequested, 
-        nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable,
-        nodesNeeded});
+        "avail from other {} needed {}", nodesRequested,
+            nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable,
+            nodesNeeded);
     if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
       _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. "
         + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) 
@@ -224,8 +224,8 @@ public class IsolatedPool extends NodePool {
       slotsAvailable = NodePool.slotsAvailable(lesserPools);
     }
     int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
-    LOG.debug("Slots... requested {} used {} free {} available {} to be used {}", 
-        new Object[] {slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse});
+    LOG.debug("Slots... requested {} used {} free {} available {} to be used {}",
+            slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse);
     if (slotsToUse <= 0) {
       _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
       return 0;
@@ -233,7 +233,7 @@ public class IsolatedPool extends NodePool {
     int slotsNeeded = slotsToUse - slotsFree;
     int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
     LOG.debug("Nodes... new {} used {} max {}",
-        new Object[]{numNewNodes, _usedNodes, _maxNodes});
+            numNewNodes, _usedNodes, _maxNodes);
     if ((numNewNodes + _usedNodes) > _maxNodes) {
       _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " +
       (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
@@ -249,7 +249,7 @@ public class IsolatedPool extends NodePool {
   @Override
   public Collection<Node> takeNodes(int nodesNeeded) {
     LOG.debug("Taking {} from {}", nodesNeeded, this);
-    HashSet<Node> ret = new HashSet<Node>();
+    HashSet<Node> ret = new HashSet<>();
     for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
       if (!_isolated.contains(entry.getKey())) {
         Iterator<Node> it = entry.getValue().iterator();
@@ -293,7 +293,7 @@ public class IsolatedPool extends NodePool {
 
   @Override
   public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
+    HashSet<Node> ret = new HashSet<>();
     for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
       if (!_isolated.contains(entry.getKey())) {
         Iterator<Node> it = entry.getValue().iterator();
@@ -321,9 +321,7 @@ public class IsolatedPool extends NodePool {
     int slotsFound = 0;
     for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
       if (!_isolated.contains(entry.getKey())) {
-        Iterator<Node> it = entry.getValue().iterator();
-        while (it.hasNext()) {
-          Node n = it.next();
+        for (Node n : entry.getValue()) {
           if (n.isAlive()) {
             nodesFound++;
             int totalSlotsFree = n.totalSlots();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
index 320b388..6b77c63 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -44,9 +44,9 @@ public class MultitenantScheduler implements IScheduler {
   private Map<String, Number> getUserConf() {
     Map<String, Number> ret = (Map<String, Number>)_conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
     if (ret == null) {
-      ret = new HashMap<String, Number>();
+      ret = new HashMap<>();
     } else {
-      ret = new HashMap<String, Number>(ret); 
+      ret = new HashMap<>(ret);
     }
 
     Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
@@ -65,7 +65,7 @@ public class MultitenantScheduler implements IScheduler {
     
     Map<String, Number> userConf = getUserConf();
     
-    Map<String, IsolatedPool> userPools = new HashMap<String, IsolatedPool>();
+    Map<String, IsolatedPool> userPools = new HashMap<>();
     for (Map.Entry<String, Number> entry : userConf.entrySet()) {
       userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue()));
     }


[2/5] storm git commit: STORM-1164. Code cleanup for typos, warnings and conciseness

Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
index 883c65f..6c2f06b 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
@@ -40,8 +40,8 @@ import backtype.storm.scheduler.WorkerSlot;
  */
 public class Node {
   private static final Logger LOG = LoggerFactory.getLogger(Node.class);
-  private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>();
-  private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
+  private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<>();
+  private Set<WorkerSlot> _freeSlots = new HashSet<>();
   private final String _nodeId;
   private boolean _isAlive;
   
@@ -143,7 +143,7 @@ public class Node {
     }
     Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
     if (usedSlots == null) {
-      usedSlots = new HashSet<WorkerSlot>();
+      usedSlots = new HashSet<>();
       _topIdToUsedSlots.put(topId, usedSlots);
     }
     usedSlots.add(ws);
@@ -164,7 +164,7 @@ public class Node {
         _freeSlots.addAll(entry.getValue());
       }
     }
-    _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>();
+    _topIdToUsedSlots = new HashMap<>();
   }
   
   /**
@@ -242,10 +242,7 @@ public class Node {
   
   @Override
   public boolean equals(Object other) {
-    if (other instanceof Node) {
-      return _nodeId.equals(((Node)other)._nodeId);
-    }
-    return false;
+      return other instanceof Node && _nodeId.equals(((Node) other)._nodeId);
   }
   
   @Override
@@ -295,13 +292,13 @@ public class Node {
   }
   
   public static Map<String, Node> getAllNodesFrom(Cluster cluster) {
-    Map<String, Node> nodeIdToNode = new HashMap<String, Node>();
+    Map<String, Node> nodeIdToNode = new HashMap<>();
     for (SupervisorDetails sup : cluster.getSupervisors().values()) {
       //Node ID and supervisor ID are the same.
       String id = sup.getId();
       boolean isAlive = !cluster.isBlackListed(id);
       LOG.debug("Found a {} Node {} {}",
-          new Object[] {isAlive? "living":"dead", id, sup.getAllPorts()});
+              isAlive? "living":"dead", id, sup.getAllPorts());
       nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive));
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java
index 21d1577..5a46df5 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java
@@ -81,7 +81,7 @@ public abstract class NodePool {
       
       Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent();
       SchedulerAssignment assignment = _cluster.getAssignmentById(_topId);
-      _nodeToComps = new HashMap<String, Set<String>>();
+      _nodeToComps = new HashMap<>();
 
       if (assignment != null) {
         Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot();
@@ -90,14 +90,14 @@ public abstract class NodePool {
           String nodeId = entry.getValue().getNodeId();
           Set<String> comps = _nodeToComps.get(nodeId);
           if (comps == null) {
-            comps = new HashSet<String>();
+            comps = new HashSet<>();
             _nodeToComps.put(nodeId, comps);
           }
           comps.add(execToComp.get(entry.getKey()));
         }
       }
       
-      _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>();
+      _spreadToSchedule = new HashMap<>();
       List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
       if (spreadComps != null) {
         for (String comp: spreadComps) {
@@ -105,7 +105,7 @@ public abstract class NodePool {
         }
       }
       
-      _slots = new LinkedList<Set<ExecutorDetails>>();
+      _slots = new LinkedList<>();
       for (int i = 0; i < slotsToUse; i++) {
         _slots.add(new HashSet<ExecutorDetails>());
       }
@@ -118,7 +118,7 @@ public abstract class NodePool {
           _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue());
         } else {
           for (ExecutorDetails ed: entry.getValue()) {
-            LOG.debug("Assigning {} {} to slot {}", new Object[]{entry.getKey(), ed, at});
+            LOG.debug("Assigning {} {} to slot {}", entry.getKey(), ed, at);
             _slots.get(at).add(ed);
             at++;
             if (at >= _slots.size()) {
@@ -151,7 +151,7 @@ public abstract class NodePool {
         String nodeId = n.getId();
         Set<String> nodeComps = _nodeToComps.get(nodeId);
         if (nodeComps == null) {
-          nodeComps = new HashSet<String>();
+          nodeComps = new HashSet<>();
           _nodeToComps.put(nodeId, nodeComps);
         }
         for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) {
@@ -251,7 +251,7 @@ public abstract class NodePool {
   
   public static Collection<Node> takeNodesBySlot(int slotsNeeded,NodePool[] pools) {
     LOG.debug("Trying to grab {} free slots from {}",slotsNeeded, pools);
-    HashSet<Node> ret = new HashSet<Node>();
+    HashSet<Node> ret = new HashSet<>();
     for (NodePool pool: pools) {
       Collection<Node> got = pool.takeNodesBySlots(slotsNeeded);
       ret.addAll(got);
@@ -266,7 +266,7 @@ public abstract class NodePool {
   
   public static Collection<Node> takeNodes(int nodesNeeded,NodePool[] pools) {
     LOG.debug("Trying to grab {} free nodes from {}",nodesNeeded, pools);
-    HashSet<Node> ret = new HashSet<Node>();
+    HashSet<Node> ret = new HashSet<>();
     for (NodePool pool: pools) {
       Collection<Node> got = pool.takeNodes(nodesNeeded);
       ret.addAll(got);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
index f98d9ed..b8a96a2 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
@@ -42,7 +42,7 @@ import backtype.storm.scheduler.resource.Component;
 import backtype.storm.scheduler.resource.RAS_Node;
 
 public class ResourceAwareStrategy implements IStrategy {
-    private Logger LOG = null;
+    private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareStrategy.class);
     private Topologies _topologies;
     private Cluster _cluster;
     //Map key is the supervisor id and the value is the corresponding RAS_Node Object 
@@ -63,7 +63,6 @@ public class ResourceAwareStrategy implements IStrategy {
         _cluster = cluster;
         _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
         _availNodes = this.getAvailNodes();
-        this.LOG = LoggerFactory.getLogger(this.getClass());
         _clusterInfo = cluster.getNetworkTopography();
         LOG.debug(this.getClusterInfo());
     }
@@ -71,7 +70,7 @@ public class ResourceAwareStrategy implements IStrategy {
     //the returned TreeMap keeps the Components sorted
     private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
             Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
-        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<>();
         Integer rank = 0;
         for (Component ras_comp : ordered__Component_list) {
             retMap.put(rank, new ArrayList<ExecutorDetails>());
@@ -91,9 +90,9 @@ public class ResourceAwareStrategy implements IStrategy {
             return null;
         }
         Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
-        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
         LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
-        Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+        Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
         List<Component> spouts = this.getSpouts(_topologies, td);
 
         if (spouts.size() == 0) {
@@ -104,7 +103,7 @@ public class ResourceAwareStrategy implements IStrategy {
         Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
 
         Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
-        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
         Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
         //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth. 
         //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
@@ -145,7 +144,7 @@ public class ResourceAwareStrategy implements IStrategy {
             WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
             if (targetSlot != null) {
                 RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
-                if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
+                if(!schedulerAssignmentMap.containsKey(targetSlot)) {
                     schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
                 }
                
@@ -175,7 +174,7 @@ public class ResourceAwareStrategy implements IStrategy {
     }
 
     private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-      WorkerSlot ws = null;
+      WorkerSlot ws;
       // first scheduling
       if (this.refNode == null) {
           String clus = this.getBestClustering();
@@ -205,7 +204,7 @@ public class ResourceAwareStrategy implements IStrategy {
             nodes = this.getAvailableNodes();
         }
         //First sort nodes by distance
-        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
+        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
         for (RAS_Node n : nodes) {
             if(n.getFreeSlots().size()>0) {
                 if (n.getAvailableMemoryResources() >= taskMem
@@ -262,9 +261,9 @@ public class ResourceAwareStrategy implements IStrategy {
     }
 
     private Double distToNode(RAS_Node src, RAS_Node dest) {
-        if (src.getId().equals(dest.getId()) == true) {
+        if (src.getId().equals(dest.getId())) {
             return 0.0;
-        }else if (this.NodeToCluster(src) == this.NodeToCluster(dest)) {
+        } else if (this.NodeToCluster(src) == this.NodeToCluster(dest)) {
             return 0.5;
         } else {
             return 1.0;
@@ -283,7 +282,7 @@ public class ResourceAwareStrategy implements IStrategy {
     }
     
     private List<RAS_Node> getAvailableNodes() {
-        LinkedList<RAS_Node> nodes = new LinkedList<RAS_Node>();
+        LinkedList<RAS_Node> nodes = new LinkedList<>();
         for (String clusterId : _clusterInfo.keySet()) {
             nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
         }
@@ -291,7 +290,7 @@ public class ResourceAwareStrategy implements IStrategy {
     }
 
     private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
-        List<RAS_Node> retList = new ArrayList<RAS_Node>();
+        List<RAS_Node> retList = new ArrayList<>();
         for (String node_id : _clusterInfo.get(clus)) {
             retList.add(_availNodes.get(this
                     .NodeHostnameToId(node_id)));
@@ -301,7 +300,7 @@ public class ResourceAwareStrategy implements IStrategy {
 
     private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
         List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
-        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
+        List<WorkerSlot> workers = new LinkedList<>();
         for(RAS_Node node : nodes) {
             workers.addAll(node.getFreeSlots());
         }
@@ -309,7 +308,7 @@ public class ResourceAwareStrategy implements IStrategy {
     }
 
     private List<WorkerSlot> getAvailableWorker() {
-        List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
+        List<WorkerSlot> workers = new LinkedList<>();
         for (String clusterId : _clusterInfo.keySet()) {
             workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
         }
@@ -333,18 +332,18 @@ public class ResourceAwareStrategy implements IStrategy {
     private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
         // Since queue is a interface
         Queue<Component> ordered__Component_list = new LinkedList<Component>();
-        HashMap<String, Component> visited = new HashMap<String, Component>();
+        HashMap<String, Component> visited = new HashMap<>();
 
         /* start from each spout that is not visited, each does a breadth-first traverse */
         for (Component spout : spouts) {
             if (!visited.containsKey(spout.id)) {
-                Queue<Component> queue = new LinkedList<Component>();
+                Queue<Component> queue = new LinkedList<>();
                 queue.offer(spout);
                 while (!queue.isEmpty()) {
                     Component comp = queue.poll();
                     visited.put(comp.id, comp);
                     ordered__Component_list.add(comp);
-                    List<String> neighbors = new ArrayList<String>();
+                    List<String> neighbors = new ArrayList<>();
                     neighbors.addAll(comp.children);
                     neighbors.addAll(comp.parents);
                     for (String nbID : neighbors) {
@@ -360,7 +359,7 @@ public class ResourceAwareStrategy implements IStrategy {
     }
 
     private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
-        List<Component> spouts = new ArrayList<Component>();
+        List<Component> spouts = new ArrayList<>();
         for (Component c : topologies.getAllComponents().get(td.getId())
                 .values()) {
             if (c.type == Component.ComponentType.SPOUT) {
@@ -413,7 +412,7 @@ public class ResourceAwareStrategy implements IStrategy {
 
     /**
      * Checks whether we can schedule an Executor exec on the worker slot ws
-     * Only considers memory currenlty.  May include CPU in the future
+     * Only considers memory currently.  May include CPU in the future
      * @param exec
      * @param ws
      * @param td

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
index ac3fb53..8062b4e 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -21,21 +21,18 @@ import backtype.storm.Config;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.Subject;
-import java.security.NoSuchAlgorithmException;
 import java.security.URIParameter;
 
 import backtype.storm.security.INimbusCredentialPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 public class AuthUtils {
     private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
@@ -72,11 +69,11 @@ public class AuthUtils {
 
     /**
      * Construct a principal to local plugin
-     * @param conf storm configuration
+     * @param storm_conf storm configuration
      * @return the plugin
      */
     public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) {
-        IPrincipalToLocal ptol = null;
+        IPrincipalToLocal ptol;
         try {
           String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
           Class klass = Class.forName(ptol_klassName);
@@ -90,11 +87,11 @@ public class AuthUtils {
 
     /**
      * Construct a group mapping service provider plugin
-     * @param conf storm configuration
+     * @param storm_conf storm configuration
      * @return the plugin
      */
     public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map storm_conf) {
-        IGroupMappingServiceProvider gmsp = null;
+        IGroupMappingServiceProvider gmsp;
         try {
             String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
             Class klass = Class.forName(gmsp_klassName);
@@ -107,13 +104,13 @@ public class AuthUtils {
     }
 
     /**
-     * Get all of the configured Credential Renwer Plugins.
-     * @param storm_conf the storm configuration to use.
+     * Get all of the configured Credential Renewer Plugins.
+     * @param conf the storm configuration to use.
      * @return the configured credential renewers.
      */
     public static Collection<ICredentialsRenewer> GetCredentialRenewers(Map conf) {
         try {
-            Set<ICredentialsRenewer> ret = new HashSet<ICredentialsRenewer>();
+            Set<ICredentialsRenewer> ret = new HashSet<>();
             Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS);
             if (clazzes != null) {
                 for (String clazz : clazzes) {
@@ -135,7 +132,7 @@ public class AuthUtils {
      */
     public static Collection<INimbusCredentialPlugin> getNimbusAutoCredPlugins(Map conf) {
         try {
-            Set<INimbusCredentialPlugin> ret = new HashSet<INimbusCredentialPlugin>();
+            Set<INimbusCredentialPlugin> ret = new HashSet<>();
             Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS);
             if (clazzes != null) {
                 for (String clazz : clazzes) {
@@ -157,7 +154,7 @@ public class AuthUtils {
      */
     public static Collection<IAutoCredentials> GetAutoCredentials(Map storm_conf) {
         try {
-            Set<IAutoCredentials> autos = new HashSet<IAutoCredentials>();
+            Set<IAutoCredentials> autos = new HashSet<>();
             Collection<String> clazzes = (Collection<String>)storm_conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS);
             if (clazzes != null) {
                 for (String clazz : clazzes) {
@@ -216,11 +213,9 @@ public class AuthUtils {
 
     /**
      * Construct a transport plugin per storm configuration
-     * @param conf storm configuration
-     * @return
      */
     public static ITransportPlugin GetTransportPlugin(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
-        ITransportPlugin  transportPlugin = null;
+        ITransportPlugin  transportPlugin;
         try {
             String transport_plugin_klassName = type.getTransportPlugin(storm_conf);
             Class klass = Class.forName(transport_plugin_klassName);
@@ -234,7 +229,7 @@ public class AuthUtils {
 
     private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf,
             String klassName) {
-        IHttpCredentialsPlugin plugin = null;
+        IHttpCredentialsPlugin plugin;
         try {
             Class klass = Class.forName(klassName);
             plugin = (IHttpCredentialsPlugin)klass.newInstance();

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
index e2469e5..9c81cdf 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
@@ -28,8 +28,6 @@ import javax.servlet.http.HttpServletRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.security.auth.ReqContext;
-
 public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
     private static final Logger LOG =
             LoggerFactory.getLogger(DefaultHttpCredentialsPlugin.class);
@@ -50,7 +48,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
      */
     @Override
     public String getUserName(HttpServletRequest req) {
-        Principal princ = null;
+        Principal princ;
         if (req != null && (princ = req.getUserPrincipal()) != null) {
             String userName = princ.getName();
             if (userName != null && !userName.isEmpty()) {
@@ -83,7 +81,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
             userName = doAsUser;
         }
 
-        Set<Principal> principals = new HashSet<Principal>();
+        Set<Principal> principals = new HashSet<>();
         if(userName != null) {
             Principal p = new SingleUserPrincipal(userName);
             principals.add(p);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java b/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java
index 729d744..9f95101 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java
@@ -28,7 +28,7 @@ import java.security.Principal;
 public class DefaultPrincipalToLocal implements IPrincipalToLocal {
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * @param storm_conf Storm configuration
      */
     public void prepare(Map storm_conf) {}
     

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java
index d592bb7..ff1e2ba 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java
@@ -32,7 +32,7 @@ import java.util.Map;
 public interface IAuthorizer {
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * @param storm_conf Storm configuration
      */
     void prepare(Map storm_conf);
     
@@ -40,7 +40,7 @@ public interface IAuthorizer {
      * permit() method is invoked for each incoming Thrift request.
      * @param context request context includes info about 
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology 
+     * @param topology_conf configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     public boolean permit(ReqContext context, String operation, Map topology_conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java b/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java
index 3eaf6c4..9a6f02e 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java
@@ -18,11 +18,10 @@
 
 package backtype.storm.security.auth;
 
-import java.util.Collection;
 import java.util.Map;
 
 /**
- * Provides a way to renew credentials on behelf of a user.
+ * Provides a way to renew credentials on behalf of a user.
  */
 public interface ICredentialsRenewer {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java
index a012ce4..0b57eca 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java
@@ -21,8 +21,6 @@ package backtype.storm.security.auth;
 import java.util.Map;
 import javax.servlet.http.HttpServletRequest;
 
-import backtype.storm.security.auth.ReqContext;
-
 /**
  * Interface for handling credentials in an HttpServletRequest
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java b/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java
index fca3d37..e938d39 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java
@@ -28,7 +28,7 @@ import java.security.Principal;
 public interface IPrincipalToLocal {
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * @param storm_conf Storm configuration
      */
     void prepare(Map storm_conf);
     

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
index 5ba2557..ba09fad 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
@@ -18,9 +18,7 @@
 package backtype.storm.security.auth;
 
 import java.io.IOException;
-import java.security.Principal;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 import javax.security.auth.login.Configuration;
 
@@ -29,8 +27,6 @@ import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
-import backtype.storm.security.auth.ThriftConnectionType;
-
 /**
  * Interface for Thrift Transport plugin
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java b/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java
index 35c7788..1f67c14 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java
@@ -28,7 +28,7 @@ public class KerberosPrincipalToLocal implements IPrincipalToLocal {
 
     /**
      * Invoked once immediately after construction
-     * @param conf Storm configuration 
+     * @param storm_conf Storm configuration
      */
     public void prepare(Map storm_conf) {}
     

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java b/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
index 2f18982..31aeef9 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
@@ -22,8 +22,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.net.InetAddress;
 import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.security.AccessControlContext;
 import java.security.AccessController;
@@ -44,12 +42,8 @@ public class ReqContext {
     private Map _storm_conf;
     private Principal realPrincipal;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ReqContext.class);
-
-
     /**
-     * Get a request context associated with current thread
-     * @return
+     * @return a request context associated with current thread
      */
     public static ReqContext context() {
         return ctxt.get();
@@ -132,8 +126,7 @@ public class ReqContext {
     }
 
     /**
-     * Returns true if this request is an impersonation request.
-     * @return
+     * @return true if this request is an impersonation request.
      */
     public boolean isImpersonating() {
         return this.realPrincipal != null;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
index 7013cd4..92004fa 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
@@ -45,10 +45,6 @@ import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.security.auth.ThriftConnectionType;
 
 /**
  * Base class for SASL authentication plugin.
@@ -57,7 +53,6 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
     protected ThriftConnectionType type;
     protected Map storm_conf;
     protected Configuration login_conf;
-    private static final Logger LOG = LoggerFactory.getLogger(SaslTransportPlugin.class);
 
     @Override
     public void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
@@ -95,7 +90,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
 
     /**
      * All subclass must implement this method
-     * @return
+     * @return server transport factory
      * @throws IOException
      */
     protected abstract TTransportFactory getServerTransportFactory() throws IOException;
@@ -162,11 +157,8 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         public boolean equals(Object o) {
             if (this == o) {
                 return true;
-            } else if (o == null || getClass() != o.getClass()) {
-                return false;
-            } else {
-                return (name.equals(((User) o).name));
             }
+            return !(o == null || getClass() != o.getClass()) && (name.equals(((User) o).name));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
index 2abcdae..7a0a6f2 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
@@ -45,8 +45,6 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.security.auth.ThriftConnectionType;
-
 /**
  * Simple transport for Thrift plugin.
  * 
@@ -146,12 +144,12 @@ public class SimpleTransportPlugin implements ITransportPlugin {
             if (s == null) {
               final String user = (String)storm_conf.get("debug.simple.transport.user");
               if (user != null) {
-                HashSet<Principal> principals = new HashSet<Principal>();
+                HashSet<Principal> principals = new HashSet<>();
                 principals.add(new Principal() {
                   public String getName() { return user; }
                   public String toString() { return user; }
                 });
-                s = new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
+                s = new Subject(true, principals, new HashSet<>(), new HashSet<>());
               }
             }
             req_context.setSubject(s);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java b/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java
index 6af17fa..0cadba6 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java
@@ -33,10 +33,7 @@ public class SingleUserPrincipal implements Principal {
 
     @Override
     public boolean equals(Object another) {
-        if (another instanceof SingleUserPrincipal) {
-            return _userName.equals(((SingleUserPrincipal)another)._userName);
-        }
-        return false;
+        return another instanceof SingleUserPrincipal && _userName.equals(((SingleUserPrincipal) another)._userName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java b/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
index f547868..9729671 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
@@ -19,7 +19,6 @@
 package backtype.storm.security.auth;
 
 import java.io.IOException;
-import java.util.Random;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java b/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
index 9f77ab9..8b3d4c5 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
@@ -24,14 +24,10 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import backtype.storm.utils.Utils;
 import backtype.storm.Config;
-import backtype.storm.security.auth.TBackoffConnect;
 
-public class ThriftClient {	
-    private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
+public class ThriftClient {
     private TTransport _transport;
     protected TProtocol _protocol;
     private String _host;
@@ -90,8 +86,6 @@ public class ThriftClient {
             //construct a transport plugin
             ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _conf, login_conf);
 
-            final TTransport underlyingTransport = socket;
-
             //TODO get this from type instead of hardcoding to Nimbus.
             //establish client-server transport via plugin
             //do retries if the connect fails
@@ -100,7 +94,7 @@ public class ThriftClient {
                                       Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_TIMES)),
                                       Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
                                       Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
-            _transport = connectionRetry.doConnectWithRetry(transportPlugin, underlyingTransport, _host, _asUser);
+            _transport = connectionRetry.doConnectWithRetry(transportPlugin, socket, _host, _asUser);
         } catch (IOException ex) {
             throw new RuntimeException(ex);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java b/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java
index 64243ce..fdbdc7c 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java
@@ -53,12 +53,10 @@ public class ThriftServer {
     }
 
     /**
-     * Is ThriftServer listening to requests?
-     * @return
+     * @return true if ThriftServer is listening to requests
      */
     public boolean isServing() {
-        if (_server == null) return false;
-        return _server.isServing();
+        return _server != null && _server.isServing();
     }
     
     public void serve()  {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index 5e83b9f..4a9b379 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -28,7 +28,6 @@ import java.util.Set;
 
 import backtype.storm.Config;
 import backtype.storm.security.auth.ReqContext;
-import backtype.storm.security.auth.authorizer.DRPCAuthorizerBase;
 import backtype.storm.security.auth.AuthUtils;
 import backtype.storm.security.auth.IPrincipalToLocal;
 import backtype.storm.utils.Utils;
@@ -54,7 +53,7 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
         public AclFunctionEntry(Collection<String> clientUsers,
                 String invocationUser) {
             this.clientUsers = (clientUsers != null) ?
-                new HashSet<String>(clientUsers) : new HashSet<String>();
+                new HashSet<>(clientUsers) : new HashSet<String>();
             this.invocationUser = invocationUser;
         }
     }
@@ -68,7 +67,7 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
         //change is atomic
         long now = System.currentTimeMillis();
         if ((now - 5000) > _lastUpdate || _acl == null) {
-            Map<String,AclFunctionEntry> acl = new HashMap<String,AclFunctionEntry>();
+            Map<String,AclFunctionEntry> acl = new HashMap<>();
             Map conf = Utils.findAndReadConfigFile(_aclFileName);
             if (conf.containsKey(Config.DRPC_AUTHORIZER_ACL)) {
                 Map<String,Map<String,?>> confAcl =
@@ -88,7 +87,7 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
                 }
             } else if (!_permitWhenMissingFunctionEntry) {
                 LOG.warn("Requiring explicit ACL entries, but none given. " +
-                        "Therefore, all operiations will be denied.");
+                        "Therefore, all operations will be denied.");
             }
             _acl = acl;
             _lastUpdate = System.currentTimeMillis();
@@ -100,8 +99,8 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
     public void prepare(Map conf) {
         Boolean isStrict = 
                 (Boolean) conf.get(Config.DRPC_AUTHORIZER_ACL_STRICT);
-        _permitWhenMissingFunctionEntry = 
-                (isStrict != null && !isStrict) ? true : false;
+        _permitWhenMissingFunctionEntry =
+                (isStrict != null && !isStrict);
         _aclFileName = (String) conf.get(Config.DRPC_AUTHORIZER_ACL_FILENAME);
         _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
index 8d61492..d1d6f87 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
@@ -19,19 +19,14 @@ package backtype.storm.security.auth.authorizer;
 
 import java.util.Map;
 
-import backtype.storm.Config;
 import backtype.storm.security.auth.IAuthorizer;
 import backtype.storm.security.auth.ReqContext;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * An authorization implementation that denies everything, for testing purposes
  */
 public class DenyAuthorizer implements IAuthorizer {
-    private static final Logger LOG = LoggerFactory.getLogger(DenyAuthorizer.class);
-    
+
     /**
      * Invoked once immediately after construction
      * @param conf Storm configuration 
@@ -41,9 +36,9 @@ public class DenyAuthorizer implements IAuthorizer {
 
     /**
      * permit() method is invoked for each incoming Thrift request
-     * @param contrext request context 
+     * @param context request context
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology 
+     * @param topology_conf configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     public boolean permit(ReqContext context, String operation, Map topology_conf) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
index 0cfd488..07e6447 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
@@ -38,7 +38,7 @@ public class ImpersonationAuthorizer implements IAuthorizer {
 
     @Override
     public void prepare(Map conf) {
-        userImpersonationACL = new HashMap<String, ImpersonationACL>();
+        userImpersonationACL = new HashMap<>();
 
         Map<String, Map<String, List<String>>> userToHostAndGroup = (Map<String, Map<String, List<String>>>) conf.get(Config.NIMBUS_IMPERSONATION_ACL);
 
@@ -67,7 +67,7 @@ public class ImpersonationAuthorizer implements IAuthorizer {
         String userBeingImpersonated = _ptol.toLocal(context.principal());
         InetAddress remoteAddress = context.remoteAddress();
 
-        LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}",
+        LOG.info("user = {}, principal = {} is attempting to impersonate user = {} for operation = {} from host = {}",
                 impersonatingUser, impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress);
 
         /**
@@ -83,8 +83,8 @@ public class ImpersonationAuthorizer implements IAuthorizer {
         ImpersonationACL principalACL = userImpersonationACL.get(impersonatingPrincipal);
         ImpersonationACL userACL = userImpersonationACL.get(impersonatingUser);
 
-        Set<String> authorizedHosts = new HashSet<String>();
-        Set<String> authorizedGroups = new HashSet<String>();
+        Set<String> authorizedHosts = new HashSet<>();
+        Set<String> authorizedGroups = new HashSet<>();
 
         if (principalACL != null) {
             authorizedHosts.addAll(principalACL.authorizedHosts);
@@ -127,7 +127,7 @@ public class ImpersonationAuthorizer implements IAuthorizer {
             return true;
         }
 
-        Set<String> groups = null;
+        Set<String> groups;
         try {
             groups = _groupMappingProvider.getGroups(userBeingImpersonated);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
index c8008f1..ab5bd4b 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
@@ -19,18 +19,13 @@ package backtype.storm.security.auth.authorizer;
 
 import java.util.Map;
 
-import backtype.storm.Config;
 import backtype.storm.security.auth.IAuthorizer;
 import backtype.storm.security.auth.ReqContext;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * A no-op authorization implementation that illustrate info available for authorization decisions.
  */
 public class NoopAuthorizer implements IAuthorizer {
-    private static final Logger LOG = LoggerFactory.getLogger(NoopAuthorizer.class);
 
     /**
      * Invoked once immediately after construction
@@ -43,7 +38,7 @@ public class NoopAuthorizer implements IAuthorizer {
      * permit() method is invoked for each incoming Thrift request
     * @param context request context, which includes info about the request
      * @param operation operation name
-     * @param topology_storm configuration of targeted topology 
+     * @param topology_conf configuration of targeted topology
      * @return true if the request is authorized, false if reject
      */
     public boolean permit(ReqContext context, String operation, Map topology_conf) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
index a2549a5..0063f92 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
@@ -42,9 +42,9 @@ import org.slf4j.LoggerFactory;
 public class SimpleACLAuthorizer implements IAuthorizer {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleACLAuthorizer.class);
 
-    protected Set<String> _userCommands = new HashSet<String>(Arrays.asList("submitTopology", "fileUpload", "getNimbusConf", "getClusterInfo"));
-    protected Set<String> _supervisorCommands = new HashSet<String>(Arrays.asList("fileDownload"));
-    protected Set<String> _topoCommands = new HashSet<String>(Arrays.asList(
+    protected Set<String> _userCommands = new HashSet<>(Arrays.asList("submitTopology", "fileUpload", "getNimbusConf", "getClusterInfo"));
+    protected Set<String> _supervisorCommands = new HashSet<>(Arrays.asList("fileDownload"));
+    protected Set<String> _topoCommands = new HashSet<>(Arrays.asList(
             "killTopology",
             "rebalance",
             "activate",
@@ -79,10 +79,10 @@ public class SimpleACLAuthorizer implements IAuthorizer {
      */
     @Override
     public void prepare(Map conf) {
-        _admins = new HashSet<String>();
-        _supervisors = new HashSet<String>();
-        _nimbusUsers = new HashSet<String>();
-        _nimbusGroups = new HashSet<String>();
+        _admins = new HashSet<>();
+        _supervisors = new HashSet<>();
+        _nimbusUsers = new HashSet<>();
+        _nimbusGroups = new HashSet<>();
 
         if (conf.containsKey(Config.NIMBUS_ADMINS)) {
             _admins.addAll((Collection<String>)conf.get(Config.NIMBUS_ADMINS));
@@ -113,7 +113,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
     public boolean permit(ReqContext context, String operation, Map topology_conf) {
         String principal = context.principal().getName();
         String user = _ptol.toLocal(context.principal());
-        Set<String> userGroups = new HashSet<String>();
+        Set<String> userGroups = new HashSet<>();
 
         if (_groupMappingProvider != null) {
             try {
@@ -145,7 +145,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
                 return true;
             }
 
-            Set<String> topoGroups = new HashSet<String>();
+            Set<String> topoGroups = new HashSet<>();
             if (topology_conf.containsKey(Config.TOPOLOGY_GROUPS) && topology_conf.get(Config.TOPOLOGY_GROUPS) != null) {
                 topoGroups.addAll((Collection<String>)topology_conf.get(Config.TOPOLOGY_GROUPS));
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
index 21ee9a6..5731f06 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
@@ -26,15 +26,11 @@ import java.util.Collection;
 import backtype.storm.security.auth.IAuthorizer;
 import backtype.storm.security.auth.ReqContext;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * An authorization implementation that simply checks a whitelist of users that
  * are allowed to use the cluster.
  */
 public class SimpleWhitelistAuthorizer implements IAuthorizer {
-    private static final Logger LOG = LoggerFactory.getLogger(SimpleWhitelistAuthorizer.class);
     public static final String WHITELIST_USERS_CONF = "storm.auth.simple-white-list.users";
     protected Set<String> users;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
index 3caacaa..420326c 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
@@ -46,8 +46,6 @@ public class ClientCallbackHandler implements CallbackHandler {
      * Constructor based on a JAAS configuration
      * 
      * For digest, you should have a pair of user name and password defined.
-     * 
-     * @param configuration
      * @throws IOException
      */
     public ClientCallbackHandler(Configuration configuration) throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
index ad642d8..09d6f78 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -18,10 +18,8 @@
 package backtype.storm.security.auth.digest;
 
 import java.io.IOException;
-import java.util.Map;
 
 import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.Configuration;
 
 import org.apache.thrift.transport.TSaslClientTransport;
 import org.apache.thrift.transport.TSaslServerTransport;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
index 1788dab..e80072c 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
@@ -26,7 +26,6 @@ import backtype.storm.security.auth.SaslTransportPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -40,7 +39,7 @@ import javax.security.sasl.RealmCallback;
 import backtype.storm.security.auth.AuthUtils;
 
 /**
- * SASL server side collback handler
+ * SASL server side callback handler
  */
 public class ServerCallbackHandler implements CallbackHandler {
     private static final String USER_PREFIX = "user_";
@@ -48,7 +47,7 @@ public class ServerCallbackHandler implements CallbackHandler {
     private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
 
     private String userName;
-    private final Map<String,String> credentials = new HashMap<String,String>();
+    private final Map<String,String> credentials = new HashMap<>();
 
     public ServerCallbackHandler(Configuration configuration) throws IOException {
         if (configuration==null) return;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java b/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
index 933a125..54193da 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm.serialization;
 
-import java.io.IOException;
 import java.util.Map;
 import backtype.storm.utils.Utils;
 import org.apache.thrift.TBase;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java b/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java
index 4e68658..09cc422 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java
@@ -18,7 +18,6 @@
 package backtype.storm.serialization;
 
 import backtype.storm.tuple.Tuple;
-import java.io.IOException;
 
 public interface ITupleDeserializer {
     Tuple deserialize(byte[] ser);        

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java b/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
index 5a5e3a4..07b9cd5 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
@@ -21,10 +21,7 @@ import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
-import backtype.storm.utils.WritableUtils;
 import com.esotericsoftware.kryo.io.Input;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java b/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
index 209ae53..418068e 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
@@ -21,7 +21,6 @@ import backtype.storm.utils.ListDelegate;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -35,7 +34,7 @@ public class KryoValuesDeserializer {
     }
     
     public List<Object> deserializeFrom(Input input) {
-    	ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
+    	ListDelegate delegate = _kryo.readObject(input, ListDelegate.class);
    	return delegate.getDelegate();
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java b/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java
index 2a829d1..417f102 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
 import backtype.storm.generated.ComponentCommon;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.serialization.types.ArrayListSerializer;
-import backtype.storm.serialization.types.ListDelegateSerializer;
 import backtype.storm.serialization.types.HashMapSerializer;
 import backtype.storm.serialization.types.HashSetSerializer;
 import backtype.storm.transactional.TransactionAttempt;
@@ -130,17 +129,17 @@ public class SerializationFactory {
     }
 
     public static class IdDictionary {
-        Map<String, Map<String, Integer>> streamNametoId = new HashMap<String, Map<String, Integer>>();
-        Map<String, Map<Integer, String>> streamIdToName = new HashMap<String, Map<Integer, String>>();
+        Map<String, Map<String, Integer>> streamNametoId = new HashMap<>();
+        Map<String, Map<Integer, String>> streamIdToName = new HashMap<>();
 
         public IdDictionary(StormTopology topology) {
-            List<String> componentNames = new ArrayList<String>(topology.get_spouts().keySet());
+            List<String> componentNames = new ArrayList<>(topology.get_spouts().keySet());
             componentNames.addAll(topology.get_bolts().keySet());
             componentNames.addAll(topology.get_state_spouts().keySet());
 
             for(String name: componentNames) {
                 ComponentCommon common = Utils.getComponentCommon(topology, name);
-                List<String> streams = new ArrayList<String>(common.get_streams().keySet());
+                List<String> streams = new ArrayList<>(common.get_streams().keySet());
                 streamNametoId.put(name, idify(streams));
                 streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name)));
             }
@@ -156,7 +155,7 @@ public class SerializationFactory {
 
         private static Map<String, Integer> idify(List<String> names) {
             Collections.sort(names);
-            Map<String, Integer> ret = new HashMap<String, Integer>();
+            Map<String, Integer> ret = new HashMap<>();
             int i = 1;
             for(String name: names) {
                 ret.put(name, i);
@@ -204,8 +203,8 @@ public class SerializationFactory {
     private static Map<String, String> normalizeKryoRegister(Map conf) {
         // TODO: de-duplicate this logic with the code in nimbus
         Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER);
-        if(res==null) return new TreeMap<String, String>();
-        Map<String, String> ret = new HashMap<String, String>();
+        if(res==null) return new TreeMap<>();
+        Map<String, String> ret = new HashMap<>();
         if(res instanceof Map) {
             ret = (Map<String, String>) res;
         } else {
@@ -219,6 +218,6 @@ public class SerializationFactory {
         }
 
         //ensure always same order for registrations with TreeMap
-        return new TreeMap<String, String>(ret);
+        return new TreeMap<>(ret);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java b/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java
index 54de390..11bdc19 100644
--- a/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java
@@ -100,8 +100,8 @@ public class GeneralTopologyContext implements JSONAware {
      */
     public List<Integer> getComponentTasks(String componentId) {
         List<Integer> ret = _componentToTasks.get(componentId);
-        if(ret==null) return new ArrayList<Integer>();
-        else return new ArrayList<Integer>(ret);
+        if(ret==null) return new ArrayList<>();
+        else return new ArrayList<>(ret);
     }
 
     /**
@@ -138,14 +138,14 @@ public class GeneralTopologyContext implements JSONAware {
      * @return Map from stream id to component id to the Grouping used.
      */
     public Map<String, Map<String, Grouping>> getTargets(String componentId) {
-        Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
+        Map<String, Map<String, Grouping>> ret = new HashMap<>();
         for(String otherComponentId: getComponentIds()) {
             Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
             for(Map.Entry<GlobalStreamId, Grouping> entry: inputs.entrySet()) {
                 GlobalStreamId id = entry.getKey();
                 if(id.get_componentId().equals(componentId)) {
                     Map<String, Grouping> curr = ret.get(id.get_streamId());
-                    if(curr==null) curr = new HashMap<String, Grouping>();
+                    if(curr==null) curr = new HashMap<>();
                     curr.put(otherComponentId, entry.getValue());
                     ret.put(id.get_streamId(), curr);
                 }
@@ -196,4 +196,4 @@ public class GeneralTopologyContext implements JSONAware {
         }
         return max;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index cefa207..e9cdb5b 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -51,8 +51,8 @@ import org.json.simple.JSONValue;
  */
 public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
     private Integer _taskId;
-    private Map<String, Object> _taskData = new HashMap<String, Object>();
-    private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
+    private Map<String, Object> _taskData = new HashMap<>();
+    private List<ITaskHook> _hooks = new ArrayList<>();
     private Map<String, Object> _executorData;
     private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
     private clojure.lang.Atom _openOrPrepareWasCalled;
@@ -139,9 +139,8 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
     }
 
     /**
-     * Gets the component id for this task. The component id maps
+     * @return the component id for this task. The component id maps
      * to a component id specified for a Spout or Bolt in the topology definition.
-     * @return
      */
     public String getThisComponentId() {
         return getComponentId(_taskId);
@@ -308,7 +307,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
      * @return The IMetric argument unchanged.
      */
     public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
-        if((Boolean)_openOrPrepareWasCalled.deref() == true) {
+        if((Boolean) _openOrPrepareWasCalled.deref()) {
             throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                        "IBolt::prepare() or ISpout::open() method.");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java
index 6c9cdc1..cf828fe 100644
--- a/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BasicBoltExecutor implements IRichBolt {
-    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);    
+    public static final Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
     
     private IBasicBolt _bolt;
     private transient BasicOutputCollector _collector;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java b/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java
index 0e7fd59..26a791e 100644
--- a/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java
+++ b/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class OutputFieldsGetter implements OutputFieldsDeclarer {
-    private Map<String, StreamInfo> _fields = new HashMap<String, StreamInfo>();
+    private Map<String, StreamInfo> _fields = new HashMap<>();
 
     public void declare(Fields fields) {
         declare(false, fields);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index 38b30d7..bccb1bf 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -89,18 +89,18 @@ import org.json.simple.JSONValue;
  * the inputs for that component.</p>
  */
 public class TopologyBuilder {
-    private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
-    private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
-    private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
+    private Map<String, IRichBolt> _bolts = new HashMap<>();
+    private Map<String, IRichSpout> _spouts = new HashMap<>();
+    private Map<String, ComponentCommon> _commons = new HashMap<>();
 
 //    private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
 
-    private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
+    private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<>();
     
     
     public StormTopology createTopology() {
-        Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
-        Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
+        Map<String, Bolt> boltSpecs = new HashMap<>();
+        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
         for(String boltId: _bolts.keySet()) {
             IRichBolt bolt = _bolts.get(boltId);
             ComponentCommon common = getComponentCommon(boltId, bolt);
@@ -168,7 +168,7 @@ public class TopologyBuilder {
      *
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the basic bolt
-     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
+     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
      * @return use the returned object to declare the inputs to this component
      * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
      */
@@ -193,7 +193,7 @@ public class TopologyBuilder {
      * will be allocated to this component.
      *
      * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
-     * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around the cluster.
+     * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.
      * @param spout the spout
      * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
index 3206941..3f35149 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
@@ -18,7 +18,6 @@
 package backtype.storm.topology.base;
 
 import backtype.storm.coordination.IBatchBolt;
-import java.util.Map;
 
 public abstract class BaseBatchBolt<T> extends BaseComponent implements IBatchBolt<T> {
  

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java
index 704a95b..e2b3a93 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java
@@ -18,7 +18,6 @@
 package backtype.storm.topology.base;
 
 import backtype.storm.transactional.ITransactionalSpout;
-import java.util.Map;
 
 public abstract class BaseTransactionalSpout<T> extends BaseComponent implements ITransactionalSpout<T> {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java b/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java
index f7ce534..f8f73f6 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java
@@ -52,7 +52,7 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
     private TransactionalState _state;
     private RotatingTransactionalState _coordinatorState;
     
-    TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<BigInteger, TransactionStatus>();
+    TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<>();
     
     private SpoutOutputCollector _collector;
     private Random _rand;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java b/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
index 3be5d37..02185f4 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
@@ -63,8 +63,8 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
     public class Emitter implements ICommitterTransactionalSpout.Emitter {
         IOpaquePartitionedTransactionalSpout.Emitter _emitter;
         TransactionalState _state;
-        TreeMap<BigInteger, Map<Integer, Object>> _cachedMetas = new TreeMap<BigInteger, Map<Integer, Object>>();
-        Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
+        TreeMap<BigInteger, Map<Integer, Object>> _cachedMetas = new TreeMap<>();
+        Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<>();
         int _index;
         int _numTasks;
         
@@ -84,7 +84,7 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
         
         @Override
         public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, BatchOutputCollector collector) {
-            Map<Integer, Object> metas = new HashMap<Integer, Object>();
+            Map<Integer, Object> metas = new HashMap<>();
             _cachedMetas.put(tx.getTransactionId(), metas);
             int partitions = _emitter.numPartitions();
             Entry<BigInteger, Map<Integer, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
@@ -92,7 +92,7 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
             if(entry!=null) {
                 prevCached = entry.getValue();
             } else {
-                prevCached = new HashMap<Integer, Object>();
+                prevCached = new HashMap<>();
             }
             
             for(int i=_index; i < partitions; i+=_numTasks) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java b/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
index 479dda4..76859cf 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
@@ -67,7 +67,7 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou
     class Emitter implements ITransactionalSpout.Emitter<Integer> {
         private IPartitionedTransactionalSpout.Emitter _emitter;
         private TransactionalState _state;
-        private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
+        private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<>();
         private int _index;
         private int _numTasks;
         

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/tuple/Fields.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Fields.java b/storm-core/src/jvm/backtype/storm/tuple/Fields.java
index 3eed409..b52b798 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Fields.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Fields.java
@@ -30,14 +30,14 @@ import java.io.Serializable;
  */
 public class Fields implements Iterable<String>, Serializable {
     private List<String> _fields;
-    private Map<String, Integer> _index = new HashMap<String, Integer>();
+    private Map<String, Integer> _index = new HashMap<>();
     
     public Fields(String... fields) {
         this(Arrays.asList(fields));
     }
     
     public Fields(List<String> fields) {
-        _fields = new ArrayList<String>(fields.size());
+        _fields = new ArrayList<>(fields.size());
         for (String field : fields) {
             if (_fields.contains(field))
                 throw new IllegalArgumentException(
@@ -49,7 +49,7 @@ public class Fields implements Iterable<String>, Serializable {
     }
     
     public List<Object> select(Fields selector, List<Object> tuple) {
-        List<Object> ret = new ArrayList<Object>(selector.size());
+        List<Object> ret = new ArrayList<>(selector.size());
         for(String s: selector) {
             ret.add(tuple.get(_index.get(s)));
         }
@@ -57,7 +57,7 @@ public class Fields implements Iterable<String>, Serializable {
     }
 
     public List<String> toList() {
-        return new ArrayList<String>(_fields);
+        return new ArrayList<>(_fields);
     }
     
     /**
@@ -98,7 +98,7 @@ public class Fields implements Iterable<String>, Serializable {
     }
     
     /**
-     * @returns true if this contains the specified name of the field.
+     * @return true if this contains the specified name of the field.
      */
     public boolean contains(String field) {
         return _index.containsKey(field);

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/tuple/MessageId.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/MessageId.java b/storm-core/src/jvm/backtype/storm/tuple/MessageId.java
index 680af38..554bab6 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/MessageId.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/MessageId.java
@@ -43,7 +43,7 @@ public class MessageId {
     }
         
     public static MessageId makeRootId(long id, long val) {
-        Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
+        Map<Long, Long> anchorsToIds = new HashMap<>();
         anchorsToIds.put(id, val);
         return new MessageId(anchorsToIds);
     }
@@ -67,11 +67,7 @@ public class MessageId {
 
     @Override
     public boolean equals(Object other) {
-        if(other instanceof MessageId) {
-            return _anchorsToIds.equals(((MessageId) other)._anchorsToIds);
-        } else {
-            return false;
-        }
+        return other instanceof MessageId && _anchorsToIds.equals(((MessageId) other)._anchorsToIds);
     }
 
     @Override
@@ -89,7 +85,7 @@ public class MessageId {
 
     public static MessageId deserialize(Input in) throws IOException {
         int numAnchors = in.readInt(true);
-        Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
+        Map<Long, Long> anchorsToIds = new HashMap<>();
         for(int i=0; i<numAnchors; i++) {
             anchorsToIds.put(in.readLong(), in.readLong());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index c4ea7c8..19d0cb4 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -18,7 +18,6 @@
 package backtype.storm.tuple;
 
 import backtype.storm.generated.GlobalStreamId;
-import java.util.List;
 
 /**
  * The tuple is the main data structure in Storm. A tuple is a named list of values, 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 53b63ba..dd31c96 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -237,7 +237,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
         return System.identityHashCode(this);
     }
 
-    private final Keyword makeKeyword(String name) {
+    private Keyword makeKeyword(String name) {
         return Keyword.intern(Symbol.create(name));
     }    
 
@@ -250,7 +250,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
             } else if(o instanceof String) {
                 return getValueByField((String) o);
             }
-        } catch(IllegalArgumentException e) {
+        } catch(IllegalArgumentException ignored) {
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
index b2a2a7d..23deb28 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm.utils;
 
-import backtype.storm.Config;
 import backtype.storm.generated.DRPCExecutionException;
 import backtype.storm.generated.DistributedRPC;
 import backtype.storm.generated.AuthorizationException;

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 246f9a7..7ad6ae0 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -34,14 +34,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
@@ -255,7 +252,7 @@ public class DisruptorQueue implements IStatefulObject {
 
     public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
         this._queueName = PREFIX + queueName;
-        WaitStrategy wait = null;
+        WaitStrategy wait;
         if (readTimeout <= 0) {
             wait = new LiteBlockingWaitStrategy();
         } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java b/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java
index b20c775..4abbc3e 100644
--- a/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java
+++ b/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java
@@ -22,7 +22,7 @@ import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 
 public class InprocMessaging {
-    private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<Integer, LinkedBlockingQueue<Object>>();
+    private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<>();
     private static final Object _lock = new Object();
     private static int port = 1;
     
@@ -50,7 +50,7 @@ public class InprocMessaging {
     private static LinkedBlockingQueue<Object> getQueue(int port) {
         synchronized(_lock) {
             if(!_queues.containsKey(port)) {
-              _queues.put(port, new LinkedBlockingQueue<Object>());   
+              _queues.put(port, new LinkedBlockingQueue<>());
             }
             return _queues.get(port);
         }