You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2018/03/21 18:11:29 UTC
[2/5] storm git commit: avoiding rawtypes in configuration and
metadata Maps in order to allow usage without @SuppressWarnings("rawtypes")
and early detection of design flaws
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
index 9f26479..7241c60 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
@@ -36,7 +36,8 @@ public class ZkState {
private static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
CuratorFramework _curator;
- private CuratorFramework newCurator(Map stateConf) throws Exception {
+ private CuratorFramework newCurator(final Map<String, Object> stateConf)
+ throws Exception {
Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
String serverPorts = "";
for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
@@ -54,8 +55,8 @@ public class ZkState {
return _curator;
}
- public ZkState(Map stateConf) {
- stateConf = new HashMap(stateConf);
+ public ZkState(Map<String, Object> stateConf) {
+ stateConf = new HashMap<>(stateConf);
try {
_curator = newCurator(stateConf);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
index 8a47ddc..c17c912 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
@@ -26,7 +26,11 @@ import java.util.List;
import java.util.Map;
-public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<List<GlobalPartitionInformation>, Partition, Map> {
+public class OpaqueTridentKafkaSpout
+ implements IOpaquePartitionedTridentSpout<
+ List<GlobalPartitionInformation>,
+ Partition,
+ Map<String, Object>> {
TridentKafkaConfig _config;
@@ -36,13 +40,18 @@ public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<L
}
@Override
- public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map<String, Object> conf, TopologyContext context) {
+ public Emitter<List<GlobalPartitionInformation>,
+ Partition,
+ Map<String, Object>> getEmitter(Map<String, Object> conf,
+ TopologyContext context) {
return new TridentKafkaEmitter(conf, context, _config, context
.getStormId()).asOpaqueEmitter();
}
@Override
- public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext tc) {
+ public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(
+ Map<String, Object> conf,
+ TopologyContext tc) {
return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 1339387..3333c2c 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -73,16 +73,20 @@ public class TridentKafkaEmitter {
}
- private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+ private Map<String, Object> failFastEmitNewPartitionBatch(
+ final TransactionAttempt attempt,
+ TridentCollector collector,
+ Partition partition,
+ Map<String, Object> lastMeta) {
SimpleConsumer consumer = _connections.register(partition);
- Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt);
+ Map<String, Object> ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt);
Long offset = (Long) ret.get("offset");
Long endOffset = (Long) ret.get("nextOffset");
_kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(endOffset, offset));
return ret;
}
- private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+ private Map<String, Object> emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map<String, Object> lastMeta) {
try {
return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
} catch (FailedFetchException e) {
@@ -90,7 +94,7 @@ public class TridentKafkaEmitter {
if (lastMeta == null) {
return null;
} else {
- Map ret = new HashMap();
+ Map<String, Object> ret = new HashMap<>();
ret.put("offset", lastMeta.get("nextOffset"));
ret.put("nextOffset", lastMeta.get("nextOffset"));
ret.put("partition", partition.partition);
@@ -102,12 +106,17 @@ public class TridentKafkaEmitter {
}
}
- private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta, TransactionAttempt attempt) {
+ private Map<String, Object> doEmitNewPartitionBatch(SimpleConsumer consumer,
+ Partition partition,
+ TridentCollector collector,
+ Map<String, Object> lastMeta,
+ TransactionAttempt attempt) {
LOG.debug("Emitting new partition batch - [transaction = {}], [lastMeta = {}]", attempt, lastMeta);
long offset;
if (lastMeta != null) {
String lastInstanceId = null;
- Map lastTopoMeta = (Map) lastMeta.get("topology");
+ Map<String, Object> lastTopoMeta = (Map<String, Object>)
+ lastMeta.get("topology");
if (lastTopoMeta != null) {
lastInstanceId = (String) lastTopoMeta.get("id");
}
@@ -136,7 +145,7 @@ public class TridentKafkaEmitter {
emit(collector, msg.message(), partition, msg.offset(), attempt);
endoffset = msg.nextOffset();
}
- Map newMeta = new HashMap();
+ Map<String, Object> newMeta = new HashMap<>();
newMeta.put("offset", offset);
newMeta.put("nextOffset", endoffset);
newMeta.put("instanceId", _topologyInstanceId);
@@ -161,7 +170,7 @@ public class TridentKafkaEmitter {
/**
* re-emit the batch described by the meta data provided
*/
- private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
+ private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map<String, Object> meta) {
LOG.info("re-emitting batch, attempt " + attempt);
String instanceId = (String) meta.get("instanceId");
if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
@@ -221,9 +230,9 @@ public class TridentKafkaEmitter {
}
- public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() {
+ public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>> asOpaqueEmitter() {
- return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
+ return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>>() {
/**
* Emit a batch of tuples for a partition/transaction.
@@ -232,7 +241,7 @@ public class TridentKafkaEmitter {
* for defining the parameters of the next batch.
*/
@Override
- public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+ public Map<String, Object> emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) {
return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
}
@@ -254,14 +263,14 @@ public class TridentKafkaEmitter {
}
public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
- return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
+ return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>>() {
/**
* Emit a batch of tuples for a partition/transaction that's never been emitted before.
* Return the metadata that can be used to reconstruct this partition/batch in the future.
*/
@Override
- public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+ public Map<String, Object> emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) {
return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
}
@@ -270,7 +279,7 @@ public class TridentKafkaEmitter {
* the metadata created when it was first emitted.
*/
@Override
- public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+ public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map<String, Object> map) {
reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 8d8746f..31dfffe 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -63,8 +63,8 @@ public class ZkCoordinatorTest {
when(dynamicPartitionConnections.register(any(Broker.class), any(String.class) ,anyInt())).thenReturn(simpleConsumer);
}
- private Map buildZookeeperConfig(TestingServer server) {
- Map<String, Object> conf = new HashMap();
+ private Map<String, Object> buildZookeeperConfig(TestingServer server) {
+ Map<String, Object> conf = new HashMap<>();
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
index 56fd33a..b06d35d 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
@@ -68,9 +68,9 @@ public class MongoMapState<T> implements IBackingMap<T> {
private Options<T> options;
private Serializer<T> serializer;
private MongoDbClient mongoClient;
- private Map map;
+ private Map<String, Object> map;
- protected MongoMapState(Map map, Options options) {
+ protected MongoMapState(Map<String, Object> map, Options options) {
this.options = options;
this.map = map;
this.serializer = options.serializer;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index c4e8cca..77c394c 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -45,9 +45,9 @@ public class MongoState implements State {
private Options options;
private MongoDbClient mongoClient;
- private Map map;
+ private Map<String, Object> map;
- protected MongoState(Map map, Options options) {
+ protected MongoState(Map<String, Object> map, Options options) {
this.options = options;
this.map = map;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index 471d6cd..f0f3fa3 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -82,7 +82,7 @@ public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
* {@inheritDoc}
*/
@Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
+ public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector collector) {
// FIXME: stores map (topoConf), topologyContext and expose these to derived classes
this.collector = collector;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
----------------------------------------------------------------------
diff --git a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
index 9a8a46e..daaa93c 100644
--- a/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
+++ b/external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/trident/state/RocketMqState.java
@@ -45,7 +45,7 @@ public class RocketMqState implements State {
private Options options;
private MQProducer producer;
- protected RocketMqState(Map map, Options options) {
+ protected RocketMqState(Map<String, Object> map, Options options) {
this.options = options;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
index 7d3e69d..2e8997c 100644
--- a/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
+++ b/external/storm-solr/src/main/java/org/apache/storm/solr/trident/SolrStateFactory.java
@@ -36,7 +36,7 @@ public class SolrStateFactory implements StateFactory {
}
@Override
- public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
+ public State makeState(Map<String, Object> map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
SolrState state = new SolrState(solrConfig, solrMapper);
state.prepare();
return state;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
index d100598..8d32c06 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ConfigMethodDef.java
@@ -49,7 +49,7 @@ public class ConfigMethodDef {
List<Object> newVal = new ArrayList<Object>();
for (Object obj : args) {
if (obj instanceof LinkedHashMap) {
- Map map = (Map)obj;
+ Map<String, Object> map = (Map<String, Object>)obj;
if (map.containsKey("ref") && map.size() == 1) {
newVal.add(new BeanReference((String)map.get("ref")));
this.hasReferences = true;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
index 170ee4f..1df9f9f 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
@@ -57,7 +57,7 @@ public class ObjectDef {
List<Object> newVal = new ArrayList<Object>();
for (Object obj : constructorArgs) {
if (obj instanceof LinkedHashMap) {
- Map map = (Map)obj;
+ Map<String, Object> map = (Map<String, Object>)obj;
if (map.containsKey("ref") && map.size() == 1) {
newVal.add(new BeanReference((String) map.get("ref")));
this.hasReferences = true;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
----------------------------------------------------------------------
diff --git a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
index 4521b36..f8e59f5 100644
--- a/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
+++ b/integration-test/src/test/java/org/apache/storm/st/wrapper/StormCluster.java
@@ -49,7 +49,7 @@ public class StormCluster {
this.client = NimbusClient.getConfiguredClient(conf).getClient();
}
- public static Map getConfig() {
+ public static Map<String, Object> getConfig() {
return Utils.readStormConfig();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 7a286d5..ed6068c 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1853,11 +1853,11 @@ public class Config extends HashMap<String, Object> {
setClasspath(this, cp);
}
- public static void setEnvironment(Map<String, Object> conf, Map env) {
+ public static void setEnvironment(Map<String, Object> conf, Map<String, Object> env) {
conf.put(Config.TOPOLOGY_ENVIRONMENT, env);
}
- public void setEnvironment(Map env) {
+ public void setEnvironment(Map<String, Object> env) {
setEnvironment(this, env);
}
@@ -1955,7 +1955,7 @@ public class Config extends HashMap<String, Object> {
}
public static void registerMetricsConsumer(Map<String, Object> conf, Class klass, Object argument, long parallelismHint) {
- HashMap m = new HashMap();
+ HashMap<String, Object> m = new HashMap<>();
m.put("class", klass.getCanonicalName());
m.put("parallelism.hint", parallelismHint);
m.put("argument", argument);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index 0f6baf2..5ce0d44 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -80,9 +80,8 @@ public class StormSubmitter {
return false;
}
- @SuppressWarnings("unchecked")
- public static Map prepareZookeeperAuthentication(Map<String, Object> conf) {
- Map toRet = new HashMap();
+ public static Map<String, Object> prepareZookeeperAuthentication(Map<String, Object> conf) {
+ Map<String, Object> toRet = new HashMap<>();
String secretPayload = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
// Is the topology ZooKeeper authentication configuration unset?
if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/Thrift.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Thrift.java b/storm-client/src/jvm/org/apache/storm/Thrift.java
index bffd04f..e00b0ee 100644
--- a/storm-client/src/jvm/org/apache/storm/Thrift.java
+++ b/storm-client/src/jvm/org/apache/storm/Thrift.java
@@ -94,7 +94,7 @@ public class Thrift {
return parallelism;
}
- public Map getConf() {
+ public Map<String, Object> getConf() {
return conf;
}
}
@@ -117,7 +117,7 @@ public class Thrift {
return bolt;
}
- public Map getConf() {
+ public Map<String, Object> getConf() {
return conf;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 362d4dd..4bd5ded 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -256,12 +256,12 @@ public class ClusterUtils {
if (stateStorage instanceof IStateStorage) {
return new StormClusterStateImpl((IStateStorage) stateStorage, context, false);
} else {
- IStateStorage Storage = _instance.mkStateStorageImpl((Map) stateStorage, (Map) stateStorage, context);
+ IStateStorage Storage = _instance.mkStateStorageImpl((Map<String, Object>) stateStorage, (Map<String, Object>) stateStorage, context);
return new StormClusterStateImpl(Storage, context, true);
}
}
- public IStateStorage mkStateStorageImpl(Map<String, Object> config, Map auth_conf, ClusterStateContext context) throws Exception {
+ public IStateStorage mkStateStorageImpl(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context) throws Exception {
String className = null;
IStateStorage stateStorage = null;
if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) {
@@ -275,7 +275,7 @@ public class ClusterUtils {
return stateStorage;
}
- public static IStateStorage mkStateStorage(Map<String, Object> config, Map auth_conf, ClusterStateContext context) throws Exception {
+ public static IStateStorage mkStateStorage(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context) throws Exception {
return _instance.mkStateStorageImpl(config, auth_conf, context);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
index 065e0df..5e0cdd7 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
@@ -24,7 +24,7 @@ import org.apache.storm.utils.Utils;
public class PaceMakerStateStorageFactory implements StateStorageFactory {
@Override
- public IStateStorage mkStore(Map<String, Object> config, Map auth_conf, ClusterStateContext context) {
+ public IStateStorage mkStore(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context) {
try {
ZKStateStorageFactory zkfact = new ZKStateStorageFactory();
IStateStorage zkState = zkfact.mkStore(config, auth_conf, context);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
index 584a2d8..7b58855 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
@@ -22,5 +22,5 @@ import java.util.Map;
public interface StateStorageFactory {
- IStateStorage mkStore(Map<String, Object> config, Map auth_conf, ClusterStateContext context);
+ IStateStorage mkStore(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index c68011e..d5c29f9 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -50,7 +50,7 @@ public class ZKStateStorage implements IStateStorage {
private AtomicBoolean active;
private boolean isNimbus;
- private Map authConf;
+ private Map<String, Object> authConf;
private Map<String, Object> conf;
private class ZkWatcherCallBack implements WatcherCallBack{
@@ -73,7 +73,7 @@ public class ZKStateStorage implements IStateStorage {
}
}
- public ZKStateStorage(Map<String, Object> conf, Map authConf, ClusterStateContext context) throws Exception {
+ public ZKStateStorage(Map<String, Object> conf, Map<String, Object> authConf, ClusterStateContext context) throws Exception {
this.conf = conf;
this.authConf = authConf;
if (context.getDaemonType().equals(DaemonType.NIMBUS))
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
index 987c2fd..0ae745f 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -26,7 +26,7 @@ import java.util.Map;
public class ZKStateStorageFactory implements StateStorageFactory {
@Override
- public IStateStorage mkStore(Map<String, Object> config, Map auth_conf, ClusterStateContext context) {
+ public IStateStorage mkStore(Map<String, Object> config, Map<String, Object> auth_conf, ClusterStateContext context) {
try {
return new ZKStateStorage(config, auth_conf, context);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index f2ebd5f..9a21679 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -296,7 +296,7 @@ public class StormCommon {
for (SpoutSpec spout : topology.get_spouts().values()) {
ComponentCommon common = spout.get_common();
- Map spoutConf = componentConf(spout);
+ Map<String, Object> spoutConf = componentConf(spout);
spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
common.set_json_conf(JSONValue.toJSONString(spoutConf));
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index 8237ad1..0fdcb9f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -25,6 +25,9 @@ import org.apache.storm.task.TopologyContext;
import java.util.HashMap;
import java.util.Map;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.utils.JCQueue;
public class BuiltinMetricsUtil {
public static void registerIconnectionServerMetric(Object server, Map<String, Object> topoConf, TopologyContext context) {
@@ -33,15 +36,14 @@ public class BuiltinMetricsUtil {
}
}
- public static void registerIconnectionClientMetrics(final Map nodePortToSocket, Map<String, Object> topoConf, TopologyContext context) {
+ public static void registerIconnectionClientMetrics(final Map<NodeInfo, IConnection> nodePortToSocket, Map<String, Object> topoConf, TopologyContext context) {
IMetric metric = new IMetric() {
@Override
public Object getValueAndReset() {
Map<Object, Object> ret = new HashMap<>();
- for (Object o : nodePortToSocket.entrySet()) {
- Map.Entry entry = (Map.Entry) o;
- Object nodePort = entry.getKey();
- Object connection = entry.getValue();
+ for (Map.Entry<NodeInfo, IConnection> entry : nodePortToSocket.entrySet()) {
+ NodeInfo nodePort = entry.getKey();
+ IConnection connection = entry.getValue();
if (connection instanceof IStatefulObject) {
ret.put(nodePort, ((IStatefulObject) connection).getState());
}
@@ -52,11 +54,10 @@ public class BuiltinMetricsUtil {
registerMetric("__send-iconnection", metric, topoConf, context);
}
- public static void registerQueueMetrics(Map queues, Map<String, Object> topoConf, TopologyContext context) {
- for (Object o : queues.entrySet()) {
- Map.Entry entry = (Map.Entry) o;
+ public static void registerQueueMetrics(Map<String, JCQueue> queues, Map<String, Object> topoConf, TopologyContext context) {
+ for (Map.Entry<String, JCQueue> entry : queues.entrySet()) {
String name = "__" + entry.getKey();
- IMetric metric = new StateMetric((IStatefulObject) entry.getValue());
+ IMetric metric = new StateMetric(entry.getValue());
registerMetric(name, metric, topoConf, context);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 9c7cf9e..d080471 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -96,7 +96,7 @@ public class WorkerState {
private final WorkerTransfer workerTransfer;
private final BackPressureTracker bpTracker;
- public Map getConf() {
+ public Map<String, Object> getConf() {
return conf;
}
@@ -140,7 +140,7 @@ public class WorkerState {
return localReceiveQueues;
}
- public Map getTopologyConf() {
+ public Map<String, Object> getTopologyConf() {
return topologyConf;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
index 00ae469..80e4ea3 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -204,7 +204,7 @@ public class DRPCSpout extends BaseRichSpout {
try {
DRPCRequest req = client.fetchRequest(_function);
if(req.get_request_id().length() > 0) {
- Map returnInfo = new HashMap();
+ Map<String, Object> returnInfo = new HashMap<>();
returnInfo.put("id", req.get_request_id());
returnInfo.put("host", client.getHost());
returnInfo.put("port", client.getPort());
@@ -228,7 +228,7 @@ public class DRPCSpout extends BaseRichSpout {
try {
DRPCRequest req = drpc.fetchRequest(_function);
if(req.get_request_id().length() > 0) {
- Map returnInfo = new HashMap();
+ Map<String, Object> returnInfo = new HashMap<>();
returnInfo.put("id", req.get_request_id());
returnInfo.put("host", _local_drpc_id);
returnInfo.put("port", 0);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
index f57bbb1..be668fe 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
@@ -44,7 +44,7 @@ public class JoinResult extends BaseRichBolt {
this.returnComponent = returnComponent;
}
- public void prepare(Map map, TopologyContext context, OutputCollector collector) {
+ public void prepare(Map<String, Object> map, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
index 06e576c..6cfb898 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
@@ -38,7 +38,7 @@ public class PrepareRequest extends BaseBasicBolt {
Random rand;
@Override
- public void prepare(Map map, TopologyContext context) {
+ public void prepare(Map<String, Object> map, TopologyContext context) {
rand = new Random();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
index 04ddf25..6146890 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
@@ -47,7 +47,7 @@ public class ReturnResults extends BaseRichBolt {
public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
OutputCollector _collector;
boolean local;
- Map _conf;
+ Map<String, Object> _conf;
Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
@Override
@@ -62,9 +62,9 @@ public class ReturnResults extends BaseRichBolt {
String result = (String) input.getValue(0);
String returnInfo = (String) input.getValue(1);
if (returnInfo!=null) {
- Map retMap = null;
+ Map<String, Object> retMap;
try {
- retMap = (Map) JSONValue.parseWithException(returnInfo);
+ retMap = (Map<String, Object>) JSONValue.parseWithException(returnInfo);
} catch (ParseException e) {
LOG.error("Parseing returnInfo failed", e);
_collector.fail(input);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 267fe74..f811b82 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -225,15 +225,21 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
}
}
- private static List<Object> All_CONFIGS() {
- List<Object> ret = new ArrayList<Object>();
- Config config = new Config();
- Class<?> ConfigClass = config.getClass();
- Field[] fields = ConfigClass.getFields();
+ /**
+ * Retrieves all values of all static fields of {@link Config} which
+ * represent all available configuration keys through reflection. The method
+ * assumes that they are {@code String}s through reflection.
+ * @return the list of retrieved field values
+ * @throws ClassCastException if one of the fields is not of type
+ * {@code String}
+ */
+ private static List<String> retrieveAllConfigKeys() {
+ List<String> ret = new ArrayList<>();
+ Field[] fields = Config.class.getFields();
for (int i = 0; i < fields.length; i++) {
try {
- Object obj = fields[i].get(null);
- ret.add(obj);
+ String fieldValue = (String)fields[i].get(null);
+ ret.add(fieldValue);
} catch (IllegalArgumentException e) {
LOG.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
@@ -432,8 +438,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
// ============================ getter methods =================================
// =============================================================================
- private Map normalizedComponentConf(Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
- List<Object> keysToRemove = All_CONFIGS();
+ private Map<String, Object> normalizedComponentConf(Map<String, Object> topoConf, WorkerTopologyContext topologyContext, String componentId) {
+ List<String> keysToRemove = retrieveAllConfigKeys();
keysToRemove.remove(Config.TOPOLOGY_DEBUG);
keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
keysToRemove.remove(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
@@ -451,11 +457,11 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
keysToRemove.remove(Config.TOPOLOGY_STATE_PROVIDER_CONFIG);
keysToRemove.remove(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
- Map<Object, Object> componentConf;
+ Map<String, Object> componentConf;
String specJsonConf = topologyContext.getComponentCommon(componentId).get_json_conf();
if (specJsonConf != null) {
try {
- componentConf = (Map<Object, Object>) JSONValue.parseWithException(specJsonConf);
+ componentConf = (Map<String, Object>) JSONValue.parseWithException(specJsonConf);
} catch (ParseException e) {
throw new RuntimeException(e);
}
@@ -466,7 +472,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
componentConf = new HashMap<>();
}
- Map<Object, Object> ret = new HashMap<>();
+ Map<String, Object> ret = new HashMap<>();
ret.putAll(topoConf);
ret.putAll(componentConf);
@@ -489,7 +495,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
return openOrPrepareWasCalled;
}
- public Map getTopoConf() {
+ public Map<String, Object> getTopoConf() {
return topoConf;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 61e6488..91bbcad 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -37,7 +37,9 @@ import org.apache.storm.daemon.Task;
import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
+import org.apache.storm.generated.NodeInfo;
import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.messaging.IConnection;
import org.apache.storm.metric.api.IMetricsRegistrant;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
@@ -95,7 +97,7 @@ public class BoltExecutor extends Executor {
private static IWaitStrategy makeSystemBoltWaitStrategy() {
WaitStrategyPark ws = new WaitStrategyPark();
- HashMap conf = new HashMap<String, Object>();
+ Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
return ws;
@@ -126,7 +128,7 @@ public class BoltExecutor extends Executor {
Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue, "transfer", workerData.getTransferQueue());
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
- Map cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
+ Map<NodeInfo, IConnection> cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java b/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
index b80f7c0..7dbc5d6 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
@@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
private final WorkerState.ILocalTransferCallback cb;
- private final Map conf;
+ private final Map<String, Object> conf;
private final GeneralTopologyContext context;
private final ThreadLocal<KryoTupleDeserializer> _des =
@@ -55,7 +55,7 @@ public class DeserializingConnectionCallback implements IConnectionCallback, IMe
private final ConcurrentHashMap<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();
- public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
+ public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
this.conf = conf;
this.context = context;
cb = callback;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index ac92c6b..e0736b5 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -549,7 +549,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
return ret;
}
- public Map getConfig() {
+ public Map<String, Object> getConfig() {
return topoConf;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
index 5c0caf5..343a3d7 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/MultiCountMetric.java
@@ -35,8 +35,8 @@ public class MultiCountMetric implements IMetric {
return val;
}
- public Object getValueAndReset() {
- Map ret = new HashMap();
+ public Map<String, Object> getValueAndReset() {
+ Map<String, Object> ret = new HashMap<>();
for(Map.Entry<String, CountMetric> e : _value.entrySet()) {
ret.put(e.getKey(), e.getValue().getValueAndReset());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java b/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
index 9bd9113..a6cb49a 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/MultiReducedMetric.java
@@ -37,8 +37,8 @@ public class MultiReducedMetric implements IMetric {
return val;
}
- public Object getValueAndReset() {
- Map ret = new HashMap();
+ public Map<String, Object> getValueAndReset() {
+ Map<String, Object> ret = new HashMap<>();
for(Map.Entry<String, ReducedMetric> e : _value.entrySet()) {
Object val = e.getValue().getValueAndReset();
if(val != null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java b/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java
index 62f1d9b..b55dec2 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/FixedGroupsMapping.java
@@ -41,7 +41,7 @@ public class FixedGroupsMapping implements IGroupMappingServiceProvider {
* @param storm_conf Storm configuration
*/
@Override
- public void prepare(Map storm_conf) {
+ public void prepare(Map<String, Object> storm_conf) {
Map<?, ?> params = (Map<?, ?>) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PARAMS);
Map<String, Set<String>> mapping = (Map<String, Set<String>>) params.get(STORM_FIXED_GROUP_MAPPING);
if (mapping != null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java
index fdcd31e..f29e882 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCAuthorizerBase.java
@@ -36,9 +36,9 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer {
@Override
public abstract void prepare(Map<String, Object> conf);
- abstract protected boolean permitClientRequest(ReqContext context, String operation, Map params);
+ abstract protected boolean permitClientRequest(ReqContext context, String operation, Map<String, Object> params);
- abstract protected boolean permitInvocationRequest(ReqContext context, String operation, Map params);
+ abstract protected boolean permitInvocationRequest(ReqContext context, String operation, Map<String, Object> params);
/**
* Authorizes request from to the DRPC server.
@@ -47,7 +47,7 @@ public abstract class DRPCAuthorizerBase implements IAuthorizer {
* @param params a Map with any key-value entries of use to the authorization implementation
*/
@Override
- public boolean permit(ReqContext context, String operation, Map params) {
+ public boolean permit(ReqContext context, String operation, Map<String, Object> params) {
if ("execute".equals(operation)) {
return permitClientRequest(context, operation, params);
} else if ("failRequest".equals(operation) ||
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index dc4d911..ca8646f 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -125,7 +125,7 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
return null;
}
- protected boolean permitClientOrInvocationRequest(ReqContext context, Map params,
+ protected boolean permitClientOrInvocationRequest(ReqContext context, Map<String, Object> params,
String fieldName) {
Map<String,AclFunctionEntry> acl = readAclFromConfig();
String function = (String) params.get(FUNCTION_KEY);
@@ -165,13 +165,13 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
@Override
protected boolean permitClientRequest(ReqContext context, String operation,
- Map params) {
+ Map<String, Object> params) {
return permitClientOrInvocationRequest(context, params, "clientUsers");
}
@Override
protected boolean permitInvocationRequest(ReqContext context, String operation,
- Map params) {
+ Map<String, Object> params) {
return permitClientOrInvocationRequest(context, params, "invocationUser");
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java b/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java
index a1e917a..73b1273 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/types/HashMapSerializer.java
@@ -26,7 +26,7 @@ import java.util.Map;
public class HashMapSerializer extends MapSerializer {
@Override
- public Map create(Kryo kryo, Input input, Class<Map> type) {
- return new HashMap();
+ public Map<String, Object> create(Kryo kryo, Input input, Class<Map> type) {
+ return new HashMap<>();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
index f115d12..b6461c5 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
@@ -61,11 +61,11 @@ public abstract class CommonStats {
transferredStats.close();
}
- protected Map valueStat(MultiCountStatAndMetric metric) {
+ protected Map<String,Map<String,Long>> valueStat(MultiCountStatAndMetric metric) {
return metric.getTimeCounts();
}
- protected Map valueStat(MultiLatencyStatAndMetric metric) {
+ protected Map<String, Map<String, Double>> valueStat(MultiLatencyStatAndMetric metric) {
return metric.getTimeLatAvg();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
index d59c9cf..4ea979e 100644
--- a/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java
@@ -160,7 +160,7 @@ public class GeneralTopologyContext implements JSONAware {
@Override
public String toJSONString() {
- Map obj = new HashMap();
+ Map<String, Object> obj = new HashMap<>();
obj.put("task->component", _taskToComponent);
// TODO: jsonify StormTopology
// at the minimum should send source info
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java b/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
index 4c04d84..8193f9a 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/NonRichBoltTracker.java
@@ -41,7 +41,7 @@ public class NonRichBoltTracker implements IBolt {
public void execute(Tuple input) {
_delegate.execute(input);
- Map stats = (Map) RegisteredGlobalState.getState(_trackId);
+ Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
((AtomicInteger) stats.get("processed")).incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
index 5f75eae..d2af80f 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java
@@ -45,7 +45,7 @@ public class SpoutTracker extends BaseRichSpout {
}
private void recordSpoutEmit() {
- Map stats = (Map) RegisteredGlobalState.getState(_trackId);
+ Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
((AtomicInteger) stats.get("spout-emitted")).incrementAndGet();
}
@@ -99,13 +99,13 @@ public class SpoutTracker extends BaseRichSpout {
public void ack(Object msgId) {
_delegate.ack(msgId);
- Map stats = (Map) RegisteredGlobalState.getState(_trackId);
+ Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
((AtomicInteger) stats.get("processed")).incrementAndGet();
}
public void fail(Object msgId) {
_delegate.fail(msgId);
- Map stats = (Map) RegisteredGlobalState.getState(_trackId);
+ Map<String, Object> stats = (Map<String, Object>) RegisteredGlobalState.getState(_trackId);
((AtomicInteger) stats.get("processed")).incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 60a7eb3..e10bc44 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -735,8 +735,8 @@ public class TopologyBuilder {
}
}
- private static String mergeIntoJson(Map into, Map newMap) {
- Map res = new HashMap<>(into);
+ private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
+ Map<String, Object> res = new HashMap<>(into);
res.putAll(newMap);
return JSONValue.toJSONString(res);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java b/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
index bf6b181..b83c849 100644
--- a/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/transactional/state/TestTransactionalState.java
@@ -35,7 +35,7 @@ public class TestTransactionalState extends TransactionalState {
* Matching constructor in absence of a default constructor in the parent
* class.
*/
- protected TestTransactionalState(Map<String, Object> conf, String id, Map componentConf, String subroot) {
+ protected TestTransactionalState(Map<String, Object> conf, String id, Map<String, Object> componentConf, String subroot) {
super(conf, id, componentConf, subroot);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
index d7079f3..c940b3f 100644
--- a/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
@@ -43,17 +43,17 @@ public class TransactionalState {
KryoValuesDeserializer _des;
List<ACL> _zkAcls = null;
- public static TransactionalState newUserState(Map<String, Object> conf, String id, Map componentConf) {
+ public static TransactionalState newUserState(Map<String, Object> conf, String id, Map<String, Object> componentConf) {
return new TransactionalState(conf, id, componentConf, "user");
}
- public static TransactionalState newCoordinatorState(Map<String, Object> conf, String id, Map componentConf) {
+ public static TransactionalState newCoordinatorState(Map<String, Object> conf, String id, Map<String, Object> componentConf) {
return new TransactionalState(conf, id, componentConf, "coordinator");
}
- protected TransactionalState(Map<String, Object> conf, String id, Map componentConf, String subroot) {
+ protected TransactionalState(Map<String, Object> conf, String id, Map<String, Object> componentConf, String subroot) {
try {
- conf = new HashMap(conf);
+ conf = new HashMap<>(conf);
// ensure that the serialization registrations are consistent with the declarations in this spout
if(componentConf!=null) {
conf.put(Config.TOPOLOGY_KRYO_REGISTER,
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java b/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java
index d51b883..7b18187 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/TridentTopology.java
@@ -529,17 +529,17 @@ public class TridentTopology {
}
}
}
- HashMap<String, Number> combinedMasterCoordResources = new HashMap<String, Number>(_resourceDefaults);
+ HashMap<String, Number> combinedMasterCoordResources = new HashMap<>(_resourceDefaults);
combinedMasterCoordResources.putAll(_masterCoordResources);
return builder.buildTopology(combinedMasterCoordResources);
}
- private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map defaultConfig) {
- Map<String, Number> ret = new HashMap<String, Number>();
+ private static Map<String, Number> mergeDefaultResources(Map<String, Number> res, Map<String, Number> defaultConfig) {
+ Map<String, Number> ret = new HashMap<>();
- Number onHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
- Number offHeapDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
- Number cpuLoadDefault = (Number)defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+ Number onHeapDefault = defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ Number offHeapDefault = defaultConfig.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+ Number cpuLoadDefault = defaultConfig.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
if(res == null) {
ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, onHeapDefault);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java b/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
index 617a42f..6d6648a 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/drpc/ReturnResultsReducer.java
@@ -80,9 +80,9 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
// only one of the multireducers will receive the tuples
if (state.returnInfo!=null) {
String result = JSONValue.toJSONString(state.results);
- Map retMap = null;
+ Map<String, Object> retMap;
try {
- retMap = (Map) JSONValue.parseWithException(state.returnInfo);
+ retMap = (Map<String, Object>) JSONValue.parseWithException(state.returnInfo);
} catch (ParseException e) {
collector.reportError(e);
return;
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index bb05450..61d50f8 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -59,7 +59,7 @@ public class TransactionalState {
protected TransactionalState(Map<String, Object> conf, String id, String subroot) {
try {
- conf = new HashMap(conf);
+ conf = new HashMap<>(conf);
String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
String rootDir = transactionalRoot + "/" + id + "/" + subroot;
List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
@@ -193,9 +193,11 @@ public class TransactionalState {
_curator.close();
}
- private Object getWithBackup(Map amap, Object primary, Object backup) {
+ private Object getWithBackup(Map<String, Object> amap, String primary, String backup) {
Object ret = amap.get(primary);
- if(ret==null) return amap.get(backup);
+ if(ret==null) {
+ return amap.get(backup);
+ }
return ret;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 0d5bab8..eb4557f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -271,7 +271,7 @@ public class ConfigUtils {
return ret;
}
- public static Map overrideLoginConfigWithSystemProperty(Map<String, Object> conf) { // note that we delete the return value
+ public static Map<String, Object> overrideLoginConfigWithSystemProperty(Map<String, Object> conf) { // note that we delete the return value
String loginConfFile = System.getProperty("java.security.auth.login.config");
if (loginConfFile != null) {
conf.put("java.security.auth.login.config", loginConfFile);
@@ -322,7 +322,7 @@ public class ConfigUtils {
return conf;
}
- public static Map readYamlConfig(String name) {
+ public static Map<String, Object> readYamlConfig(String name) {
return readYamlConfig(name, true);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 26b0ac4..7a64812 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -189,7 +190,7 @@ public class JCQueue implements IStatefulObject {
}
public Object getState() {
- HashMap state = new HashMap<String, Object>();
+ Map<String, Object> state = new HashMap<>();
final double arrivalRateInSecs = arrivalsTracker.reportRate();
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
index e27304a..bbd47a9 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -111,7 +111,7 @@ public class NimbusClient extends ThriftClient {
return true;
}
- public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
+ public static NimbusClient getConfiguredClientAs(Map<String, Object> conf, String asUser) {
return getConfiguredClientAs(conf, asUser, null);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index af03787..ba2ed74 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -934,18 +934,17 @@ public class Utils {
* @param listSeq to reverse
* @return a reversed map
*/
- public static HashMap reverseMap(List listSeq) {
- HashMap<Object, List<Object>> rtn = new HashMap();
+ public static Map<Object, List<Object>> reverseMap(List<List<Object>> listSeq) {
+ Map<Object, List<Object>> rtn = new HashMap<>();
if (listSeq == null) {
return rtn;
}
- for (Object entry : listSeq) {
- List listEntry = (List) entry;
+ for (List<Object> listEntry : listSeq) {
Object key = listEntry.get(0);
Object val = listEntry.get(1);
- List list = rtn.get(val);
+ List<Object> list = rtn.get(val);
if (list == null) {
- list = new ArrayList<Object>();
+ list = new ArrayList<>();
rtn.put(val, list);
}
list.add(key);
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java b/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
index d2d862b..42d4417 100644
--- a/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
+++ b/storm-client/src/jvm/org/apache/storm/zookeeper/ClientZookeeper.java
@@ -73,7 +73,7 @@ public class ClientZookeeper {
_instance.mkdirsImpl(zk, path, acls);
}
- public static CuratorFramework mkClient(Map<String, Object> conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+ public static CuratorFramework mkClient(Map<String, Object> conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map<String, Object> authConf) {
return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf);
}
@@ -313,15 +313,15 @@ public class ClientZookeeper {
return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack());
}
- public CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, Map authConf) {
+ public CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, Map<String, Object> authConf) {
return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf);
}
- public CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, String root, Map authConf) {
+ public CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, String root, Map<String, Object> authConf) {
return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf);
}
- public CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+ public CuratorFramework mkClientImpl(Map<String, Object> conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map<String, Object> authConf) {
CuratorFramework fk;
if (authConf != null) {
fk = CuratorUtils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf));
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java b/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java
index 9f0e1d1..985aa1c 100644
--- a/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java
+++ b/storm-client/test/jvm/org/apache/storm/messaging/DeserializingConnectionCallbackTest.java
@@ -48,7 +48,7 @@ public class DeserializingConnectionCallbackTest {
@Test
public void testUpdateMetricsConfigOff() {
- Map config = new HashMap();
+ Map<String, Object> config = new HashMap<>();
config.put(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS, Boolean.FALSE);
DeserializingConnectionCallback withoutMetrics =
new DeserializingConnectionCallback(config, mock(GeneralTopologyContext.class), mock(
@@ -64,7 +64,7 @@ public class DeserializingConnectionCallbackTest {
@Test
public void testUpdateMetricsConfigOn() {
- Map config = new HashMap();
+ Map<String, Object> config = new HashMap<>();
config.put(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS, Boolean.TRUE);
DeserializingConnectionCallback withMetrics =
new DeserializingConnectionCallback(config, mock(GeneralTopologyContext.class), mock(
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java
index 272ca60..9283132 100644
--- a/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java
+++ b/storm-client/test/jvm/org/apache/storm/security/auth/AutoSSLTest.java
@@ -59,7 +59,7 @@ public class AutoSSLTest {
@Test
public void testgetSSLFilesFromConf() throws Exception {
AutoSSL assl = new AutoSSL();
- Map<String, Object> conf = new HashMap();
+ Map<String, Object> conf = new HashMap<>();
assertNull(assl.getSSLFilesFromConf(conf));
conf.put(AutoSSL.SSL_FILES_CONF, "sslfile1.txt");
assl.prepare(conf);
@@ -74,14 +74,14 @@ public class AutoSSLTest {
@Test
public void testgetSSLFilesFromConfMultipleComma() throws Exception {
AutoSSL assl = new AutoSSL();
- Map<String, Object> conf = new HashMap();
+ Map<String, Object> conf = new HashMap<>();
assertNull(assl.getSSLFilesFromConf(conf));
conf.put(AutoSSL.SSL_FILES_CONF, "sslfile1.txt,sslfile2.txt,sslfile3.txt");
assl.prepare(conf);
Collection<String> sslFiles = assl.getSSLFilesFromConf(conf);
assertNotNull(sslFiles);
assertEquals(3, sslFiles.size());
- ArrayList valid = new ArrayList<String>();
+ List<String> valid = new ArrayList<>();
Collections.addAll(valid, "sslfile1.txt", "sslfile2.txt", "sslfile3.txt");
for(String file: sslFiles) {
assertTrue("removing: " + file, valid.remove(file));
@@ -104,13 +104,13 @@ public class AutoSSLTest {
AutoSSL assl = new TestAutoSSL(baseDir.getPath());
LOG.debug("base dir is; " + baseDir);
- Map sslconf = new HashMap();
+ Map<String, Object> sslconf = new HashMap<>();
sslconf.put(AutoSSL.SSL_FILES_CONF, temp.getPath());
assl.prepare(sslconf);
Collection<String> sslFiles = assl.getSSLFilesFromConf(sslconf);
- Map<String, String> creds = new HashMap();
+ Map<String, String> creds = new HashMap<>();
assl.populateCredentials(creds);
assertTrue(creds.containsKey(temp.getName()));
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java b/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
index 9380714..ba0747c 100644
--- a/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
+++ b/storm-client/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.storm.generated.Grouping;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -140,7 +141,7 @@ public class ProcessorBoltTest {
node.setWindowed(isWindowed);
Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
- Map mockSources = Mockito.mock(Map.class);
+ Map<GlobalStreamId, Grouping> mockSources = Mockito.mock(Map.class);
GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockSources);
Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
index c8499e9..30cdae9 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
@@ -50,22 +50,22 @@ public class UtilsTest {
Utils.isZkAuthenticationConfiguredTopology(topologyMockMap("foobar")));
}
- private Map topologyMockMap(String value) {
+ private Map<String, Object> topologyMockMap(String value) {
return mockMap(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, value);
}
- private Map mockMap(String key, String value) {
- Map<String, Object> map = new HashMap<String, Object>();
+ private Map<String, Object> mockMap(String key, String value) {
+ Map<String, Object> map = new HashMap<>();
map.put(key, value);
return map;
}
- private Map serverMockMap(String value) {
+ private Map<String, Object> serverMockMap(String value) {
return mockMap(Config.STORM_ZOOKEEPER_AUTH_SCHEME, value);
}
- private Map emptyMockMap() {
- return new HashMap<String, Object>();
+ private Map<String, Object> emptyMockMap() {
+ return new HashMap<>();
}
private void doParseJvmHeapMemByChildOptsTest(String message, String opt, double expected) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
index 6023409..a72039b 100644
--- a/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
+++ b/storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java
@@ -44,18 +44,21 @@ public class UploadCredentials {
if (null != rawCredentials && ((rawCredentials.size() % 2) != 0)) {
throw new RuntimeException("Need an even number of arguments to make a map");
}
- Map credentialsMap = new HashMap<>();
+ Map<String,String> credentialsMap = new HashMap<>();
if (null != credentialFile) {
Properties credentialProps = new Properties();
credentialProps.load(new FileReader(credentialFile));
- credentialsMap.putAll(credentialProps);
+ for(Map.Entry<Object, Object> credentialProp : credentialProps.entrySet()) {
+ credentialsMap.put((String)credentialProp.getKey(),
+ (String)credentialProp.getValue());
+ }
}
if (null != rawCredentials) {
for (int i = 0; i < rawCredentials.size(); i += 2) {
credentialsMap.put(rawCredentials.get(i), rawCredentials.get(i + 1));
}
}
- StormSubmitter.pushCredentials(topologyName, new HashMap(), credentialsMap);
+ StormSubmitter.pushCredentials(topologyName, new HashMap<>(), credentialsMap);
LOG.info("Uploaded new creds to topology: {}", topologyName);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java b/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java
index db5952e..e671264 100644
--- a/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java
+++ b/storm-core/src/jvm/org/apache/storm/ui/FilterConfiguration.java
@@ -22,16 +22,16 @@ import java.util.Map;
public class FilterConfiguration {
private String filterClass;
private String filterName;
- private Map filterParams;
+ private Map<String, String> filterParams;
- public FilterConfiguration(String filterClass, Map filterParams) {
+ public FilterConfiguration(String filterClass, Map<String, String> filterParams) {
this.filterParams = filterParams;
this.filterClass = filterClass;
this.filterName = null;
}
- public FilterConfiguration(String filterClass, String filterName, Map filterParams) {
+ public FilterConfiguration(String filterClass, String filterName, Map<String, String> filterParams) {
this.filterClass = filterClass;
this.filterName = filterName;
this.filterParams = filterParams;
@@ -53,11 +53,11 @@ public class FilterConfiguration {
this.filterClass = filterClass;
}
- public Map getFilterParams() {
+ public Map<String, String> getFilterParams() {
return filterParams;
}
- public void setFilterParams(Map filterParams) {
+ public void setFilterParams(Map<String, String> filterParams) {
this.filterParams = filterParams;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
index f29c1f6..be4a4c0 100644
--- a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
+++ b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
@@ -109,7 +109,7 @@ public class UIHelpers {
return "[" + e.get_task_start() + "-" + e.get_task_end() + "]";
}
- public static Map unauthorizedUserJson(String user) {
+ public static Map<String, Object> unauthorizedUserJson(String user) {
return ImmutableMap.of(
"error", "No Authorization",
"errorMessage", String.format("User %s is not authorized.", user));
@@ -203,7 +203,7 @@ public class UIHelpers {
for (FilterConfiguration filterConf : filtersConfs) {
String filterName = filterConf.getFilterName();
String filterClass = filterConf.getFilterClass();
- Map filterParams = filterConf.getFilterParams();
+ Map<String, String> filterParams = filterConf.getFilterParams();
if (filterClass != null) {
FilterHolder filterHolder = new FilterHolder();
filterHolder.setClassName(filterClass);
@@ -215,7 +215,7 @@ public class UIHelpers {
if (filterParams != null) {
filterHolder.setInitParameters(filterParams);
} else {
- filterHolder.setInitParameters(new HashMap<String, String>());
+ filterHolder.setInitParameters(new HashMap<>());
}
context.addFilter(filterHolder, "/*", EnumSet.allOf(DispatcherType.class));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7394d12/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java b/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java
index 6205468..c1f082c 100644
--- a/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java
+++ b/storm-core/test/jvm/org/apache/storm/nimbus/InMemoryTopologyActionNotifier.java
@@ -30,7 +30,7 @@ public class InMemoryTopologyActionNotifier implements ITopologyActionNotifierP
@Override
- public void prepare(Map StormConf) {
+ public void prepare(Map<String, Object> StormConf) {
//no-op
}