You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2018/03/21 18:11:29 UTC

[2/5] storm git commit: avoiding rawtypes in configuration and metadata Maps in order to allow usage without @SuppressWarnings("rawtypes") and early detection of design flaws

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
index 9f26479..7241c60 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
@@ -36,7 +36,8 @@ public class ZkState {
     private static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
     CuratorFramework _curator;
 
-    private CuratorFramework newCurator(Map stateConf) throws Exception {
+    private CuratorFramework newCurator(final Map<String, Object> stateConf)
+            throws Exception {
         Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
         String serverPorts = "";
         for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
@@ -54,8 +55,8 @@ public class ZkState {
         return _curator;
     }
 
-    public ZkState(Map stateConf) {
-        stateConf = new HashMap(stateConf);
+    public ZkState(Map<String, Object> stateConf) {
+        stateConf = new HashMap<>(stateConf);
 
         try {
             _curator = newCurator(stateConf);

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
index 8a47ddc..c17c912 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
@@ -26,7 +26,11 @@ import java.util.List;
 import java.util.Map;
 
 
-public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<List<GlobalPartitionInformation>, Partition, Map> {
+public class OpaqueTridentKafkaSpout
+        implements IOpaquePartitionedTridentSpout<
+                List<GlobalPartitionInformation>,
+                Partition,
+                Map<String, Object>> {
 
 
     TridentKafkaConfig _config;
@@ -36,13 +40,18 @@ public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<L
     }
 
     @Override
-    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map<String, Object> conf, TopologyContext context) {
+    public Emitter<List<GlobalPartitionInformation>,
+            Partition,
+            Map<String, Object>> getEmitter(Map<String, Object> conf,
+                    TopologyContext context) {
         return new TridentKafkaEmitter(conf, context, _config, context
                 .getStormId()).asOpaqueEmitter();
     }
 
     @Override
-    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext tc) {
+    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(
+            Map<String, Object> conf,
+            TopologyContext tc) {
         return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 1339387..3333c2c 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -73,16 +73,20 @@ public class TridentKafkaEmitter {
     }
 
 
-    private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+    private Map<String, Object> failFastEmitNewPartitionBatch(
+            final TransactionAttempt attempt,
+            TridentCollector collector,
+            Partition partition,
+            Map<String, Object> lastMeta) {
         SimpleConsumer consumer = _connections.register(partition);
-        Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt);
+        Map<String, Object> ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt);
         Long offset = (Long) ret.get("offset");
         Long endOffset = (Long) ret.get("nextOffset");
         _kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(endOffset, offset));
         return ret;
     }
 
-    private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+    private Map<String, Object> emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map<String, Object> lastMeta) {
         try {
             return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
         } catch (FailedFetchException e) {
@@ -90,7 +94,7 @@ public class TridentKafkaEmitter {
             if (lastMeta == null) {
                 return null;
             } else {
-                Map ret = new HashMap();
+                Map<String, Object> ret = new HashMap<>();
                 ret.put("offset", lastMeta.get("nextOffset"));
                 ret.put("nextOffset", lastMeta.get("nextOffset"));
                 ret.put("partition", partition.partition);
@@ -102,12 +106,17 @@ public class TridentKafkaEmitter {
         }
     }
 
-    private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta, TransactionAttempt attempt) {
+    private Map<String, Object> doEmitNewPartitionBatch(SimpleConsumer consumer,
+            Partition partition,
+            TridentCollector collector,
+            Map<String, Object> lastMeta,
+            TransactionAttempt attempt) {
         LOG.debug("Emitting new partition batch - [transaction = {}], [lastMeta = {}]", attempt, lastMeta);
         long offset;
         if (lastMeta != null) {
             String lastInstanceId = null;
-            Map lastTopoMeta = (Map) lastMeta.get("topology");
+            Map<String, Object> lastTopoMeta = (Map<String, Object>)
+                    lastMeta.get("topology");
             if (lastTopoMeta != null) {
                 lastInstanceId = (String) lastTopoMeta.get("id");
             }
@@ -136,7 +145,7 @@ public class TridentKafkaEmitter {
             emit(collector, msg.message(), partition, msg.offset(), attempt);
             endoffset = msg.nextOffset();
         }
-        Map newMeta = new HashMap();
+        Map<String, Object> newMeta = new HashMap<>();
         newMeta.put("offset", offset);
         newMeta.put("nextOffset", endoffset);
         newMeta.put("instanceId", _topologyInstanceId);
@@ -161,7 +170,7 @@ public class TridentKafkaEmitter {
     /**
      * re-emit the batch described by the meta data provided
      */
-    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
+    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map<String, Object> meta) {
         LOG.info("re-emitting batch, attempt " + attempt);
         String instanceId = (String) meta.get("instanceId");
         if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
@@ -221,9 +230,9 @@ public class TridentKafkaEmitter {
     }
 
 
-    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() {
+    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>> asOpaqueEmitter() {
 
-        return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
+        return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>>() {
 
             /**
              * Emit a batch of tuples for a partition/transaction.
@@ -232,7 +241,7 @@ public class TridentKafkaEmitter {
              * for defining the parameters of the next batch.
              */
             @Override
-            public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+            public Map<String, Object> emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) {
                 return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
             }
 
@@ -254,14 +263,14 @@ public class TridentKafkaEmitter {
     }
 
     public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
-        return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
+        return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>>() {
 
             /**
              * Emit a batch of tuples for a partition/transaction that's never been emitted before.
              * Return the metadata that can be used to reconstruct this partition/batch in the future.
              */
             @Override
-            public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+            public Map<String, Object> emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) {
                 return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
             }
 
@@ -270,7 +279,7 @@ public class TridentKafkaEmitter {
              * the metadata created when it was first emitted.
              */
             @Override
-            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) {
                 reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
             }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 8d8746f..31dfffe 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -63,8 +63,8 @@ public class ZkCoordinatorTest {
         when(dynamicPartitionConnections.register(any(Broker.class), any(String.class) ,anyInt())).thenReturn(simpleConsumer);
     }
 
-    private Map buildZookeeperConfig(TestingServer server) {
-        Map<String, Object> conf = new HashMap();
+    private Map<String, Object> buildZookeeperConfig(TestingServer server) {
+        Map<String, Object> conf = new HashMap<>();
         conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
         conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
         conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
index 56fd33a..b06d35d 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
@@ -68,9 +68,9 @@ public class MongoMapState<T> implements IBackingMap<T> {
     private Options<T> options;
     private Serializer<T> serializer;
     private MongoDbClient mongoClient;
-    private Map map;
+    private Map<String, Object> map;
 
-    protected MongoMapState(Map map, Options options) {
+    protected MongoMapState(Map<String, Object> map, Options options) {
         this.options = options;
         this.map = map;
         this.serializer = options.serializer;

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index c4e8cca..77c394c 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -45,9 +45,9 @@ public class MongoState implements State {
 
     private Options options;
     private MongoDbClient mongoClient;
-    private Map map;
+    private Map<String, Object> map;
 
-    protected MongoState(Map map, Options options) {
+    protected MongoState(Map<String, Object> map, Options options) {
         this.options = options;
         this.map = map;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index 471d6cd..f0f3fa3 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -82,7 +82,7 @@ public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
      * {@inheritDoc}
      */
     @Override
-    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
+    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector collector) {
         // FIXME: stores map (topoConf), topologyContext and expose these to derived classes
         this.collector = collector;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
index 9a8a46e..daaa93c 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
@@ -45,7 +45,7 @@ public class RocketMqState implements State {
     private Options options;
     private MQProducer producer;
 
-    protected RocketMqState(Map map, Options options) {
+    protected RocketMqState(Map<String, Object> map, Options options) {
         this.options = options;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
index 7d3e69d..2e8997c 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
@@ -36,7 +36,7 @@ public class SolrStateFactory implements StateFactory {
     }
 
     @Override
-    public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
+    public State makeState(Map<String, Object> map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
         SolrState state = new SolrState(solrConfig, solrMapper);
         state.prepare();
         return state;

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
index d100598..8d32c06 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
@@ -49,7 +49,7 @@ public class ConfigMethodDef {
         List<Object> newVal = new ArrayList<Object>();
         for (Object obj : args) {
             if (obj instanceof LinkedHashMap) {
-                Map map = (Map)obj;
+                Map<String, Object> map = (Map<String, Object>)obj;
                 if (map.containsKey("ref") && map.size() == 1) {
                     newVal.add(new BeanReference((String)map.get("ref")));
                     this.hasReferences = true;

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
index 170ee4f..1df9f9f 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
@@ -57,7 +57,7 @@ public class ObjectDef {
         List<Object> newVal = new ArrayList<Object>();
         for (Object obj : constructorArgs) {
             if (obj instanceof LinkedHashMap) {
-                Map map = (Map)obj;
+                Map<String, Object> map = (Map<String, Object>)obj;
                 if (map.containsKey("ref") && map.size() == 1) {
                     newVal.add(new BeanReference((String) map.get("ref")));
                     this.hasReferences = true;

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
index 4521b36..f8e59f5 100644
--- a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
+++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
@@ -49,7 +49,7 @@ public class StormCluster {
         this.client = NimbusClient.getConfiguredClient(conf).getClient();
     }
 
-    public static Map getConfig() {
+    public static Map<String, Object> getConfig() {
         return Utils.readStormConfig();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 7a286d5..ed6068c 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1853,11 +1853,11 @@ public class Config extends HashMap<String, Object> {
         setClasspath(this, cp);
     }
 
-    public static void setEnvironment(Map<String, Object> conf, Map env) {
+    public static void setEnvironment(Map<String, Object> conf, Map<String, Object> env) {
         conf.put(Config.TOPOLOGY_ENVIRONMENT, env);
     }
 
-    public void setEnvironment(Map env) {
+    public void setEnvironment(Map<String, Object> env) {
         setEnvironment(this, env);
     }
 
@@ -1955,7 +1955,7 @@ public class Config extends HashMap<String, Object> {
     }
 
     public static void registerMetricsConsumer(Map<String, Object> conf, Class klass, Object argument, long parallelismHint) {
-        HashMap m = new HashMap();
+        HashMap<String, Object> m = new HashMap<>();
         m.put("class", klass.getCanonicalName());
         m.put("parallelism.hint", parallelismHint);
         m.put("argument", argument);

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index 0f6baf2..5ce0d44 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -80,9 +80,8 @@ public class StormSubmitter {
         return false;
     }
 
-    @SuppressWarnings("unchecked")
-    public static Map prepareZookeeperAuthentication(Map<String, Object> conf) {
-        Map toRet = new HashMap();
+    public static Map<String, Object> prepareZookeeperAuthentication(Map<String, Object> conf) {
+        Map<String, Object> toRet = new HashMap<>();
         String secretPayload = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
         // Is the topology ZooKeeper authentication configuration unset?
         if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Thrift.java b/storm-client/src/jvm/org/apache/storm/Thrift.java
index bffd04f..e00b0ee 100644
--- a/storm-client/src/jvm/org/apache/storm/Thrift.java
+++ b/storm-client/src/jvm/org/apache/storm/Thrift.java
@@ -94,7 +94,7 @@ public class Thrift {
             return parallelism;
         }
 
-        public Map getConf() {
+        public Map<String, Object> getConf() {
             return conf;
         }
     }
@@ -117,7 +117,7 @@ public class Thrift {
             return bolt;
         }
 
-        public Map getConf() {
+        public Map<String, Object> getConf() {
             return conf;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 362d4dd..4bd5ded 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -256,12 +256,12 @@ public class ClusterUtils {
         if (stateStorage instanceof IStateStorage) {
             return new StormClusterStateImpl((IStateStorage) stateStorage, context, false);
         } else {
-            IStateStorage Storage = _instance.mkStateStorageImpl((Map) stateStorage, (Map) stateStorage, context);
+            IStateStorage Storage = _instance.mkStateStorageImpl((Map<String, Object>) stateStorage, (Map<String, Object>) stateStorage, context);
             return new StormClusterStateImpl(Storage, context, true);
         }
     }
 
-    public IStateStorage mkStateStorageImpl(Map<String, Object> config, Map auth_conf, ClusterStateContext context) throws Exception {
+    public IStateStorage mkStateStorageImpl(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context) throws Exception {
         String className = null;
         IStateStorage stateStorage = null;
         if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) {
@@ -275,7 +275,7 @@ public class ClusterUtils {
         return stateStorage;
     }
 
-    public static IStateStorage mkStateStorage(Map<String, Object> config, Map auth_conf, ClusterStateContext context) throws Exception {
+    public static IStateStorage mkStateStorage(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context) throws Exception {
         return _instance.mkStateStorageImpl(config, auth_conf, context);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
index 065e0df..5e0cdd7 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
@@ -24,7 +24,7 @@ import org.apache.storm.utils.Utils;
 
 public class PaceMakerStateStorageFactory implements StateStorageFactory {
     @Override
-    public IStateStorage mkStore(Map<String, Object> config, Map auth_conf, ClusterStateContext context) {
+    public IStateStorage mkStore(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context) {
         try {
             ZKStateStorageFactory zkfact = new ZKStateStorageFactory();
             IStateStorage zkState = zkfact.mkStore(config, auth_conf, context);

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
index 584a2d8..7b58855 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
@@ -22,5 +22,5 @@ import java.util.Map;
 
 public interface StateStorageFactory {
 
-    IStateStorage mkStore(Map<String, Object> config, Map auth_conf, ClusterStateContext context);
+    IStateStorage mkStore(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index c68011e..d5c29f9 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -50,7 +50,7 @@ public class ZKStateStorage implements IStateStorage {
     private AtomicBoolean active;
 
     private boolean isNimbus;
-    private Map authConf;
+    private Map<String, Object> authConf;
     private Map<String, Object> conf;
 
     private class ZkWatcherCallBack implements WatcherCallBack{
@@ -73,7 +73,7 @@ public class ZKStateStorage implements IStateStorage {
         }
     }
 
-    public ZKStateStorage(Map<String, Object> conf, Map authConf, ClusterStateContext context) throws Exception {
+    public ZKStateStorage(Map<String, Object> conf, Map<String, Object> authConf, ClusterStateContext context) throws Exception {
         this.conf = conf;
         this.authConf = authConf;
         if (context.getDaemonType().equals(DaemonType.NIMBUS))

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
index 987c2fd..0ae745f 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -26,7 +26,7 @@ import java.util.Map;
 public class ZKStateStorageFactory implements StateStorageFactory {
 
     @Override
-    public IStateStorage mkStore(Map<String, Object> config, Map auth_conf, ClusterStateContext context) {
+    public IStateStorage mkStore(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context) {
         try {
             return new ZKStateStorage(config, auth_conf, context);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index f2ebd5f..9a21679 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -296,7 +296,7 @@ public class StormCommon {
 
         for (SpoutSpec spout : topology.get_spouts().values()) {
             ComponentCommon common = spout.get_common();
-            Map spoutConf = componentConf(spout);
+            Map<String, Object> spoutConf = componentConf(spout);
             spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                     ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
             common.set_json_conf(JSONValue.toJSONString(spoutConf));

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index 8237ad1..0fdcb9f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -25,6 +25,9 @@ import org.apache.storm.task.TopologyContext;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.utils.JCQueue;
 
 public class BuiltinMetricsUtil {
     public static void registerIconnectionServerMetric(Object server, Map<String, Object> topoConf, TopologyContext context) {
@@ -33,15 +36,14 @@ public class BuiltinMetricsUtil {
         }
     }
 
-    public static void registerIconnectionClientMetrics(final Map nodePortToSocket, Map<String, Object> topoConf, TopologyContext context) {
+    public static void registerIconnectionClientMetrics(final Map<NodeInfo, IConnection> nodePortToSocket, Map<String, Object> topoConf, TopologyContext context) {
         IMetric metric = new IMetric() {
             @Override
             public Object getValueAndReset() {
                 Map<Object, Object> ret = new HashMap<>();
-                for (Object o : nodePortToSocket.entrySet()) {
-                    Map.Entry entry = (Map.Entry) o;
-                    Object nodePort = entry.getKey();
-                    Object connection = entry.getValue();
+                for (Map.Entry<NodeInfo, IConnection> entry : nodePortToSocket.entrySet()) {
+                    NodeInfo nodePort = entry.getKey();
+                    IConnection connection = entry.getValue();
                     if (connection instanceof IStatefulObject) {
                         ret.put(nodePort, ((IStatefulObject) connection).getState());
                     }
@@ -52,11 +54,10 @@ public class BuiltinMetricsUtil {
         registerMetric("__send-iconnection", metric, topoConf, context);
     }
 
-    public static void registerQueueMetrics(Map queues, Map<String, Object> topoConf, TopologyContext context) {
-        for (Object o : queues.entrySet()) {
-            Map.Entry entry = (Map.Entry) o;
+    public static void registerQueueMetrics(Map<String, JCQueue> queues, Map<String, Object> topoConf, TopologyContext context) {
+        for (Map.Entry<String, JCQueue> entry : queues.entrySet()) {
             String name = "__" + entry.getKey();
-            IMetric metric = new StateMetric((IStatefulObject) entry.getValue());
+            IMetric metric = new StateMetric(entry.getValue());
             registerMetric(name, metric, topoConf, context);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 9c7cf9e..d080471 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -96,7 +96,7 @@ public class WorkerState {
     private final WorkerTransfer workerTransfer;
     private final BackPressureTracker bpTracker;
 
-    public Map getConf() {
+    public Map<String, Object> getConf() {
         return conf;
     }
 
@@ -140,7 +140,7 @@ public class WorkerState {
         return localReceiveQueues;
     }
 
-    public Map getTopologyConf() {
+    public Map<String, Object> getTopologyConf() {
         return topologyConf;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
index 00ae469..80e4ea3 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -204,7 +204,7 @@ public class DRPCSpout extends BaseRichSpout {
                 try {
                     DRPCRequest req = client.fetchRequest(_function);
                     if(req.get_request_id().length() > 0) {
-                        Map returnInfo = new HashMap();
+                        Map<String, Object> returnInfo = new HashMap<>();
                         returnInfo.put("id", req.get_request_id());
                         returnInfo.put("host", client.getHost());
                         returnInfo.put("port", client.getPort());
@@ -228,7 +228,7 @@ public class DRPCSpout extends BaseRichSpout {
                 try {
                     DRPCRequest req = drpc.fetchRequest(_function);
                     if(req.get_request_id().length() > 0) {
-                        Map returnInfo = new HashMap();
+                        Map<String, Object> returnInfo = new HashMap<>();
                         returnInfo.put("id", req.get_request_id());
                         returnInfo.put("host", _local_drpc_id);
                         returnInfo.put("port", 0);

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
index f57bbb1..be668fe 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
@@ -44,7 +44,7 @@ public class JoinResult extends BaseRichBolt {
         this.returnComponent = returnComponent;
     }
  
-    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
+    public void prepare(Map<String, Object> map, TopologyContext context, OutputCollector collector) {
         _collector = collector;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
index 06e576c..6cfb898 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
@@ -38,7 +38,7 @@ public class PrepareRequest extends BaseBasicBolt {
     Random rand;
 
     @Override
-    public void prepare(Map map, TopologyContext context) {
+    public void prepare(Map<String, Object> map, TopologyContext context) {
         rand = new Random();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
index 04ddf25..6146890 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
@@ -47,7 +47,7 @@ public class ReturnResults extends BaseRichBolt {
     public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
     OutputCollector _collector;
     boolean local;
-    Map _conf; 
+    Map<String, Object> _conf;
     Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
 
     @Override
@@ -62,9 +62,9 @@ public class ReturnResults extends BaseRichBolt {
         String result = (String) input.getValue(0);
         String returnInfo = (String) input.getValue(1);
         if (returnInfo!=null) {
-            Map retMap = null;
+            Map<String, Object> retMap;
             try {
-                retMap = (Map) JSONValue.parseWithException(returnInfo);
+                retMap = (Map<String, Object>) JSONValue.parseWithException(returnInfo);
             } catch (ParseException e) {
                  LOG.error("Parseing returnInfo failed", e);
                  _collector.fail(input);

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 267fe74..f811b82 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -225,15 +225,21 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         }
     }
 
-    private static List<Object> All_CONFIGS() {
-        List<Object> ret = new ArrayList<Object>();
-        Config config = new Config();
-        Class<?> ConfigClass = config.getClass();
-        Field[] fields = ConfigClass.getFields();
+    /**
+     * Retrieves all values of all static fields of {@link Config} which
+     * represent all available configuration keys through reflection. The method
+     * assumes that they are {@code String}s through reflection.
+     * @return the list of retrieved field values
+     * @throws ClassCastException if one of the fields is not of type
+     * {@code String}
+     */
+    private static List<String> retrieveAllConfigKeys() {
+        List<String> ret = new ArrayList<>();
+        Field[] fields = Config.class.getFields();
         for (int i = 0; i < fields.length; i++) {
             try {
-                Object obj = fields[i].get(null);
-                ret.add(obj);
+                String fieldValue = (String)fields[i].get(null);
+                ret.add(fieldValue);
             } catch (IllegalArgumentException e) {
                 LOG.error(e.getMessage(), e);
             } catch (IllegalAccessException e) {
@@ -432,8 +438,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     // ============================ getter methods =================================
     // =============================================================================
 
-    private Map normalizedComponentConf(Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
-        List<Object> keysToRemove = All_CONFIGS();
+    private Map<String, Object> normalizedComponentConf(Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
+        List<String> keysToRemove = retrieveAllConfigKeys();
         keysToRemove.remove(Config.TOPOLOGY_DEBUG);
         keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
         keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
@@ -451,11 +457,11 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
         keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
 
-        Map<Object, Object> componentConf;
+        Map<String, Object> componentConf;
         String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
         if (specJsonConf != null) {
             try {
-                componentConf = (Map<Object, Object>) JSONValue.parseWithException(specJsonConf);
+                componentConf = (Map<String, Object>) JSONValue.parseWithException(specJsonConf);
             } catch (ParseException e) {
                 throw new RuntimeException(e);
             }
@@ -466,7 +472,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
             componentConf = new HashMap<>();
         }
 
-        Map<Object, Object> ret = new HashMap<>();
+        Map<String, Object> ret = new HashMap<>();
         ret.putAll(topoConf);
         ret.putAll(componentConf);
 
@@ -489,7 +495,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         return openOrPrepareWasCalled;
     }
 
-    public Map getTopoConf() {
+    public Map<String, Object> getTopoConf() {
         return topoConf;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 61e6488..91bbcad 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -37,7 +37,9 @@ import org.apache.storm.daemon.Task;
 import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.executor.Executor;
+import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.messaging.IConnection;
 import org.apache.storm.metric.api.IMetricsRegistrant;
 import org.apache.storm.security.auth.IAutoCredentials;
 import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
@@ -95,7 +97,7 @@ public class BoltExecutor extends Executor {
 
     private static IWaitStrategy makeSystemBoltWaitStrategy() {
         WaitStrategyPark ws = new WaitStrategyPark();
-        HashMap conf = new HashMap<String, Object>();
+        Map<String, Object> conf = new HashMap<>();
         conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
         ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
         return ws;
@@ -126,7 +128,7 @@ public class BoltExecutor extends Executor {
                 Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue, "transfer", workerData.getTransferQueue());
                 BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
 
-                Map cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
+                Map<NodeInfo, IConnection> cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
                 BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
                 BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java b/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
index b80f7c0..7dbc5d6 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
@@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
     private final WorkerState.ILocalTransferCallback cb;
-    private final Map conf;
+    private final Map<String, Object> conf;
     private final GeneralTopologyContext context;
 
     private final ThreadLocal<KryoTupleDeserializer> _des =
@@ -55,7 +55,7 @@ public class DeserializingConnectionCallback implements IConnectionCallback, IMe
     private final ConcurrentHashMap<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();
 
 
-    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
+    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
         this.conf = conf;
         this.context = context;
         cb = callback;

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index ac92c6b..e0736b5 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -549,7 +549,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         return ret;
     }
 
-    public Map getConfig() {
+    public Map<String, Object> getConfig() {
         return topoConf;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
index 5c0caf5..343a3d7 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
@@ -35,8 +35,8 @@ public class MultiCountMetric implements IMetric {
         return val;
     }
 
-    public Object getValueAndReset() {
-        Map ret = new HashMap();
+    public Map<String, Object> getValueAndReset() {
+        Map<String, Object> ret = new HashMap<>();
         for(Map.Entry<String, CountMetric> e : _value.entrySet()) {
             ret.put(e.getKey(), e.getValue().getValueAndReset());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
index 9bd9113..a6cb49a 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
@@ -37,8 +37,8 @@ public class MultiReducedMetric implements IMetric {
         return val;
     }
 
-    public Object getValueAndReset() {
-        Map ret = new HashMap();
+    public Map<String, Object> getValueAndReset() {
+        Map<String, Object> ret = new HashMap<>();
         for(Map.Entry<String, ReducedMetric> e : _value.entrySet()) {
             Object val = e.getValue().getValueAndReset();
             if(val != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java b/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java
index 62f1d9b..b55dec2 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java
@@ -41,7 +41,7 @@ public class FixedGroupsMapping implements IGroupMappingServiceProvider {
      * @param storm_conf Storm configuration
      */
     @Override
-    public void prepare(Map storm_conf) {
+    public void prepare(Map<String, Object> storm_conf) {
         Map<?, ?> params = (Map<?, ?>) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS);
         Map<String, Set<String>> mapping = (Map<String, Set<String>>) params.get(STORM_FIXED_GROUP_MAPPING);
         if (mapping != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java
index fdcd31e..f29e882 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java
@@ -36,9 +36,9 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer {
     @Override
     public abstract void prepare(Map<String, Object> conf);
 
-    abstract protected boolean permitClientRequest(ReqContext context, String operation, Map params);
+    abstract protected boolean permitClientRequest(ReqContext context, String operation, Map<String, Object> params);
 
-    abstract protected boolean permitInvocationRequest(ReqContext context, String operation, Map params);
+    abstract protected boolean permitInvocationRequest(ReqContext context, String operation, Map<String, Object> params);
     
     /**
      * Authorizes request from to the DRPC server.
@@ -47,7 +47,7 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer {
      * @param params a Map with any key-value entries of use to the authorization implementation
      */
     @Override
-    public boolean permit(ReqContext context, String operation, Map params) {
+    public boolean permit(ReqContext context, String operation, Map<String, Object> params) {
         if ("execute".equals(operation)) {
             return permitClientRequest(context, operation, params);
         } else if ("failRequest".equals(operation) || 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index dc4d911..ca8646f 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -125,7 +125,7 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
         return null;
     }
 
-    protected boolean permitClientOrInvocationRequest(ReqContext context, Map params,
+    protected boolean permitClientOrInvocationRequest(ReqContext context, Map<String, Object> params,
             String fieldName) {
         Map<String,AclFunctionEntry> acl = readAclFromConfig();
         String function = (String) params.get(FUNCTION_KEY);
@@ -165,13 +165,13 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
 
     @Override
     protected boolean permitClientRequest(ReqContext context, String operation,
-            Map params) {
+            Map<String, Object> params) {
         return permitClientOrInvocationRequest(context, params, "clientUsers");
     }
 
     @Override
     protected boolean permitInvocationRequest(ReqContext context, String operation,
-            Map params) {
+            Map<String, Object> params) {
         return permitClientOrInvocationRequest(context, params, "invocationUser");
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java b/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java
index a1e917a..73b1273 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java
@@ -26,7 +26,7 @@ import java.util.Map;
 
 public class HashMapSerializer extends MapSerializer {
     @Override
-    public Map create(Kryo kryo, Input input, Class<Map> type) {
-        return new HashMap();
+    public Map<String, Object> create(Kryo kryo, Input input, Class<Map> type) {
+        return new HashMap<>();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
index f115d12..b6461c5 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
@@ -61,11 +61,11 @@ public abstract class CommonStats {
         transferredStats.close();
     }
 
-    protected Map valueStat(MultiCountStatAndMetric metric) {
+    protected Map<String,Map<String,Long>> valueStat(MultiCountStatAndMetric metric) {
         return metric.getTimeCounts();
     }
 
-    protected Map valueStat(MultiLatencyStatAndMetric metric) {
+    protected Map<String, Map<String, Double>> valueStat(MultiLatencyStatAndMetric metric) {
         return metric.getTimeLatAvg();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
index d59c9cf..4ea979e 100644
--- a/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
@@ -160,7 +160,7 @@ public class GeneralTopologyContext implements JSONAware {
 
     @Override
     public String toJSONString() {
-        Map obj = new HashMap();
+        Map<String, Object> obj = new HashMap<>();
         obj.put("task->component", _taskToComponent);
         // TODO: jsonify StormTopology
         // at the minimum should send source info

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java b/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
index 4c04d84..8193f9a 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
@@ -41,7 +41,7 @@ public class NonRichBoltTracker implements IBolt {
 
     public void execute(Tuple input) {
         _delegate.execute(input);
-        Map stats = (Map) RegisteredGlobalState.getState(_trackId);
+        Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
         ((AtomicInteger) stats.get("processed")).incrementAndGet();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
index 5f75eae..d2af80f 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
@@ -45,7 +45,7 @@ public class SpoutTracker extends BaseRichSpout {
         }
         
         private void recordSpoutEmit() {
-            Map stats = (Map) RegisteredGlobalState.getState(_trackId);
+            Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
             ((AtomicInteger) stats.get("spout-emitted")).incrementAndGet();
             
         }
@@ -99,13 +99,13 @@ public class SpoutTracker extends BaseRichSpout {
 
     public void ack(Object msgId) {
         _delegate.ack(msgId);
-        Map stats = (Map) RegisteredGlobalState.getState(_trackId);
+        Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
         ((AtomicInteger) stats.get("processed")).incrementAndGet();
     }
 
     public void fail(Object msgId) {
         _delegate.fail(msgId);
-        Map stats = (Map) RegisteredGlobalState.getState(_trackId);
+        Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
         ((AtomicInteger) stats.get("processed")).incrementAndGet();        
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 60a7eb3..e10bc44 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -735,8 +735,8 @@ public class TopologyBuilder {
         }        
     }
 
-    private static String mergeIntoJson(Map into, Map newMap) {
-        Map res = new HashMap<>(into);
+    private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
+        Map<String, Object> res = new HashMap<>(into);
         res.putAll(newMap);
         return JSONValue.toJSONString(res);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java b/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
index bf6b181..b83c849 100644
--- a/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
@@ -35,7 +35,7 @@ public class TestTransactionalState extends TransactionalState {
      * Matching constructor in absence of a default constructor in the parent
      * class.
      */
-    protected TestTransactionalState(Map<String, Object> conf, String id, Map componentConf, String subroot) {
+    protected TestTransactionalState(Map<String, Object> conf, String id, Map<String, Object> componentConf, String subroot) {
         super(conf, id, componentConf, subroot);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
index d7079f3..c940b3f 100644
--- a/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
@@ -43,17 +43,17 @@ public class TransactionalState {
     KryoValuesDeserializer _des;
     List<ACL> _zkAcls = null;
     
-    public static TransactionalState newUserState(Map<String, Object> conf, String id, Map componentConf) {
+    public static TransactionalState newUserState(Map<String, Object> conf, String id, Map<String, Object> componentConf) {
         return new TransactionalState(conf, id, componentConf, "user");
     }
     
-    public static TransactionalState newCoordinatorState(Map<String, Object> conf, String id, Map componentConf) {
+    public static TransactionalState newCoordinatorState(Map<String, Object> conf, String id, Map<String, Object> componentConf) {
         return new TransactionalState(conf, id, componentConf, "coordinator");        
     }
     
-    protected TransactionalState(Map<String, Object> conf, String id, Map componentConf, String subroot) {
+    protected TransactionalState(Map<String, Object> conf, String id, Map<String, Object> componentConf, String subroot) {
         try {
-            conf = new HashMap(conf);
+            conf = new HashMap<>(conf);
             // ensure that the serialization registrations are consistent with the declarations in this spout
             if(componentConf!=null) {
                 conf.put(Config.TOPOLOGY_KRYO_REGISTER,

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java
index d51b883..7b18187 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -529,17 +529,17 @@ public class TridentTopology {
                 }
             }
         }
-        HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
+        HashMap<String, Number> combinedMasterCoordResources = new HashMap<>(_resourceDefaults);
         combinedMasterCoordResources.putAll(_masterCoordResources);
         return builder.buildTopology(combinedMasterCoordResources);
     }
 
-    private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
-        Map<String, Number> ret = new HashMap<String, Number>();
+    private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map<String, Number> defaultConfig) {
+        Map<String, Number> ret = new HashMap<>();
 
-        Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
-        Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
-        Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+        Number onHeapDefault = defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        Number offHeapDefault = defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        Number cpuLoadDefault = defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 
         if(res == null) {
             ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeapDefault);

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java b/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
index 617a42f..6d6648a 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
@@ -80,9 +80,9 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
         // only one of the multireducers will receive the tuples
         if (state.returnInfo!=null) {
             String result = JSONValue.toJSONString(state.results);
-            Map retMap = null;
+            Map<String, Object> retMap;
             try {
-                retMap = (Map) JSONValue.parseWithException(state.returnInfo);
+                retMap = (Map<String, Object>) JSONValue.parseWithException(state.returnInfo);
             } catch (ParseException e) {
                 collector.reportError(e);
                 return;

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index bb05450..61d50f8 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -59,7 +59,7 @@ public class TransactionalState {
     
     protected TransactionalState(Map<String, Object> conf, String id, String subroot) {
         try {
-            conf = new HashMap(conf);
+            conf = new HashMap<>(conf);
             String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
             String rootDir = transactionalRoot + "/" + id + "/" + subroot;
             List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
@@ -193,9 +193,11 @@ public class TransactionalState {
         _curator.close();
     }
     
-    private Object getWithBackup(Map amap, Object primary, Object backup) {
+    private Object getWithBackup(Map<String, Object> amap, String primary, String backup) {
         Object ret = amap.get(primary);
-        if(ret==null) return amap.get(backup);
+        if(ret==null) {
+            return amap.get(backup);
+        }
         return ret;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 0d5bab8..eb4557f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -271,7 +271,7 @@ public class ConfigUtils {
         return ret;
     }
 
-    public static Map overrideLoginConfigWithSystemProperty(Map<String, Object> conf) { // note that we delete the return value
+    public static Map<String, Object> overrideLoginConfigWithSystemProperty(Map<String, Object> conf) { // note that we delete the return value
         String loginConfFile = System.getProperty("java.security.auth.login.config");
         if (loginConfFile != null) {
             conf.put("java.security.auth.login.config", loginConfFile);
@@ -322,7 +322,7 @@ public class ConfigUtils {
         return conf;
     }
 
-    public static Map readYamlConfig(String name) {
+    public static Map<String, Object> readYamlConfig(String name) {
         return readYamlConfig(name, true);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 26b0ac4..7a64812 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 
@@ -189,7 +190,7 @@ public class JCQueue implements IStatefulObject {
         }
 
         public Object getState() {
-            HashMap state = new HashMap<String, Object>();
+            Map<String, Object> state = new HashMap<>();
 
             final double arrivalRateInSecs = arrivalsTracker.reportRate();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
index e27304a..bbd47a9 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -111,7 +111,7 @@ public class NimbusClient extends ThriftClient {
         return true;
     }
 
-    public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
+    public static NimbusClient getConfiguredClientAs(Map<String, Object> conf, String asUser) {
         return getConfiguredClientAs(conf, asUser, null);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index af03787..ba2ed74 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -934,18 +934,17 @@ public class Utils {
      * @param listSeq to reverse
      * @return a reversed map
      */
-    public static HashMap reverseMap(List listSeq) {
-        HashMap<Object, List<Object>> rtn = new HashMap();
+    public static Map<Object, List<Object>> reverseMap(List<List<Object>> listSeq) {
+        Map<Object, List<Object>> rtn = new HashMap<>();
         if (listSeq == null) {
             return rtn;
         }
-        for (Object entry : listSeq) {
-            List listEntry = (List) entry;
+        for (List<Object> listEntry : listSeq) {
             Object key = listEntry.get(0);
             Object val = listEntry.get(1);
-            List list = rtn.get(val);
+            List<Object> list = rtn.get(val);
             if (list == null) {
-                list = new ArrayList<Object>();
+                list = new ArrayList<>();
                 rtn.put(val, list);
             }
             list.add(key);

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java b/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
index d2d862b..42d4417 100644
--- a/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
+++ b/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
@@ -73,7 +73,7 @@ public class ClientZookeeper {
         _instance.mkdirsImpl(zk, path, acls);
     }
 
-    public static CuratorFramework mkClient(Map<String, Object> conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+    public static CuratorFramework mkClient(Map<String, Object> conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map<String, Object> authConf) {
         return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf);
     }
 
@@ -313,15 +313,15 @@ public class ClientZookeeper {
         return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack());
     }
 
-    public  CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, Map authConf) {
+    public  CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, Map<String, Object> authConf) {
         return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf);
     }
 
-    public  CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, String root, Map authConf) {
+    public  CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, String root, Map<String, Object> authConf) {
         return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf);
     }
 
-    public  CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+    public  CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map<String, Object> authConf) {
         CuratorFramework fk;
         if (authConf != null) {
             fk = CuratorUtils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf));

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java b/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java
index 9f0e1d1..985aa1c 100644
--- a/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java
+++ b/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java
@@ -48,7 +48,7 @@ public class DeserializingConnectionCallbackTest {
 
     @Test
     public void testUpdateMetricsConfigOff() {
-        Map config = new HashMap();
+        Map<String, Object> config = new HashMap<>();
         config.put(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS, Boolean.FALSE);
         DeserializingConnectionCallback withoutMetrics = 
             new DeserializingConnectionCallback(config, mock(GeneralTopologyContext.class), mock(
@@ -64,7 +64,7 @@ public class DeserializingConnectionCallbackTest {
     
     @Test
     public void testUpdateMetricsConfigOn() {
-        Map config = new HashMap();
+        Map<String, Object> config = new HashMap<>();
         config.put(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS, Boolean.TRUE);
         DeserializingConnectionCallback withMetrics =
             new DeserializingConnectionCallback(config, mock(GeneralTopologyContext.class), mock(

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java
index 272ca60..9283132 100644
--- a/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java
+++ b/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java
@@ -59,7 +59,7 @@ public class AutoSSLTest {
     @Test
     public void testgetSSLFilesFromConf() throws Exception {
         AutoSSL assl = new AutoSSL();
-        Map<String, Object> conf = new HashMap();
+        Map<String, Object> conf = new HashMap<>();
         assertNull(assl.getSSLFilesFromConf(conf));
         conf.put(AutoSSL.SSL_FILES_CONF, "sslfile1.txt");
         assl.prepare(conf);
@@ -74,14 +74,14 @@ public class AutoSSLTest {
     @Test
     public void testgetSSLFilesFromConfMultipleComma() throws Exception {
         AutoSSL assl = new AutoSSL();
-        Map<String, Object> conf = new HashMap();
+        Map<String, Object> conf = new HashMap<>();
         assertNull(assl.getSSLFilesFromConf(conf));
         conf.put(AutoSSL.SSL_FILES_CONF, "sslfile1.txt,sslfile2.txt,sslfile3.txt");
         assl.prepare(conf);
         Collection<String> sslFiles = assl.getSSLFilesFromConf(conf);
         assertNotNull(sslFiles);
         assertEquals(3, sslFiles.size());
-        ArrayList valid = new ArrayList<String>();
+        List<String> valid = new ArrayList<>();
         Collections.addAll(valid, "sslfile1.txt", "sslfile2.txt", "sslfile3.txt");
         for(String file: sslFiles) {
             assertTrue("removing: " + file, valid.remove(file));
@@ -104,13 +104,13 @@ public class AutoSSLTest {
             AutoSSL assl = new TestAutoSSL(baseDir.getPath());
 
             LOG.debug("base dir is; " + baseDir);
-            Map sslconf = new HashMap();
+            Map<String, Object> sslconf = new HashMap<>();
 
             sslconf.put(AutoSSL.SSL_FILES_CONF, temp.getPath());
             assl.prepare(sslconf);
             Collection<String> sslFiles = assl.getSSLFilesFromConf(sslconf);
  
-            Map<String, String> creds = new HashMap();
+            Map<String, String> creds = new HashMap<>();
             assl.populateCredentials(creds);
             assertTrue(creds.containsKey(temp.getName()));
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java b/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
index 9380714..ba0747c 100644
--- a/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
+++ b/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import org.apache.storm.generated.Grouping;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -140,7 +141,7 @@ public class ProcessorBoltTest {
         node.setWindowed(isWindowed);
         Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
         Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
-        Map mockSources = Mockito.mock(Map.class);
+        Map<GlobalStreamId, Grouping> mockSources = Mockito.mock(Map.class);
         GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
         Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockSources);
         Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
index c8499e9..30cdae9 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
@@ -50,22 +50,22 @@ public class UtilsTest {
                 Utils.isZkAuthenticationConfiguredTopology(topologyMockMap("foobar")));
     }
 
-    private Map topologyMockMap(String value) {
+    private Map<String, Object> topologyMockMap(String value) {
         return mockMap(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, value);
     }
 
-    private Map mockMap(String key, String value) {
-        Map<String, Object> map = new HashMap<String, Object>();
+    private Map<String, Object> mockMap(String key, String value) {
+        Map<String, Object> map = new HashMap<>();
         map.put(key, value);
         return map;
     }
 
-    private Map serverMockMap(String value) {
+    private Map<String, Object> serverMockMap(String value) {
         return mockMap(Config.STORM_ZOOKEEPER_AUTH_SCHEME, value);
     }
 
-    private Map emptyMockMap() {
-        return new HashMap<String, Object>();
+    private Map<String, Object> emptyMockMap() {
+        return new HashMap<>();
     }
     
     private void doParseJvmHeapMemByChildOptsTest(String message, String opt, double expected) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
index 6023409..a72039b 100644
--- a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
+++ b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
@@ -44,18 +44,21 @@ public class UploadCredentials {
         if (null != rawCredentials && ((rawCredentials.size() % 2) != 0)) {
             throw new RuntimeException("Need an even number of arguments to make a map");
         }
-        Map credentialsMap = new HashMap<>();
+        Map<String,String> credentialsMap = new HashMap<>();
         if (null != credentialFile) {
             Properties credentialProps = new Properties();
             credentialProps.load(new FileReader(credentialFile));
-            credentialsMap.putAll(credentialProps);
+            for(Map.Entry<Object, Object> credentialProp : credentialProps.entrySet()) {
+                credentialsMap.put((String)credentialProp.getKey(),
+                        (String)credentialProp.getValue());
+            }
         }
         if (null != rawCredentials) {
             for (int i = 0; i < rawCredentials.size(); i += 2) {
                 credentialsMap.put(rawCredentials.get(i), rawCredentials.get(i + 1));
             }
         }
-        StormSubmitter.pushCredentials(topologyName, new HashMap(), credentialsMap);
+        StormSubmitter.pushCredentials(topologyName, new HashMap<>(), credentialsMap);
         LOG.info("Uploaded new creds to topology: {}", topologyName);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java b/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java
index db5952e..e671264 100644
--- a/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java
+++ b/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java
@@ -22,16 +22,16 @@ import java.util.Map;
 public class FilterConfiguration {
     private String filterClass;
     private String filterName;
-    private Map filterParams;
+    private Map<String, String> filterParams;
 
 
-    public FilterConfiguration(String filterClass, Map filterParams) {
+    public FilterConfiguration(String filterClass, Map<String, String> filterParams) {
         this.filterParams = filterParams;
         this.filterClass = filterClass;
         this.filterName = null;
     }
 
-    public FilterConfiguration(String filterClass, String filterName, Map filterParams) {
+    public FilterConfiguration(String filterClass, String filterName, Map<String, String> filterParams) {
         this.filterClass = filterClass;
         this.filterName = filterName;
         this.filterParams = filterParams;
@@ -53,11 +53,11 @@ public class FilterConfiguration {
         this.filterClass = filterClass;
     }
 
-    public Map getFilterParams() {
+    public Map<String, String> getFilterParams() {
         return filterParams;
     }
 
-    public void setFilterParams(Map filterParams) {
+    public void setFilterParams(Map<String, String> filterParams) {
         this.filterParams = filterParams;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
index f29c1f6..be4a4c0 100644
--- a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
+++ b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
@@ -109,7 +109,7 @@ public class UIHelpers {
         return "[" + e.get_task_start() + "-" + e.get_task_end() + "]";
     }
 
-    public static Map unauthorizedUserJson(String user) {
+    public static Map<String, Object> unauthorizedUserJson(String user) {
         return ImmutableMap.of(
                 "error", "No Authorization",
                 "errorMessage", String.format("User %s is not authorized.", user));
@@ -203,7 +203,7 @@ public class UIHelpers {
         for (FilterConfiguration filterConf : filtersConfs) {
             String filterName = filterConf.getFilterName();
             String filterClass = filterConf.getFilterClass();
-            Map filterParams = filterConf.getFilterParams();
+            Map<String, String> filterParams = filterConf.getFilterParams();
             if (filterClass != null) {
                 FilterHolder filterHolder = new FilterHolder();
                 filterHolder.setClassName(filterClass);
@@ -215,7 +215,7 @@ public class UIHelpers {
                 if (filterParams != null) {
                     filterHolder.setInitParameters(filterParams);
                 } else {
-                    filterHolder.setInitParameters(new HashMap<String, String>());
+                    filterHolder.setInitParameters(new HashMap<>());
                 }
                 context.addFilter(filterHolder, "/*", EnumSet.allOf(DispatcherType.class));
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java b/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java
index 6205468..c1f082c 100644
--- a/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java
+++ b/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java
@@ -30,7 +30,7 @@ public class InMemoryTopologyActionNotifier implements  ITopologyActionNotifierP
 
 
     @Override
-    public void prepare(Map StormConf) {
+    public void prepare(Map<String, Object> StormConf) {
         //no-op
     }