You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/10 03:36:39 UTC
[1/5] storm git commit: STORM-1164. Code cleanup for typos,
warnings and conciseness
Repository: storm
Updated Branches:
refs/heads/master 5a4e1f85b -> e25f28f71
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java b/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
index 3cb455d..4ecdda6 100644
--- a/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/KeyedRoundRobinQueue.java
@@ -28,15 +28,15 @@ import java.util.concurrent.Semaphore;
public class KeyedRoundRobinQueue<V> {
private final Object _lock = new Object();
private Semaphore _size = new Semaphore(0);
- private Map<Object, Queue<V>> _queues = new HashMap<Object, Queue<V>>();
- private List<Object> _keyOrder = new ArrayList<Object>();
+ private Map<Object, Queue<V>> _queues = new HashMap<>();
+ private List<Object> _keyOrder = new ArrayList<>();
private int _currIndex = 0;
public void add(Object key, V val) {
synchronized(_lock) {
Queue<V> queue = _queues.get(key);
if(queue==null) {
- queue = new LinkedList<V>();
+ queue = new LinkedList<>();
_queues.put(key, queue);
_keyOrder.add(key);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java b/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java
index 1e091f0..d198a72 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ListDelegate.java
@@ -27,7 +27,7 @@ public class ListDelegate implements List<Object> {
private List<Object> _delegate;
public ListDelegate() {
- _delegate = new ArrayList<Object>();
+ _delegate = new ArrayList<>();
}
public void setDelegate(List<Object> delegate) {
@@ -84,12 +84,12 @@ public class ListDelegate implements List<Object> {
}
@Override
- public boolean addAll(Collection<? extends Object> clctn) {
+ public boolean addAll(Collection<?> clctn) {
return _delegate.addAll(clctn);
}
@Override
- public boolean addAll(int i, Collection<? extends Object> clctn) {
+ public boolean addAll(int i, Collection<?> clctn) {
return _delegate.addAll(i, clctn);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Monitor.java b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
index 36fedc4..41717a9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Monitor.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Monitor.java
@@ -20,7 +20,6 @@ package backtype.storm.utils;
import backtype.storm.generated.*;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
public class Monitor {
@@ -98,7 +97,7 @@ public class Monitor {
}
private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{
- HashSet<String> components = new HashSet<String>();
+ HashSet<String> components = new HashSet<>();
ClusterSummary clusterSummary = client.getClusterInfo();
TopologySummary topologySummary = null;
for (TopologySummary ts: clusterSummary.get_topologies()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index a715de2..a9306f7 100644
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@ -22,13 +22,8 @@ import backtype.storm.Config;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.NimbusSummary;
-import backtype.storm.nimbus.ILeaderElector;
-import backtype.storm.nimbus.NimbusInfo;
import backtype.storm.security.auth.ThriftClient;
import backtype.storm.security.auth.ThriftConnectionType;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -38,7 +33,6 @@ import java.util.List;
import java.util.Map;
public class NimbusClient extends ThriftClient {
- public static final String DELIMITER = ":";
private Nimbus.Client _client;
private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
@@ -56,7 +50,7 @@ public class NimbusClient extends ThriftClient {
asUser = (String) conf.get(Config.STORM_DO_AS_USER);
}
- List<String> seeds = null;
+ List<String> seeds;
if(conf.containsKey(Config.NIMBUS_HOST)) {
LOG.warn("Using deprecated config {} for backward compatibility. Please update your storm.yaml so it only has config {}",
Config.NIMBUS_HOST, Config.NIMBUS_SEEDS);
@@ -67,7 +61,7 @@ public class NimbusClient extends ThriftClient {
for (String host : seeds) {
int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
- ClusterSummary clusterInfo = null;
+ ClusterSummary clusterInfo;
try {
NimbusClient client = new NimbusClient(conf, host, port);
clusterInfo = client.getClient().getClusterInfo();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java b/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java
index 48053fc..159a7c1 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RegisteredGlobalState.java
@@ -27,7 +27,7 @@ import java.util.UUID;
* tuples.
*/
public class RegisteredGlobalState {
- private static HashMap<String, Object> _states = new HashMap<String, Object>();
+ private static HashMap<String, Object> _states = new HashMap<>();
private static final Object _lock = new Object();
public static Object globalLock() {
@@ -50,9 +50,7 @@ public class RegisteredGlobalState {
public static Object getState(String id) {
synchronized(_lock) {
- Object ret = _states.get(id);
- //System.out.println("State: " + ret.toString());
- return ret;
+ return _states.get(id);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
index 2b8d66b..03d1bf8 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
@@ -53,7 +53,7 @@ public class RotatingMap<K, V> {
if(numBuckets<2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
}
- _buckets = new LinkedList<HashMap<K, V>>();
+ _buckets = new LinkedList<>();
for(int i=0; i<numBuckets; i++) {
_buckets.add(new HashMap<K, V>());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java b/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java
index 724bc3e..d5c2eca 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ServiceRegistry.java
@@ -22,7 +22,7 @@ import java.util.UUID;
// this class should be combined with RegisteredGlobalState
public class ServiceRegistry {
- private static HashMap<String, Object> _services = new HashMap<String, Object>();
+ private static HashMap<String, Object> _services = new HashMap<>();
private static final Object _lock = new Object();
public static String registerService(Object service) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java b/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
index 5422c2e..be6f0e9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ b/storm-core/src/jvm/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -42,8 +42,9 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
public StormBoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries) {
super(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
expRetriesThreshold = 1;
- while ((1 << (expRetriesThreshold + 1)) < ((maxSleepTimeMs - baseSleepTimeMs) / 2))
+ while ((1 << (expRetriesThreshold + 1)) < ((maxSleepTimeMs - baseSleepTimeMs) / 2)) {
expRetriesThreshold++;
+ }
LOG.debug("The baseSleepTimeMs [{}] the maxSleepTimeMs [{}] the maxRetries [{}]",
baseSleepTimeMs, maxSleepTimeMs, maxRetries);
if (baseSleepTimeMs > maxSleepTimeMs) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java b/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java
index 1740e18..f3586bb 100644
--- a/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java
+++ b/storm-core/src/jvm/backtype/storm/utils/VersionInfo.java
@@ -86,7 +86,7 @@ public class VersionInfo {
}
- private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
+ private static final VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
public static String getVersion() {
return COMMON_VERSION_INFO._getVersion();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/TridentTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/TridentTopology.java b/storm-core/src/jvm/storm/trident/TridentTopology.java
index 8164717..58c83e4 100644
--- a/storm-core/src/jvm/storm/trident/TridentTopology.java
+++ b/storm-core/src/jvm/storm/trident/TridentTopology.java
@@ -87,16 +87,17 @@ import storm.trident.util.TridentUtils;
// all operations have finishBatch and can optionally be committers
public class TridentTopology {
- //TODO: add a method for drpc stream, needs to know how to automatically do returnresults, etc
+ //TODO: add a method for drpc stream, needs to know how to automatically do return results, etc
// is it too expensive to do a batch per drpc request?
- DefaultDirectedGraph<Node, IndexedEdge> _graph;
- Map<String, List<Node>> _colocate = new HashMap();
- UniqueIdGen _gen;
+ final DefaultDirectedGraph<Node, IndexedEdge> _graph;
+ final Map<String, List<Node>> _colocate;
+ final UniqueIdGen _gen;
public TridentTopology() {
- _graph = new DefaultDirectedGraph(new ErrorEdgeFactory());
- _gen = new UniqueIdGen();
+ this(new DefaultDirectedGraph<Node, IndexedEdge>(new ErrorEdgeFactory()),
+ new HashMap<String, List<Node>>(),
+ new UniqueIdGen());
}
private TridentTopology(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
@@ -195,7 +196,7 @@ public class TridentTopology {
}
public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
- List<String> names = new ArrayList<String>();
+ List<String> names = new ArrayList<>();
for(Stream s: streams) {
if(s._name!=null) {
names.add(s._name);
@@ -206,9 +207,9 @@ public class TridentTopology {
}
public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
- List<Fields> fullInputFields = new ArrayList<Fields>();
- List<Stream> streams = new ArrayList<Stream>();
- List<Fields> fullGroupFields = new ArrayList<Fields>();
+ List<Fields> fullInputFields = new ArrayList<>();
+ List<Stream> streams = new ArrayList<>();
+ List<Fields> fullGroupFields = new ArrayList<>();
for(int i=0; i<groupedStreams.size(); i++) {
GroupedStream gs = groupedStreams.get(i);
Fields groupFields = gs.getGroupFields();
@@ -270,10 +271,10 @@ public class TridentTopology {
completeDRPC(graph, _colocate, _gen);
- List<SpoutNode> spoutNodes = new ArrayList<SpoutNode>();
+ List<SpoutNode> spoutNodes = new ArrayList<>();
// can be regular nodes (static state) or processor nodes
- Set<Node> boltNodes = new HashSet<Node>();
+ Set<Node> boltNodes = new HashSet<>();
for(Node n: graph.vertexSet()) {
if(n instanceof SpoutNode) {
spoutNodes.add((SpoutNode) n);
@@ -283,7 +284,7 @@ public class TridentTopology {
}
- Set<Group> initialGroups = new HashSet<Group>();
+ Set<Group> initialGroups = new HashSet<>();
for(List<Node> colocate: _colocate.values()) {
Group g = new Group(graph, colocate);
boltNodes.removeAll(colocate);
@@ -301,7 +302,7 @@ public class TridentTopology {
// add identity partitions between groups
- for(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {
+ for(IndexedEdge<Node> e: new HashSet<>(graph.edgeSet())) {
if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
Group g1 = grouper.nodeGroup(e.source);
Group g2 = grouper.nodeGroup(e.target);
@@ -325,7 +326,7 @@ public class TridentTopology {
// this is because can't currently merge splitting logic into a spout
// not the most kosher algorithm here, since the grouper indexes are being trounced via the adding of nodes to random groups, but it
// works out
- List<Node> forNewGroups = new ArrayList<Node>();
+ List<Node> forNewGroups = new ArrayList<>();
for(Group g: mergedGroups) {
for(PartitionNode n: extraPartitionInputs(g)) {
Node idNode = makeIdentityNode(n.allOutputFields);
@@ -350,7 +351,7 @@ public class TridentTopology {
}
}
// TODO: in the future, want a way to include this logic in the spout itself,
- // or make it unecessary by having storm include metadata about which grouping a tuple
+ // or make it unnecessary by having storm include metadata about which grouping a tuple
// came from
for(Node n: forNewGroups) {
@@ -366,8 +367,8 @@ public class TridentTopology {
mergedGroups = grouper.getAllGroups();
- Map<Node, String> batchGroupMap = new HashMap();
- List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
+ Map<Node, String> batchGroupMap = new HashMap<>();
+ List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
for(int i=0; i<connectedComponents.size(); i++) {
String groupId = "bg" + i;
for(Node n: connectedComponents.get(i)) {
@@ -413,14 +414,10 @@ public class TridentTopology {
Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
for(PartitionNode n: inputs) {
Node parent = TridentUtils.getParent(graph, n);
- String componentId;
- if(parent instanceof SpoutNode) {
- componentId = spoutIds.get(parent);
- } else {
- componentId = boltIds.get(grouper.nodeGroup(parent));
- }
+ String componentId = parent instanceof SpoutNode ?
+ spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
- }
+ }
}
}
@@ -428,7 +425,7 @@ public class TridentTopology {
}
private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
- List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
+ List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
for(Set<Node> g: connectedComponents) {
checkValidJoins(g);
@@ -496,7 +493,7 @@ public class TridentTopology {
}
private static Collection<PartitionNode> uniquedSubscriptions(Set<PartitionNode> subscriptions) {
- Map<String, PartitionNode> ret = new HashMap();
+ Map<String, PartitionNode> ret = new HashMap<>();
for(PartitionNode n: subscriptions) {
PartitionNode curr = ret.get(n.streamId);
if(curr!=null && !curr.thriftGrouping.equals(n.thriftGrouping)) {
@@ -508,7 +505,7 @@ public class TridentTopology {
}
private static Map<Node, String> genSpoutIds(Collection<SpoutNode> spoutNodes) {
- Map<Node, String> ret = new HashMap();
+ Map<Node, String> ret = new HashMap<>();
int ctr = 0;
for(SpoutNode n: spoutNodes) {
if (n.type == SpoutNode.SpoutType.BATCH) { // if Batch spout then id contains txId
@@ -525,11 +522,11 @@ public class TridentTopology {
}
private static Map<Group, String> genBoltIds(Collection<Group> groups) {
- Map<Group, String> ret = new HashMap();
+ Map<Group, String> ret = new HashMap<>();
int ctr = 0;
for(Group g: groups) {
if(!isSpoutGroup(g)) {
- List<String> name = new ArrayList();
+ List<String> name = new ArrayList<>();
name.add("b");
name.add("" + ctr);
String groupName = getGroupName(g);
@@ -544,13 +541,13 @@ public class TridentTopology {
}
private static String getGroupName(Group g) {
- TreeMap<Integer, String> sortedNames = new TreeMap();
+ TreeMap<Integer, String> sortedNames = new TreeMap<>();
for(Node n: g.nodes) {
if(n.name!=null) {
sortedNames.put(n.creationIndex, n.name);
}
}
- List<String> names = new ArrayList<String>();
+ List<String> names = new ArrayList<>();
String prevName = null;
for(String n: sortedNames.values()) {
if(prevName==null || !n.equals(prevName)) {
@@ -562,7 +559,7 @@ public class TridentTopology {
}
private static Map<String, String> getOutputStreamBatchGroups(Group g, Map<Node, String> batchGroupMap) {
- Map<String, String> ret = new HashMap();
+ Map<String, String> ret = new HashMap<>();
Set<PartitionNode> externalGroupOutputs = externalGroupOutputs(g);
for(PartitionNode n: externalGroupOutputs) {
ret.put(n.streamId, batchGroupMap.get(n));
@@ -571,7 +568,7 @@ public class TridentTopology {
}
private static Set<String> committerBatches(Group g, Map<Node, String> batchGroupMap) {
- Set<String> ret = new HashSet();
+ Set<String> ret = new HashSet<>();
for(Node n: g.nodes) {
if(n instanceof ProcessorNode) {
if(((ProcessorNode) n).committer) {
@@ -583,7 +580,7 @@ public class TridentTopology {
}
private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper, Collection<Group> groups) {
- UndirectedGraph<Group, Object> equivs = new Pseudograph<Group, Object>(Object.class);
+ UndirectedGraph<Group, Object> equivs = new Pseudograph<>(Object.class);
for(Group g: groups) {
equivs.addVertex(g);
}
@@ -599,8 +596,8 @@ public class TridentTopology {
}
}
- Map<Group, Integer> ret = new HashMap();
- List<Set<Group>> equivGroups = new ConnectivityInspector<Group, Object>(equivs).connectedSets();
+ Map<Group, Integer> ret = new HashMap<>();
+ List<Set<Group>> equivGroups = new ConnectivityInspector<>(equivs).connectedSets();
for(Set<Group> equivGroup: equivGroups) {
Integer fixedP = getFixedParallelism(equivGroup);
Integer maxP = getMaxParallelism(equivGroup);
@@ -689,9 +686,9 @@ public class TridentTopology {
}
private static List<PartitionNode> extraPartitionInputs(Group g) {
- List<PartitionNode> ret = new ArrayList();
+ List<PartitionNode> ret = new ArrayList<>();
Set<PartitionNode> inputs = externalGroupInputs(g);
- Map<String, List<PartitionNode>> grouped = new HashMap();
+ Map<String, List<PartitionNode>> grouped = new HashMap<>();
for(PartitionNode n: inputs) {
if(!grouped.containsKey(n.streamId)) {
grouped.put(n.streamId, new ArrayList());
@@ -711,7 +708,7 @@ public class TridentTopology {
}
private static Set<PartitionNode> externalGroupInputs(Group g) {
- Set<PartitionNode> ret = new HashSet();
+ Set<PartitionNode> ret = new HashSet<>();
for(Node n: g.incomingNodes()) {
if(n instanceof PartitionNode) {
ret.add((PartitionNode) n);
@@ -721,7 +718,7 @@ public class TridentTopology {
}
private static Set<PartitionNode> externalGroupOutputs(Group g) {
- Set<PartitionNode> ret = new HashSet();
+ Set<PartitionNode> ret = new HashSet<>();
for(Node n: g.outgoingNodes()) {
if(n instanceof PartitionNode) {
ret.add((PartitionNode) n);
@@ -788,7 +785,7 @@ public class TridentTopology {
}
private static List<Fields> getAllOutputFields(List streams) {
- List<Fields> ret = new ArrayList<Fields>();
+ List<Fields> ret = new ArrayList<>();
for(Object o: streams) {
ret.add(((IAggregatableStream) o).getOutputFields());
}
@@ -797,7 +794,7 @@ public class TridentTopology {
private static List<GroupedStream> groupedStreams(List<Stream> streams, List<Fields> joinFields) {
- List<GroupedStream> ret = new ArrayList<GroupedStream>();
+ List<GroupedStream> ret = new ArrayList<>();
for(int i=0; i<streams.size(); i++) {
ret.add(streams.get(i).groupBy(joinFields.get(i)));
}
@@ -805,7 +802,7 @@ public class TridentTopology {
}
private static List<Fields> strippedInputFields(List<Stream> streams, List<Fields> joinFields) {
- List<Fields> ret = new ArrayList<Fields>();
+ List<Fields> ret = new ArrayList<>();
for(int i=0; i<streams.size(); i++) {
ret.add(TridentUtils.fieldsSubtract(streams.get(i).getOutputFields(), joinFields.get(i)));
}
@@ -813,7 +810,7 @@ public class TridentTopology {
}
private static List<JoinType> repeat(int n, JoinType type) {
- List<JoinType> ret = new ArrayList<JoinType>();
+ List<JoinType> ret = new ArrayList<>();
for(int i=0; i<n; i++) {
ret.add(type);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java b/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
index 849fb10..515d3a2 100644
--- a/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
+++ b/storm-core/src/jvm/storm/trident/drpc/ReturnResultsReducer.java
@@ -40,7 +40,7 @@ import storm.trident.tuple.TridentTuple;
public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
public static class ReturnResultsState {
- List<TridentTuple> results = new ArrayList<TridentTuple>();
+ List<TridentTuple> results = new ArrayList<>();
String returnInfo;
@Override
@@ -50,7 +50,7 @@ public class ReturnResultsReducer implements MultiReducer<ReturnResultsState> {
}
boolean local;
Map conf;
- Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
+ Map<List, DRPCInvocationsClient> _clients = new HashMap<>();
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java b/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
index 8040e8b..976f983 100644
--- a/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
+++ b/storm-core/src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
@@ -59,7 +59,7 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
}
}
- List<AggSpec> _aggs = new ArrayList<AggSpec>();
+ List<AggSpec> _aggs = new ArrayList<>();
IAggregatableStream _stream;
AggType _type = null;
GlobalAggregationScheme _globalScheme;
@@ -73,8 +73,8 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
Fields[] inputFields = new Fields[_aggs.size()];
Aggregator[] aggs = new Aggregator[_aggs.size()];
int[] outSizes = new int[_aggs.size()];
- List<String> allOutFields = new ArrayList<String>();
- Set<String> allInFields = new HashSet<String>();
+ List<String> allOutFields = new ArrayList<>();
+ Set<String> allInFields = new HashSet<>();
for(int i=0; i<_aggs.size(); i++) {
AggSpec spec = _aggs.get(i);
Fields infields = spec.inFields;
@@ -92,7 +92,7 @@ public class ChainedAggregatorDeclarer implements ChainedFullAggregatorDeclarer,
throw new IllegalArgumentException("Output fields for chained aggregators must be distinct: " + allOutFields.toString());
}
- Fields inFields = new Fields(new ArrayList<String>(allInFields));
+ Fields inFields = new Fields(new ArrayList<>(allInFields));
Fields outFields = new Fields(allOutFields);
Aggregator combined = new ChainedAggregatorImpl(aggs, inputFields, new ComboList.Factory(outSizes));
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java b/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java
index 94db077..6f34e8c 100644
--- a/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java
+++ b/storm-core/src/jvm/storm/trident/graph/GraphGrouper.java
@@ -28,14 +28,13 @@ import storm.trident.util.IndexedEdge;
public class GraphGrouper {
-
- DirectedGraph<Node, IndexedEdge> graph;
- Set<Group> currGroups;
- Map<Node, Group> groupIndex = new HashMap();
+ final DirectedGraph<Node, IndexedEdge> graph;
+ final Set<Group> currGroups;
+ final Map<Node, Group> groupIndex = new HashMap<>();
public GraphGrouper(DirectedGraph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {
this.graph = graph;
- this.currGroups = new HashSet(initialGroups);
+ this.currGroups = new HashSet<>(initialGroups);
reindex();
}
@@ -95,7 +94,7 @@ public class GraphGrouper {
}
public Collection<Group> outgoingGroups(Group g) {
- Set<Group> ret = new HashSet();
+ Set<Group> ret = new HashSet<>();
for(Node n: g.outgoingNodes()) {
Group other = nodeGroup(n);
if(other==null || !other.equals(g)) {
@@ -106,7 +105,7 @@ public class GraphGrouper {
}
public Collection<Group> incomingGroups(Group g) {
- Set<Group> ret = new HashSet();
+ Set<Group> ret = new HashSet<>();
for(Node n: g.incomingNodes()) {
Group other = nodeGroup(n);
if(other==null || !other.equals(g)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/graph/Group.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/graph/Group.java b/storm-core/src/jvm/storm/trident/graph/Group.java
index 8ed0023..64bdec6 100644
--- a/storm-core/src/jvm/storm/trident/graph/Group.java
+++ b/storm-core/src/jvm/storm/trident/graph/Group.java
@@ -29,14 +29,13 @@ import storm.trident.util.TridentUtils;
public class Group {
- public Set<Node> nodes = new HashSet<Node>();
- private DirectedGraph<Node, IndexedEdge> graph;
- private String id;
+ public final Set<Node> nodes = new HashSet<>();
+ private final DirectedGraph<Node, IndexedEdge> graph;
+ private final String id = UUID.randomUUID().toString();
public Group(DirectedGraph graph, List<Node> nodes) {
- init(graph);
- this.nodes.addAll(nodes);
this.graph = graph;
+ this.nodes.addAll(nodes);
}
public Group(DirectedGraph graph, Node n) {
@@ -44,18 +43,13 @@ public class Group {
}
public Group(Group g1, Group g2) {
- init(g1.graph);
+ this.graph = g1.graph;
nodes.addAll(g1.nodes);
nodes.addAll(g2.nodes);
}
- private void init(DirectedGraph graph) {
- this.graph = graph;
- this.id = UUID.randomUUID().toString();
- }
-
public Set<Node> outgoingNodes() {
- Set<Node> ret = new HashSet<Node>();
+ Set<Node> ret = new HashSet<>();
for(Node n: nodes) {
ret.addAll(TridentUtils.getChildren(graph, n));
}
@@ -63,7 +57,7 @@ public class Group {
}
public Set<Node> incomingNodes() {
- Set<Node> ret = new HashSet<Node>();
+ Set<Node> ret = new HashSet<>();
for(Node n: nodes) {
ret.addAll(TridentUtils.getParents(graph, n));
}
@@ -77,6 +71,9 @@ public class Group {
@Override
public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
return id.equals(((Group) o).id);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java b/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java
index 6d24ae6..2a44eab 100644
--- a/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java
+++ b/storm-core/src/jvm/storm/trident/operation/builtin/SnapshotGet.java
@@ -29,9 +29,9 @@ public class SnapshotGet extends BaseQueryFunction<ReadOnlySnapshottable, Object
@Override
public List<Object> batchRetrieve(ReadOnlySnapshottable state, List<TridentTuple> args) {
- List<Object> ret = new ArrayList<Object>(args.size());
+ List<Object> ret = new ArrayList<>(args.size());
Object snapshot = state.get();
- for(int i=0; i<args.size(); i++) {
+ for (TridentTuple arg : args) {
ret.add(snapshot);
}
return ret;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java b/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java
index 52dd633..426b56f 100644
--- a/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java
+++ b/storm-core/src/jvm/storm/trident/operation/builtin/TupleCollectionGet.java
@@ -30,9 +30,9 @@ public class TupleCollectionGet extends BaseQueryFunction<State, Iterator<List<O
@Override
public List<Iterator<List<Object>>> batchRetrieve(State state, List<TridentTuple> args) {
- List<Iterator<List<Object>>> ret = new ArrayList(args.size());
- for(int i=0; i<args.size(); i++) {
- ret.add(((ITupleCollection)state).getTuples());
+ List<Iterator<List<Object>>> ret = new ArrayList<>(args.size());
+ for (TridentTuple arg : args) {
+ ret.add(((ITupleCollection) state).getTuples());
}
return ret;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java b/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java
index 54fa844..a535eb2 100644
--- a/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java
+++ b/storm-core/src/jvm/storm/trident/partition/GlobalGrouping.java
@@ -26,13 +26,11 @@ import java.util.Collections;
import java.util.List;
public class GlobalGrouping implements CustomStreamGrouping {
-
List<Integer> target;
-
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targets) {
- List<Integer> sorted = new ArrayList<Integer>(targets);
+ List<Integer> sorted = new ArrayList<>(targets);
Collections.sort(sorted);
target = Arrays.asList(sorted.get(0));
}
@@ -41,5 +39,4 @@ public class GlobalGrouping implements CustomStreamGrouping {
public List<Integer> chooseTasks(int i, List<Object> list) {
return target;
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java b/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java
index 30f48ad..88c2922 100644
--- a/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java
+++ b/storm-core/src/jvm/storm/trident/partition/IdentityGrouping.java
@@ -29,18 +29,16 @@ import java.util.Map;
public class IdentityGrouping implements CustomStreamGrouping {
-
- List<Integer> ret = new ArrayList<Integer>();
- Map<Integer, List<Integer>> _precomputed = new HashMap();
+ final Map<Integer, List<Integer>> _precomputed = new HashMap<>();
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> tasks) {
- List<Integer> sourceTasks = new ArrayList<Integer>(context.getComponentTasks(stream.get_componentId()));
+ List<Integer> sourceTasks = new ArrayList<>(context.getComponentTasks(stream.get_componentId()));
Collections.sort(sourceTasks);
if(sourceTasks.size()!=tasks.size()) {
throw new RuntimeException("Can only do an identity grouping when source and target have same number of tasks");
}
- tasks = new ArrayList<Integer>(tasks);
+ tasks = new ArrayList<>(tasks);
Collections.sort(tasks);
for(int i=0; i<sourceTasks.size(); i++) {
int s = sourceTasks.get(i);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/planner/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/planner/Node.java b/storm-core/src/jvm/storm/trident/planner/Node.java
index 6284cb9..d35e7da 100644
--- a/storm-core/src/jvm/storm/trident/planner/Node.java
+++ b/storm-core/src/jvm/storm/trident/planner/Node.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang.builder.ToStringStyle;
public class Node implements Serializable {
- private static AtomicInteger INDEX = new AtomicInteger(0);
+ private static final AtomicInteger INDEX = new AtomicInteger(0);
private String nodeId;
@@ -47,6 +47,9 @@ public class Node implements Serializable {
@Override
public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
return nodeId.equals(((Node) o).nodeId);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/planner/PartitionNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/planner/PartitionNode.java b/storm-core/src/jvm/storm/trident/planner/PartitionNode.java
index 4f10c25..4485765 100644
--- a/storm-core/src/jvm/storm/trident/planner/PartitionNode.java
+++ b/storm-core/src/jvm/storm/trident/planner/PartitionNode.java
@@ -22,8 +22,6 @@ import backtype.storm.tuple.Fields;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
import storm.trident.util.TridentUtils;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java b/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java
index cdceaf9..6320a21 100644
--- a/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java
+++ b/storm-core/src/jvm/storm/trident/planner/SubtopologyBolt.java
@@ -23,7 +23,6 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -49,9 +48,9 @@ import storm.trident.util.TridentUtils;
public class SubtopologyBolt implements ITridentBatchBolt {
DirectedGraph _graph;
Set<Node> _nodes;
- Map<String, InitialReceiver> _roots = new HashMap();
- Map<Node, Factory> _outputFactories = new HashMap();
- Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap();
+ Map<String, InitialReceiver> _roots = new HashMap<>();
+ Map<Node, Factory> _outputFactories = new HashMap<>();
+ Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap<>();
Map<Node, String> _batchGroups;
//given processornodes and static state nodes
@@ -71,7 +70,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
}
}
DirectedSubgraph<Node, Object> subgraph = new DirectedSubgraph(_graph, _nodes, null);
- TopologicalOrderIterator it = new TopologicalOrderIterator<Node, Object>(subgraph);
+ TopologicalOrderIterator it = new TopologicalOrderIterator<>(subgraph);
int stateIndex = 0;
while(it.hasNext()) {
Node n = (Node) it.next();
@@ -82,8 +81,8 @@ public class SubtopologyBolt implements ITridentBatchBolt {
_myTopologicallyOrdered.put(batchGroup, new ArrayList());
}
_myTopologicallyOrdered.get(batchGroup).add(pn.processor);
- List<String> parentStreams = new ArrayList();
- List<Factory> parentFactories = new ArrayList();
+ List<String> parentStreams = new ArrayList<>();
+ List<Factory> parentFactories = new ArrayList<>();
for(Node p: TridentUtils.getParents(_graph, n)) {
parentStreams.add(p.streamId);
if(_nodes.contains(p)) {
@@ -96,7 +95,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
parentFactories.add(_roots.get(p.streamId).getOutputFactory());
}
}
- List<TupleReceiver> targets = new ArrayList();
+ List<TupleReceiver> targets = new ArrayList<>();
boolean outgoingNode = false;
for(Node cn: TridentUtils.getChildren(_graph, n)) {
if(_nodes.contains(cn)) {
@@ -185,7 +184,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
protected static class InitialReceiver {
- List<TridentProcessor> _receivers = new ArrayList();
+ List<TridentProcessor> _receivers = new ArrayList<>();
RootFactory _factory;
ProjectionFactory _project;
String _stream;
@@ -195,7 +194,7 @@ public class SubtopologyBolt implements ITridentBatchBolt {
// how to distinguish "batch" streams from non-batch streams?
_stream = stream;
_factory = new RootFactory(allFields);
- List<String> projected = new ArrayList(allFields.toList());
+ List<String> projected = new ArrayList<>(allFields.toList());
projected.remove(0);
_project = new ProjectionFactory(_factory, new Fields(projected));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java b/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java
index 6777d2f..4086996 100644
--- a/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java
+++ b/storm-core/src/jvm/storm/trident/planner/processor/MultiReducerProcessor.java
@@ -49,7 +49,7 @@ public class MultiReducerProcessor implements TridentProcessor {
public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
List<Factory> parents = tridentContext.getParentTupleFactories();
_context = tridentContext;
- _streamToIndex = new HashMap<String, Integer>();
+ _streamToIndex = new HashMap<>();
List<String> parentStreams = tridentContext.getParentStreams();
for(int i=0; i<parentStreams.size(); i++) {
_streamToIndex.put(parentStreams.get(i), i);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
index 63debdb..d43d4e4 100644
--- a/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
+++ b/storm-core/src/jvm/storm/trident/spout/TridentSpoutExecutor.java
@@ -39,7 +39,7 @@ import storm.trident.tuple.ConsList;
public class TridentSpoutExecutor implements ITridentBatchBolt {
public static final String ID_FIELD = "$tx";
- public static Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);
+ public static final Logger LOG = LoggerFactory.getLogger(TridentSpoutExecutor.class);
AddIdCollector _collector;
ITridentSpout<Object> _spout;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java b/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java
index 04b554e..761b221 100644
--- a/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java
+++ b/storm-core/src/jvm/storm/trident/tuple/TridentTupleView.java
@@ -31,7 +31,9 @@ import java.util.Map;
import java.util.Set;
import java.util.Arrays;
-//extends abstractlist so that it can be emitted directly as Storm tuples
+/**
+ * Extends AbstractList so that it can be emitted directly as Storm tuples
+ */
public class TridentTupleView extends AbstractList<Object> implements TridentTuple {
ValuePointer[] _index;
Map<String, ValuePointer> _fieldIndex;
@@ -114,7 +116,7 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
public OperationOutputFactory(Factory parent, Fields selfFields) {
_parent = parent;
- _fieldIndex = new HashMap(parent.getFieldIndex());
+ _fieldIndex = new HashMap<>(parent.getFieldIndex());
int myIndex = parent.numDelegates();
for(int i=0; i<selfFields.size(); i++) {
String field = selfFields.get(i);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
index 2ab0109..dfb78fc 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicReference;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.ProducerType;
-import com.lmax.disruptor.InsufficientCapacityException;
import org.junit.Assert;
import org.junit.Test;
import junit.framework.TestCase;
@@ -41,7 +40,7 @@ public class DisruptorQueueTest extends TestCase {
Runnable producer = new IncProducer(queue, i+100);
- final AtomicReference<Object> result = new AtomicReference<Object>();
+ final AtomicReference<Object> result = new AtomicReference<>();
Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
private boolean head = true;
@@ -130,7 +129,7 @@ public class DisruptorQueueTest extends TestCase {
queue.publish(i);
}
}
- };
+ }
private static class Consumer implements Runnable {
private EventHandler handler;
@@ -151,7 +150,7 @@ public class DisruptorQueueTest extends TestCase {
//break
}
}
- };
+ }
private static DisruptorQueue createQueue(String name, int queueSize) {
return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
[5/5] storm git commit: Added STORM-1164 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1164 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e25f28f7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e25f28f7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e25f28f7
Branch: refs/heads/master
Commit: e25f28f7175757e98c14b5d1a502c611a8b9282b
Parents: 1acadff
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Nov 9 18:24:54 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Nov 9 18:24:54 2015 -0800
----------------------------------------------------------------------
CHANGELOG.md | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e25f28f7/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b914b96..5e23d59 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
## 0.11.0
- * STORM-902: Simple Log Search
+ * STORM-1164: Code cleanup for typos, warnings and conciseness.
+ * STORM-902: Simple Log Search.
* STORM-1052: TridentKafkaState uses new Kafka Producer API.
* STORM-1182: Removing and wrapping some exceptions in ConfigValidation to make code cleaner
* STORM-1134. Windows: Fix log4j config.
[4/5] storm git commit: Merge branch 'STORM-1164' of
https://github.com/sureshms/incubator-storm into STORM-1164
Posted by sr...@apache.org.
Merge branch 'STORM-1164' of https://github.com/sureshms/incubator-storm into STORM-1164
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1acadff4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1acadff4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1acadff4
Branch: refs/heads/master
Commit: 1acadff444a6660f2fedd3466863736e30eb2f01
Parents: 5a4e1f8 f0e4c6f
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Nov 9 16:57:05 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Nov 9 16:57:05 2015 -0800
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/Config.java | 36 ++++----
.../src/jvm/backtype/storm/LogWriter.java | 2 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 4 -
.../storm/drpc/DRPCInvocationsClient.java | 5 +-
.../storm/generated/AlreadyAliveException.java | 7 +-
.../storm/grouping/PartialKeyGrouping.java | 5 +-
.../storm/messaging/ConnectionWithStatus.java | 4 +-
.../jvm/backtype/storm/messaging/IContext.java | 2 +-
.../storm/messaging/TransportFactory.java | 2 +-
.../backtype/storm/messaging/netty/Client.java | 2 +-
.../backtype/storm/messaging/netty/Context.java | 8 +-
.../storm/messaging/netty/ControlMessage.java | 5 +-
.../storm/messaging/netty/MessageBatch.java | 14 ++--
.../storm/messaging/netty/MessageDecoder.java | 7 +-
.../storm/messaging/netty/SaslMessageToken.java | 10 +--
.../storm/messaging/netty/SaslNettyClient.java | 6 +-
.../storm/messaging/netty/SaslNettyServer.java | 4 -
.../messaging/netty/SaslStormClientHandler.java | 4 +-
.../messaging/netty/SaslStormServerHandler.java | 11 +--
.../storm/messaging/netty/SaslUtils.java | 11 +--
.../backtype/storm/messaging/netty/Server.java | 26 +++---
.../backtype/storm/metric/EventLoggerBolt.java | 8 --
.../storm/metric/FileBasedEventLogger.java | 2 +-
.../metric/HttpForwardingMetricsConsumer.java | 1 -
.../metric/HttpForwardingMetricsServer.java | 1 -
.../jvm/backtype/storm/metric/IEventLogger.java | 8 +-
.../storm/metric/LoggingMetricsConsumer.java | 1 -
.../storm/metric/MetricsConsumerBolt.java | 1 -
.../jvm/backtype/storm/metric/SystemBolt.java | 5 --
.../backtype/storm/metric/api/CountMetric.java | 2 -
.../backtype/storm/metric/api/MeanReducer.java | 4 +-
.../storm/metric/api/MultiCountMetric.java | 2 +-
.../storm/metric/api/MultiReducedMetric.java | 2 +-
.../storm/metric/api/rpc/CountShellMetric.java | 3 +-
.../AbstractDNSToSwitchMapping.java | 2 +-
.../DefaultRackDNSToSwitchMapping.java | 4 +-
.../backtype/storm/nimbus/ILeaderElector.java | 6 +-
.../jvm/backtype/storm/nimbus/NimbusInfo.java | 4 +-
.../jvm/backtype/storm/scheduler/Cluster.java | 5 +-
.../scheduler/SchedulerAssignmentImpl.java | 15 ++--
.../storm/scheduler/SupervisorDetails.java | 6 +-
.../backtype/storm/scheduler/Topologies.java | 6 +-
.../storm/scheduler/TopologyDetails.java | 30 +++----
.../scheduler/multitenant/DefaultPool.java | 22 ++---
.../storm/scheduler/multitenant/FreePool.java | 6 +-
.../scheduler/multitenant/IsolatedPool.java | 32 ++++---
.../multitenant/MultitenantScheduler.java | 6 +-
.../storm/scheduler/multitenant/Node.java | 17 ++--
.../storm/scheduler/multitenant/NodePool.java | 16 ++--
.../strategies/ResourceAwareStrategy.java | 39 +++++----
.../backtype/storm/security/auth/AuthUtils.java | 27 +++---
.../auth/DefaultHttpCredentialsPlugin.java | 6 +-
.../security/auth/DefaultPrincipalToLocal.java | 1 -
.../storm/security/auth/IAuthorizer.java | 4 +-
.../security/auth/ICredentialsRenewer.java | 3 +-
.../security/auth/IHttpCredentialsPlugin.java | 2 -
.../storm/security/auth/IPrincipalToLocal.java | 2 +-
.../storm/security/auth/ITransportPlugin.java | 4 -
.../security/auth/KerberosPrincipalToLocal.java | 2 +-
.../storm/security/auth/ReqContext.java | 11 +--
.../security/auth/SaslTransportPlugin.java | 12 +--
.../security/auth/SimpleTransportPlugin.java | 6 +-
.../security/auth/SingleUserPrincipal.java | 5 +-
.../storm/security/auth/TBackoffConnect.java | 1 -
.../storm/security/auth/ThriftClient.java | 10 +--
.../storm/security/auth/ThriftServer.java | 6 +-
.../authorizer/DRPCSimpleACLAuthorizer.java | 11 ++-
.../auth/authorizer/DenyAuthorizer.java | 11 +--
.../authorizer/ImpersonationAuthorizer.java | 10 +--
.../auth/authorizer/NoopAuthorizer.java | 7 +-
.../auth/authorizer/SimpleACLAuthorizer.java | 18 ++--
.../authorizer/SimpleWhitelistAuthorizer.java | 4 -
.../auth/digest/ClientCallbackHandler.java | 2 -
.../auth/digest/DigestSaslTransportPlugin.java | 2 -
.../auth/digest/ServerCallbackHandler.java | 5 +-
.../GzipThriftSerializationDelegate.java | 1 -
.../storm/serialization/ITupleDeserializer.java | 1 -
.../serialization/KryoTupleDeserializer.java | 3 -
.../serialization/KryoValuesDeserializer.java | 3 +-
.../serialization/SerializationFactory.java | 17 ++--
.../storm/task/GeneralTopologyContext.java | 10 +--
.../backtype/storm/task/TopologyContext.java | 9 +-
.../storm/topology/BasicBoltExecutor.java | 2 +-
.../storm/topology/OutputFieldsGetter.java | 2 +-
.../storm/topology/TopologyBuilder.java | 16 ++--
.../storm/topology/base/BaseBatchBolt.java | 1 -
.../topology/base/BaseTransactionalSpout.java | 1 -
.../TransactionalSpoutCoordinator.java | 2 +-
...uePartitionedTransactionalSpoutExecutor.java | 8 +-
.../PartitionedTransactionalSpoutExecutor.java | 2 +-
.../src/jvm/backtype/storm/tuple/Fields.java | 10 +--
.../src/jvm/backtype/storm/tuple/MessageId.java | 10 +--
.../src/jvm/backtype/storm/tuple/Tuple.java | 1 -
.../src/jvm/backtype/storm/tuple/TupleImpl.java | 4 +-
.../jvm/backtype/storm/utils/DRPCClient.java | 1 -
.../backtype/storm/utils/DisruptorQueue.java | 5 +-
.../backtype/storm/utils/InprocMessaging.java | 4 +-
.../storm/utils/KeyedRoundRobinQueue.java | 6 +-
.../jvm/backtype/storm/utils/ListDelegate.java | 6 +-
.../src/jvm/backtype/storm/utils/Monitor.java | 3 +-
.../jvm/backtype/storm/utils/NimbusClient.java | 10 +--
.../storm/utils/RegisteredGlobalState.java | 6 +-
.../jvm/backtype/storm/utils/RotatingMap.java | 2 +-
.../backtype/storm/utils/ServiceRegistry.java | 2 +-
.../StormBoundedExponentialBackoffRetry.java | 3 +-
.../jvm/backtype/storm/utils/VersionInfo.java | 2 +-
.../src/jvm/storm/trident/TridentTopology.java | 87 ++++++++++----------
.../trident/drpc/ReturnResultsReducer.java | 4 +-
.../fluent/ChainedAggregatorDeclarer.java | 8 +-
.../jvm/storm/trident/graph/GraphGrouper.java | 13 ++-
.../src/jvm/storm/trident/graph/Group.java | 23 +++---
.../trident/operation/builtin/SnapshotGet.java | 4 +-
.../operation/builtin/TupleCollectionGet.java | 6 +-
.../storm/trident/partition/GlobalGrouping.java | 5 +-
.../trident/partition/IdentityGrouping.java | 8 +-
.../src/jvm/storm/trident/planner/Node.java | 5 +-
.../storm/trident/planner/PartitionNode.java | 2 -
.../storm/trident/planner/SubtopologyBolt.java | 19 ++---
.../processor/MultiReducerProcessor.java | 2 +-
.../trident/spout/TridentSpoutExecutor.java | 2 +-
.../storm/trident/tuple/TridentTupleView.java | 6 +-
.../storm/utils/DisruptorQueueTest.java | 7 +-
122 files changed, 378 insertions(+), 579 deletions(-)
----------------------------------------------------------------------
[3/5] storm git commit: STORM-1164. Code cleanup for typos,
warnings and conciseness
Posted by sr...@apache.org.
STORM-1164. Code cleanup for typos, warnings and conciseness
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f0e4c6f2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f0e4c6f2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f0e4c6f2
Branch: refs/heads/master
Commit: f0e4c6f2099196760f0037e7812dda543d332c54
Parents: c12e28c
Author: Suresh Srinivas <su...@yahoo-inc.com>
Authored: Tue Nov 3 16:03:16 2015 -0800
Committer: Suresh Srinivas <su...@yahoo-inc.com>
Committed: Mon Nov 9 11:56:11 2015 -0800
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/Config.java | 36 ++++----
.../src/jvm/backtype/storm/LogWriter.java | 2 +-
.../src/jvm/backtype/storm/StormSubmitter.java | 4 -
.../storm/drpc/DRPCInvocationsClient.java | 5 +-
.../storm/generated/AlreadyAliveException.java | 7 +-
.../storm/grouping/PartialKeyGrouping.java | 5 +-
.../storm/messaging/ConnectionWithStatus.java | 4 +-
.../jvm/backtype/storm/messaging/IContext.java | 2 +-
.../storm/messaging/TransportFactory.java | 2 +-
.../backtype/storm/messaging/netty/Client.java | 2 +-
.../backtype/storm/messaging/netty/Context.java | 8 +-
.../storm/messaging/netty/ControlMessage.java | 5 +-
.../storm/messaging/netty/MessageBatch.java | 14 ++--
.../storm/messaging/netty/MessageDecoder.java | 7 +-
.../storm/messaging/netty/SaslMessageToken.java | 10 +--
.../storm/messaging/netty/SaslNettyClient.java | 6 +-
.../storm/messaging/netty/SaslNettyServer.java | 4 -
.../messaging/netty/SaslStormClientHandler.java | 4 +-
.../messaging/netty/SaslStormServerHandler.java | 11 +--
.../storm/messaging/netty/SaslUtils.java | 11 +--
.../backtype/storm/messaging/netty/Server.java | 26 +++---
.../backtype/storm/metric/EventLoggerBolt.java | 8 --
.../storm/metric/FileBasedEventLogger.java | 2 +-
.../metric/HttpForwardingMetricsConsumer.java | 1 -
.../metric/HttpForwardingMetricsServer.java | 1 -
.../jvm/backtype/storm/metric/IEventLogger.java | 8 +-
.../storm/metric/LoggingMetricsConsumer.java | 1 -
.../storm/metric/MetricsConsumerBolt.java | 1 -
.../jvm/backtype/storm/metric/SystemBolt.java | 5 --
.../backtype/storm/metric/api/CountMetric.java | 2 -
.../backtype/storm/metric/api/MeanReducer.java | 4 +-
.../storm/metric/api/MultiCountMetric.java | 2 +-
.../storm/metric/api/MultiReducedMetric.java | 2 +-
.../storm/metric/api/rpc/CountShellMetric.java | 3 +-
.../AbstractDNSToSwitchMapping.java | 2 +-
.../DefaultRackDNSToSwitchMapping.java | 4 +-
.../backtype/storm/nimbus/ILeaderElector.java | 6 +-
.../jvm/backtype/storm/nimbus/NimbusInfo.java | 4 +-
.../jvm/backtype/storm/scheduler/Cluster.java | 5 +-
.../scheduler/SchedulerAssignmentImpl.java | 15 ++--
.../storm/scheduler/SupervisorDetails.java | 6 +-
.../backtype/storm/scheduler/Topologies.java | 6 +-
.../storm/scheduler/TopologyDetails.java | 30 +++----
.../scheduler/multitenant/DefaultPool.java | 22 ++---
.../storm/scheduler/multitenant/FreePool.java | 6 +-
.../scheduler/multitenant/IsolatedPool.java | 32 ++++---
.../multitenant/MultitenantScheduler.java | 6 +-
.../storm/scheduler/multitenant/Node.java | 17 ++--
.../storm/scheduler/multitenant/NodePool.java | 16 ++--
.../strategies/ResourceAwareStrategy.java | 39 +++++----
.../backtype/storm/security/auth/AuthUtils.java | 27 +++---
.../auth/DefaultHttpCredentialsPlugin.java | 6 +-
.../security/auth/DefaultPrincipalToLocal.java | 1 -
.../storm/security/auth/IAuthorizer.java | 4 +-
.../security/auth/ICredentialsRenewer.java | 3 +-
.../security/auth/IHttpCredentialsPlugin.java | 2 -
.../storm/security/auth/IPrincipalToLocal.java | 2 +-
.../storm/security/auth/ITransportPlugin.java | 4 -
.../security/auth/KerberosPrincipalToLocal.java | 2 +-
.../storm/security/auth/ReqContext.java | 11 +--
.../security/auth/SaslTransportPlugin.java | 12 +--
.../security/auth/SimpleTransportPlugin.java | 6 +-
.../security/auth/SingleUserPrincipal.java | 5 +-
.../storm/security/auth/TBackoffConnect.java | 1 -
.../storm/security/auth/ThriftClient.java | 10 +--
.../storm/security/auth/ThriftServer.java | 6 +-
.../authorizer/DRPCSimpleACLAuthorizer.java | 11 ++-
.../auth/authorizer/DenyAuthorizer.java | 11 +--
.../authorizer/ImpersonationAuthorizer.java | 10 +--
.../auth/authorizer/NoopAuthorizer.java | 7 +-
.../auth/authorizer/SimpleACLAuthorizer.java | 18 ++--
.../authorizer/SimpleWhitelistAuthorizer.java | 4 -
.../auth/digest/ClientCallbackHandler.java | 2 -
.../auth/digest/DigestSaslTransportPlugin.java | 2 -
.../auth/digest/ServerCallbackHandler.java | 5 +-
.../GzipThriftSerializationDelegate.java | 1 -
.../storm/serialization/ITupleDeserializer.java | 1 -
.../serialization/KryoTupleDeserializer.java | 3 -
.../serialization/KryoValuesDeserializer.java | 3 +-
.../serialization/SerializationFactory.java | 17 ++--
.../storm/task/GeneralTopologyContext.java | 10 +--
.../backtype/storm/task/TopologyContext.java | 9 +-
.../storm/topology/BasicBoltExecutor.java | 2 +-
.../storm/topology/OutputFieldsGetter.java | 2 +-
.../storm/topology/TopologyBuilder.java | 16 ++--
.../storm/topology/base/BaseBatchBolt.java | 1 -
.../topology/base/BaseTransactionalSpout.java | 1 -
.../TransactionalSpoutCoordinator.java | 2 +-
...uePartitionedTransactionalSpoutExecutor.java | 8 +-
.../PartitionedTransactionalSpoutExecutor.java | 2 +-
.../src/jvm/backtype/storm/tuple/Fields.java | 10 +--
.../src/jvm/backtype/storm/tuple/MessageId.java | 10 +--
.../src/jvm/backtype/storm/tuple/Tuple.java | 1 -
.../src/jvm/backtype/storm/tuple/TupleImpl.java | 4 +-
.../jvm/backtype/storm/utils/DRPCClient.java | 1 -
.../backtype/storm/utils/DisruptorQueue.java | 5 +-
.../backtype/storm/utils/InprocMessaging.java | 4 +-
.../storm/utils/KeyedRoundRobinQueue.java | 6 +-
.../jvm/backtype/storm/utils/ListDelegate.java | 6 +-
.../src/jvm/backtype/storm/utils/Monitor.java | 3 +-
.../jvm/backtype/storm/utils/NimbusClient.java | 10 +--
.../storm/utils/RegisteredGlobalState.java | 6 +-
.../jvm/backtype/storm/utils/RotatingMap.java | 2 +-
.../backtype/storm/utils/ServiceRegistry.java | 2 +-
.../StormBoundedExponentialBackoffRetry.java | 3 +-
.../jvm/backtype/storm/utils/VersionInfo.java | 2 +-
.../src/jvm/storm/trident/TridentTopology.java | 87 ++++++++++----------
.../trident/drpc/ReturnResultsReducer.java | 4 +-
.../fluent/ChainedAggregatorDeclarer.java | 8 +-
.../jvm/storm/trident/graph/GraphGrouper.java | 13 ++-
.../src/jvm/storm/trident/graph/Group.java | 23 +++---
.../trident/operation/builtin/SnapshotGet.java | 4 +-
.../operation/builtin/TupleCollectionGet.java | 6 +-
.../storm/trident/partition/GlobalGrouping.java | 5 +-
.../trident/partition/IdentityGrouping.java | 8 +-
.../src/jvm/storm/trident/planner/Node.java | 5 +-
.../storm/trident/planner/PartitionNode.java | 2 -
.../storm/trident/planner/SubtopologyBolt.java | 19 ++---
.../processor/MultiReducerProcessor.java | 2 +-
.../trident/spout/TridentSpoutExecutor.java | 2 +-
.../storm/trident/tuple/TridentTupleView.java | 6 +-
.../storm/utils/DisruptorQueueTest.java | 7 +-
122 files changed, 378 insertions(+), 579 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index d80876e..1485361 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -166,7 +166,7 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_LOG4J2_CONF_DIR = "storm.log4j2.conf.dir";
/**
- * A global task scheduler used to assign topologies's tasks to supervisors' wokers.
+ * A global task scheduler used to assign topologies's tasks to supervisors' workers.
*
* If this is not set, a default system scheduler will be used.
*/
@@ -192,7 +192,7 @@ public class Config extends HashMap<String, Object> {
* The hostname the supervisors/workers should report to nimbus. If unset, Storm will
* get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
*
- * You should set this config when you dont have a DNS which supervisors/workers
+ * You should set this config when you don't have a DNS which supervisors/workers
* can utilize to find each other based on hostname got from calls to
* <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
*/
@@ -445,7 +445,7 @@ public class Config extends HashMap<String, Object> {
* How often nimbus should wake up to check heartbeats and do reassignments. Note
* that if a machine ever goes down Nimbus will immediately wake up and take action.
* This parameter is for checking for failures when there's no explicit event like that
- * occuring.
+ * occurring.
*/
@isInteger
@isPositiveNumber
@@ -453,7 +453,7 @@ public class Config extends HashMap<String, Object> {
/**
* How often nimbus should wake the cleanup thread to clean the inbox.
- * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
+ * @see #NIMBUS_INBOX_JAR_EXPIRATION_SECS
*/
@isInteger
@isPositiveNumber
@@ -466,7 +466,7 @@ public class Config extends HashMap<String, Object> {
* Note that the time it takes to delete an inbox jar file is going to be somewhat more than
* NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS (depending on how often NIMBUS_CLEANUP_FREQ_SECS
* is set to).
- * @see NIMBUS_CLEANUP_FREQ_SECS
+ * @see #NIMBUS_CLEANUP_INBOX_FREQ_SECS
*/
@isInteger
public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
@@ -622,7 +622,7 @@ public class Config extends HashMap<String, Object> {
public static final String LOGVIEWER_HTTPS_KEYSTORE_TYPE = "logviewer.https.keystore.type";
/**
- * Password to the private key in the keystore for settting up HTTPS (SSL).
+ * Password to the private key in the keystore for setting up HTTPS (SSL).
*/
@isString
public static final String LOGVIEWER_HTTPS_KEY_PASSWORD = "logviewer.https.key.password";
@@ -647,7 +647,7 @@ public class Config extends HashMap<String, Object> {
public static final String LOGVIEWER_HTTPS_TRUSTSTORE_TYPE = "logviewer.https.truststore.type";
/**
- * Password to the truststore used by Storm Logviewer settting up HTTPS (SSL).
+ * Password to the truststore used by Storm Logviewer setting up HTTPS (SSL).
*/
@isBoolean
public static final String LOGVIEWER_HTTPS_WANT_CLIENT_AUTH = "logviewer.https.want.client.auth";
@@ -725,19 +725,19 @@ public class Config extends HashMap<String, Object> {
public static final String UI_HTTPS_KEYSTORE_TYPE = "ui.https.keystore.type";
/**
- * Password to the private key in the keystore for settting up HTTPS (SSL).
+ * Password to the private key in the keystore for setting up HTTPS (SSL).
*/
@isString
public static final String UI_HTTPS_KEY_PASSWORD = "ui.https.key.password";
/**
- * Path to the truststore used by Storm UI settting up HTTPS (SSL).
+ * Path to the truststore used by Storm UI setting up HTTPS (SSL).
*/
@isString
public static final String UI_HTTPS_TRUSTSTORE_PATH = "ui.https.truststore.path";
/**
- * Password to the truststore used by Storm UI settting up HTTPS (SSL).
+ * Password to the truststore used by Storm UI setting up HTTPS (SSL).
*/
@isString
public static final String UI_HTTPS_TRUSTSTORE_PASSWORD = "ui.https.truststore.password";
@@ -750,7 +750,7 @@ public class Config extends HashMap<String, Object> {
public static final String UI_HTTPS_TRUSTSTORE_TYPE = "ui.https.truststore.type";
/**
- * Password to the truststore used by Storm DRPC settting up HTTPS (SSL).
+ * Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
*/
@isBoolean
public static final String UI_HTTPS_WANT_CLIENT_AUTH = "ui.https.want.client.auth";
@@ -796,19 +796,19 @@ public class Config extends HashMap<String, Object> {
public static final String DRPC_HTTPS_KEYSTORE_TYPE = "drpc.https.keystore.type";
/**
- * Password to the private key in the keystore for settting up HTTPS (SSL).
+ * Password to the private key in the keystore for setting up HTTPS (SSL).
*/
@isString
public static final String DRPC_HTTPS_KEY_PASSWORD = "drpc.https.key.password";
/**
- * Path to the truststore used by Storm DRPC settting up HTTPS (SSL).
+ * Path to the truststore used by Storm DRPC setting up HTTPS (SSL).
*/
@isString
public static final String DRPC_HTTPS_TRUSTSTORE_PATH = "drpc.https.truststore.path";
/**
- * Password to the truststore used by Storm DRPC settting up HTTPS (SSL).
+ * Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
*/
@isString
public static final String DRPC_HTTPS_TRUSTSTORE_PASSWORD = "drpc.https.truststore.password";
@@ -821,7 +821,7 @@ public class Config extends HashMap<String, Object> {
public static final String DRPC_HTTPS_TRUSTSTORE_TYPE = "drpc.https.truststore.type";
/**
- * Password to the truststore used by Storm DRPC settting up HTTPS (SSL).
+ * Password to the truststore used by Storm DRPC setting up HTTPS (SSL).
*/
@isBoolean
public static final String DRPC_HTTPS_WANT_CLIENT_AUTH = "drpc.https.want.client.auth";
@@ -850,14 +850,14 @@ public class Config extends HashMap<String, Object> {
/**
* The Access Control List for the DRPC Authorizer.
- * @see DRPCSimpleAclAuthorizer
+ * @see backtype.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer
*/
@isType(type=Map.class)
public static final String DRPC_AUTHORIZER_ACL = "drpc.authorizer.acl";
/**
* File name of the DRPC Authorizer ACL.
- * @see DRPCSimpleAclAuthorizer
+ * @see backtype.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer
*/
@isString
public static final String DRPC_AUTHORIZER_ACL_FILENAME = "drpc.authorizer.acl.filename";
@@ -869,7 +869,7 @@ public class Config extends HashMap<String, Object> {
* permitted, which is appropriate for a development environment. When set
* to true, explicit ACL entries are required for every DRPC function, and
* any request for functions will be denied.
- * @see DRPCSimpleAclAuthorizer
+ * @see backtype.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer
*/
@isBoolean
public static final String DRPC_AUTHORIZER_ACL_STRICT = "drpc.authorizer.acl.strict";
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/LogWriter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/LogWriter.java b/storm-core/src/jvm/backtype/storm/LogWriter.java
index b0857e8..849f5ca 100644
--- a/storm-core/src/jvm/backtype/storm/LogWriter.java
+++ b/storm-core/src/jvm/backtype/storm/LogWriter.java
@@ -41,7 +41,7 @@ public class LogWriter extends Thread {
public void run() {
Logger logger = this.logger;
BufferedReader in = this.in;
- String line = null;
+ String line;
try {
while ((line = in.readLine()) != null) {
logger.info(line);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 27844f2..1d7a1f3 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -51,10 +51,6 @@ public class StormSubmitter {
private static ILocalCluster localNimbus = null;
- public static void setLocalNimbus(ILocalCluster localNimbusHandler) {
- StormSubmitter.localNimbus = localNimbusHandler;
- }
-
private static String generateZookeeperDigestSecretPayload() {
return Utils.secureRandomLong() + ":" + Utils.secureRandomLong();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
index 624db3e..78e8d9b 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -31,9 +31,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
- public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
- private final AtomicReference<DistributedRPCInvocations.Client> client =
- new AtomicReference<DistributedRPCInvocations.Client>();
+ public static final Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
+ private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<>();
private String host;
private int port;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java b/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java
index fb2eee3..adc71a2 100644
--- a/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java
+++ b/storm-core/src/jvm/backtype/storm/generated/AlreadyAliveException.java
@@ -221,11 +221,10 @@ public class AlreadyAliveException extends TException implements org.apache.thri
@Override
public boolean equals(Object that) {
- if (that == null)
+ if (that == null) {
return false;
- if (that instanceof AlreadyAliveException)
- return this.equals((AlreadyAliveException)that);
- return false;
+ }
+ return (that instanceof AlreadyAliveException) ? this.equals((AlreadyAliveException)that) : false;
}
public boolean equals(AlreadyAliveException that) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
index 43ad5a0..e92156e 100644
--- a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
+++ b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.List;
import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;
import backtype.storm.tuple.Fields;
@@ -59,9 +58,9 @@ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
- List<Integer> boltIds = new ArrayList<Integer>(1);
+ List<Integer> boltIds = new ArrayList<>(1);
if (values.size() > 0) {
- byte[] raw = null;
+ byte[] raw;
if (fields != null) {
List<Object> selectedFields = outFields.select(fields, values);
ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
index 37981ca..b92b0d6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/ConnectionWithStatus.java
@@ -39,9 +39,9 @@ public abstract class ConnectionWithStatus implements IConnection {
* data sending or receiving. All data sending request will be dropped.
*/
Closed
- };
+ }
- /**
+ /**
* whether this connection is available to transfer data
*/
public abstract Status status();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IContext.java b/storm-core/src/jvm/backtype/storm/messaging/IContext.java
index 8645a6f..42f59bd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IContext.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IContext.java
@@ -56,4 +56,4 @@ public interface IContext {
* @return client side connection
*/
public IConnection connect(String storm_id, String host, int port);
-};
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java b/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java
index 656b323..c551e0d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/TransportFactory.java
@@ -32,7 +32,7 @@ public class TransportFactory {
String transport_plugin_klassName = (String)storm_conf.get(Config.STORM_MESSAGING_TRANSPORT);
LOG.info("Storm peer transport plugin:"+transport_plugin_klassName);
- IContext transport = null;
+ IContext transport;
try {
//create a factory class
Class klass = Class.forName(transport_plugin_klassName);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index b9151f3..ccdef41 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -84,7 +84,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
/**
* The channel used for all write operations from this client to the remote destination.
*/
- private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
+ private final AtomicReference<Channel> channelRef = new AtomicReference<>();
/**
* Total number of connection attempts.
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 5d27a16..bc978b6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -19,12 +19,8 @@ package backtype.storm.messaging.netty;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;
@@ -34,8 +30,6 @@ import backtype.storm.messaging.IContext;
import backtype.storm.utils.Utils;
public class Context implements IContext {
- private static final Logger LOG = LoggerFactory.getLogger(Context.class);
-
@SuppressWarnings("rawtypes")
private Map storm_conf;
private Map<String, IConnection> connections;
@@ -49,7 +43,7 @@ public class Context implements IContext {
@SuppressWarnings("rawtypes")
public void prepare(Map storm_conf) {
this.storm_conf = storm_conf;
- connections = new HashMap<String, IConnection>();
+ connections = new HashMap<>();
//each context will have a single client channel factory
int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index fb3efe6..e4507f5 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -39,9 +39,8 @@ enum ControlMessage {
}
/**
- * Return a control message per an encoded status code
- * @param encoded
- * @return
+ * @param encoded status code
+ * @return a control message per an encoded status code
*/
static ControlMessage mkMessage(short encoded) {
for(ControlMessage cm: ControlMessage.values()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index ec0dc0f..fea33fd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -31,7 +31,7 @@ class MessageBatch {
MessageBatch(int buffer_size) {
this.buffer_size = buffer_size;
- msgs = new ArrayList<TaskMessage>();
+ msgs = new ArrayList<>();
encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
}
@@ -54,24 +54,21 @@ class MessageBatch {
}
/**
- * Has this batch used up allowed buffer size
- * @return
+ * @return true if this batch used up allowed buffer size
*/
boolean isFull() {
return encoded_length >= buffer_size;
}
/**
- * true if this batch doesn't have any messages
- * @return
+ * @return true if this batch doesn't have any messages
*/
boolean isEmpty() {
return msgs.isEmpty();
}
/**
- * # of msgs in this batch
- * @return
+ * @return number of msgs in this batch
*/
int size() {
return msgs.size();
@@ -83,8 +80,9 @@ class MessageBatch {
ChannelBuffer buffer() throws Exception {
ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
- for (TaskMessage msg : msgs)
+ for (TaskMessage msg : msgs) {
writeTaskMessage(bout, msg);
+ }
//add a END_OF_BATCH indicator
ControlMessage.EOB_MESSAGE.write(bout);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 7d8bf54..06d2cde 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -43,7 +43,7 @@ public class MessageDecoder extends FrameDecoder {
return null;
}
- List<Object> ret = new ArrayList<Object>();
+ List<Object> ret = new ArrayList<>();
// Use while loop, try to decode as more messages as possible in single call
while (available >= 2) {
@@ -100,7 +100,6 @@ public class MessageDecoder extends FrameDecoder {
}
// case 3: task Message
- short task = code;
// Make sure that we have received at least an integer (length)
if (available < 4) {
@@ -115,7 +114,7 @@ public class MessageDecoder extends FrameDecoder {
available -= 4;
if (length <= 0) {
- ret.add(new TaskMessage(task, null));
+ ret.add(new TaskMessage(code, null));
break;
}
@@ -133,7 +132,7 @@ public class MessageDecoder extends FrameDecoder {
// Successfully decoded a frame.
// Return a TaskMessage object
- ret.add(new TaskMessage(task, payload.array()));
+ ret.add(new TaskMessage(code, payload.array()));
}
if (ret.size() == 0) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
index d0d3ca1..d7a86d1 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@ -20,17 +20,11 @@ package backtype.storm.messaging.netty;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Send and receive SASL tokens.
*/
public class SaslMessageToken {
- /** Class logger */
- private static final Logger LOG = LoggerFactory
- .getLogger(SaslMessageToken.class);
-
/** Used for client or server's token to send or receive from each other. */
private byte[] token;
@@ -88,8 +82,8 @@ public class SaslMessageToken {
if (token != null)
payload_len = token.length;
- bout.writeShort((short) identifier);
- bout.writeInt((int) payload_len);
+ bout.writeShort(identifier);
+ bout.writeInt(payload_len);
if (payload_len > 0) {
bout.write(token);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
index 023e950..f6ea78f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyClient.java
@@ -81,9 +81,7 @@ public class SaslNettyClient {
*/
public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
try {
- byte[] retval = saslClient.evaluateChallenge(saslTokenMessage
- .getSaslToken());
- return retval;
+ return saslClient.evaluateChallenge(saslTokenMessage.getSaslToken());
} catch (SaslException e) {
LOG.error(
"saslResponse: Failed to respond to SASL server's token:",
@@ -104,8 +102,6 @@ public class SaslNettyClient {
/**
* Set private members using topology token.
- *
- * @param topologyToken
*/
public SaslClientCallbackHandler(String topologyToken, byte[] token) {
this.userName = SaslUtils
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
index 2cb47d9..f98e5bd 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslNettyServer.java
@@ -18,9 +18,6 @@
package backtype.storm.messaging.netty;
import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@@ -33,7 +30,6 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
index 2a5ae99..980c0f6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormClientHandler.java
@@ -69,8 +69,6 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
LOG.error("Failed to authenticate with server " + "due to error: ",
e);
}
- return;
-
}
@Override
@@ -131,7 +129,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
if (!saslNettyClient.isComplete()) {
LOG.warn("Generated a null response, "
+ "but authentication is not complete.");
- throw new Exception("Server reponse is null, but as far as "
+ throw new Exception("Server response is null, but as far as "
+ "we can tell, we are not authenticated yet.");
}
this.client.channelReady();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
index 02448e2..5ce90a3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslStormServerHandler.java
@@ -55,7 +55,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
Channel channel = ctx.getChannel();
if (msg instanceof ControlMessage
- && ((ControlMessage) e.getMessage()) == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+ && e.getMessage() == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
// initialize server-side SASL functionality, if we haven't yet
// (in which case we are looking at the first SASL message from the
// client).
@@ -85,7 +85,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
LOG.debug("processToken: With nettyServer: " + saslNettyServer
+ " and token length: " + token.length);
- SaslMessageToken saslTokenMessageRequest = null;
+ SaslMessageToken saslTokenMessageRequest;
saslTokenMessageRequest = new SaslMessageToken(
saslNettyServer.response(new byte[0]));
// Send response to client.
@@ -102,10 +102,8 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
.get(channel);
if (saslNettyServer == null) {
- if (saslNettyServer == null) {
- throw new Exception("saslNettyServer was unexpectedly "
- + "null for channel: " + channel);
- }
+ throw new Exception("saslNettyServer was unexpectedly "
+ + "null for channel: " + channel);
}
SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(
saslNettyServer.response(((SaslMessageToken) msg)
@@ -124,7 +122,6 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
+ "authentication is complete.");
ctx.getPipeline().remove(this);
}
- return;
} else {
// Client should not be sending other-than-SASL messages before
// SaslServerHandler has removed itself from the pipeline. Such
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
index a2d0b26..cde4d9f 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslUtils.java
@@ -17,7 +17,6 @@
*/
package backtype.storm.messaging.netty;
-import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
@@ -33,7 +32,7 @@ class SaslUtils {
public static final String DEFAULT_REALM = "default";
static Map<String, String> getSaslProps() {
- Map<String, String> props = new HashMap<String, String>();
+ Map<String, String> props = new HashMap<>();
props.put(Sasl.POLICY_NOPLAINTEXT, "true");
return props;
}
@@ -62,13 +61,7 @@ class SaslUtils {
}
static String getSecretKey(Map conf) {
- if (conf == null || conf.isEmpty())
- return null;
-
- String secretPayLoad = (String) conf
- .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
-
- return secretPayLoad;
+ return conf == null || conf.isEmpty() ? null : (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 7011b42..32c2bd7 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -56,7 +56,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
@SuppressWarnings("rawtypes")
Map storm_conf;
int port;
- private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<String, AtomicInteger>();
+ private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
private final AtomicInteger messagesDequeued = new AtomicInteger(0);
private final AtomicInteger[] pendingMessages;
@@ -84,12 +84,12 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1);
roundRobinQueueId = 0;
- taskToQueueId = new HashMap<Integer, Integer>();
+ taskToQueueId = new HashMap<>();
message_queue = new LinkedBlockingQueue[queueCount];
pendingMessages = new AtomicInteger[queueCount];
for (int i = 0; i < queueCount; i++) {
- message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+ message_queue[i] = new LinkedBlockingQueue<>();
pendingMessages[i] = new AtomicInteger(0);
}
@@ -128,8 +128,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
- for (int i = 0; i < msgs.size(); i++) {
- TaskMessage message = msgs.get(i);
+ for (TaskMessage message : msgs) {
int task = message.task();
if (task == -1) {
@@ -140,7 +139,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
Integer queueId = getMessageQueueId(task);
if (null == messageGroups[queueId]) {
- messageGroups[queueId] = new ArrayList<TaskMessage>();
+ messageGroups[queueId] = new ArrayList<>();
}
messageGroups[queueId].add(message);
}
@@ -158,7 +157,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
if (roundRobinQueueId == queueCount) {
roundRobinQueueId = 0;
}
- HashMap<Integer, Integer> newRef = new HashMap<Integer, Integer>(taskToQueueId);
+ HashMap<Integer, Integer> newRef = new HashMap<>(taskToQueueId);
newRef.put(task, queueId);
taskToQueueId = newRef;
}
@@ -214,7 +213,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
return closeMessage.iterator();
}
- ArrayList<TaskMessage> ret = null;
+ ArrayList<TaskMessage> ret;
int queueId = receiverId % queueCount;
if ((flags & 0x01) == 0x01) {
//non-blocking
@@ -240,15 +239,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
/**
* register a newly created channel
- * @param channel
+ * @param channel newly created channel
*/
protected void addChannel(Channel channel) {
allChannels.add(channel);
}
/**
- * close a channel
- * @param channel
+ * @param channel channel to close
*/
public void closeChannel(Channel channel) {
channel.close().awaitUninterruptibly();
@@ -326,14 +324,14 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
public Object getState() {
LOG.debug("Getting metrics for server on port {}", port);
- HashMap<String, Object> ret = new HashMap<String, Object>();
+ HashMap<String, Object> ret = new HashMap<>();
ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
- ArrayList<Integer> pending = new ArrayList<Integer>(pendingMessages.length);
+ ArrayList<Integer> pending = new ArrayList<>(pendingMessages.length);
for (AtomicInteger p: pendingMessages) {
pending.add(p.get());
}
ret.put("pending", pending);
- HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
+ HashMap<String, Integer> enqueued = new HashMap<>();
Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, AtomicInteger> ent = it.next();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java b/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java
index 6324237..6d43e65 100644
--- a/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java
+++ b/storm-core/src/jvm/backtype/storm/metric/EventLoggerBolt.java
@@ -20,18 +20,10 @@ package backtype.storm.metric;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
import java.util.Map;
import static backtype.storm.metric.IEventLogger.EventInfo;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
index 27aa9d9..3003427 100644
--- a/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/FileBasedEventLogger.java
@@ -35,7 +35,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class FileBasedEventLogger implements IEventLogger {
- private static Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class);
private static final int FLUSH_INTERVAL_MILLIS = 1000;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
index 77fe49c..4f4242a 100644
--- a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
+++ b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsConsumer.java
@@ -21,7 +21,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.net.URL;
-import java.net.URLConnection;
import java.net.HttpURLConnection;
import com.esotericsoftware.kryo.io.Output;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
index 6441a39..7a8f676 100644
--- a/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
+++ b/storm-core/src/jvm/backtype/storm/metric/HttpForwardingMetricsServer.java
@@ -18,7 +18,6 @@
package backtype.storm.metric;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java b/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java
index 4f0b34e..5fd5a32 100644
--- a/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java
+++ b/storm-core/src/jvm/backtype/storm/metric/IEventLogger.java
@@ -17,9 +17,7 @@
*/
package backtype.storm.metric;
-import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
import java.util.Date;
import java.util.Map;
@@ -54,11 +52,7 @@ public interface IEventLogger {
*/
@Override
public String toString() {
- return new StringBuilder(new Date(Long.parseLong(ts)).toString()).append(",")
- .append(component).append(",")
- .append(task).append(",")
- .append(messageId).append(",")
- .append(values).toString();
+ return new Date(Long.parseLong(ts)).toString() + "," + component + "," + task + "," + messageId + "," + values;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java b/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java
index c1c7c0a..98fb527 100644
--- a/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java
+++ b/storm-core/src/jvm/backtype/storm/metric/LoggingMetricsConsumer.java
@@ -26,7 +26,6 @@ import java.util.Map;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
-import backtype.storm.utils.Utils;
/*
* Listens for all metrics, dumps them to log
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
index 5a40c27..e3ec069 100644
--- a/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
@@ -20,7 +20,6 @@ package backtype.storm.metric;
import backtype.storm.Config;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IBolt;
-import backtype.storm.task.IErrorReporter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java b/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java
index d0bbe44..5cba7d3 100644
--- a/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java
+++ b/storm-core/src/jvm/backtype/storm/metric/SystemBolt.java
@@ -18,7 +18,6 @@
package backtype.storm.metric;
import backtype.storm.Config;
-import backtype.storm.metric.api.AssignableMetric;
import backtype.storm.metric.api.IMetric;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
@@ -28,12 +27,9 @@ import backtype.storm.utils.Utils;
import clojure.lang.AFn;
import clojure.lang.IFn;
import clojure.lang.RT;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.lang.management.*;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
@@ -41,7 +37,6 @@ import java.util.Map;
// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
// This bolt was conceived to export worker stats via metrics api.
public class SystemBolt implements IBolt {
- private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
private static boolean _prepareWasCalled = false;
private static class MemoryUsageMetric implements IMetric {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java
index dd048b8..6b317da 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/CountMetric.java
@@ -17,8 +17,6 @@
*/
package backtype.storm.metric.api;
-import backtype.storm.metric.api.IMetric;
-
public class CountMetric implements IMetric {
long _value = 0;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java b/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java
index e25e26d..fde7053 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/MeanReducer.java
@@ -17,8 +17,6 @@
*/
package backtype.storm.metric.api;
-import backtype.storm.metric.api.IReducer;
-
class MeanReducerState {
public int count = 0;
public double sum = 0.0;
@@ -47,7 +45,7 @@ public class MeanReducer implements IReducer<MeanReducerState> {
public Object extractResult(MeanReducerState acc) {
if(acc.count > 0) {
- return new Double(acc.sum / (double)acc.count);
+ return acc.sum / (double) acc.count;
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
index c420a16..a6077e6 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.Map;
public class MultiCountMetric implements IMetric {
- Map<String, CountMetric> _value = new HashMap();
+ Map<String, CountMetric> _value = new HashMap<>();
public MultiCountMetric() {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
index 530b168..d9d3a02 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.Map;
public class MultiReducedMetric implements IMetric {
- Map<String, ReducedMetric> _value = new HashMap();
+ Map<String, ReducedMetric> _value = new HashMap<>();
IReducer _reducer;
public MultiReducedMetric(IReducer reducer) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
index def74c2..eae982b 100644
--- a/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ b/storm-core/src/jvm/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -21,8 +21,7 @@ import backtype.storm.metric.api.CountMetric;
public class CountShellMetric extends CountMetric implements IShellMetric {
/***
- * @param
- * params should be null or long
+ * @param value should be null or long
* if value is null, it will call incr()
* if value is long, it will call incrBy((long)params)
* */
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java b/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
index e91e54a..b17ca40 100644
--- a/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
+++ b/storm-core/src/jvm/backtype/storm/networktopography/AbstractDNSToSwitchMapping.java
@@ -75,7 +75,7 @@ public abstract class AbstractDNSToSwitchMapping
builder.append("Mapping: ").append(toString()).append("\n");
if (rack != null) {
builder.append("Map:\n");
- Set<String> switches = new HashSet<String>();
+ Set<String> switches = new HashSet<>();
for (Map.Entry<String, String> entry : rack.entrySet()) {
builder.append(" ")
.append(entry.getKey())
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java b/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
index 8e1e521..18eac60 100644
--- a/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
+++ b/storm-core/src/jvm/backtype/storm/networktopography/DefaultRackDNSToSwitchMapping.java
@@ -28,12 +28,12 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public final class DefaultRackDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
- private Map<String, String> mappingCache = new ConcurrentHashMap<String, String>();
+ private Map<String, String> mappingCache = new ConcurrentHashMap<>();
@Override
public Map<String,String> resolve(List<String> names) {
- Map<String, String> m = new HashMap<String, String>();
+ Map<String, String> m = new HashMap<>();
if (names.isEmpty()) {
//name list is empty, return an empty map
return m;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java b/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java
index 1148be2..5b54d46 100644
--- a/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java
+++ b/storm-core/src/jvm/backtype/storm/nimbus/ILeaderElector.java
@@ -17,11 +17,7 @@
*/
package backtype.storm.nimbus;
-import org.apache.curator.framework.CuratorFramework;
-
import java.io.Closeable;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
@@ -32,7 +28,7 @@ public interface ILeaderElector extends Closeable {
/**
* Method guaranteed to be called as part of initialization of leader elector instance.
- * @param conf
+ * @param conf configuration
*/
void prepare(Map conf);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java b/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
index 77c0d7a..539df62 100644
--- a/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
+++ b/storm-core/src/jvm/backtype/storm/nimbus/NimbusInfo.java
@@ -94,9 +94,7 @@ public class NimbusInfo implements Serializable {
if (isLeader != that.isLeader) return false;
if (port != that.port) return false;
- if (!host.equals(that.host)) return false;
-
- return true;
+ return host.equals(that.host);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index b07576f..b3028e9 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -265,14 +265,13 @@ public class Cluster {
* @return the number of workers assigned to this topology.
*/
public int getAssignedNumWorkers(TopologyDetails topology) {
- SchedulerAssignment assignment = this.getAssignmentById(topology.getId());
- if (topology == null || assignment == null) {
+ SchedulerAssignment assignment = topology != null ? this.getAssignmentById(topology.getId()) : null;
+ if (assignment == null) {
return 0;
}
Set<WorkerSlot> slots = new HashSet<WorkerSlot>();
slots.addAll(assignment.getExecutorToSlot().values());
-
return slots.size();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
index 08af4b7..e06abf8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
@@ -38,7 +38,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
this.topologyId = topologyId;
- this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
+ this.executorToSlot = new HashMap<>(0);
if (executorToSlots != null) {
this.executorToSlot.putAll(executorToSlots);
}
@@ -46,13 +46,11 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
@Override
public Set<WorkerSlot> getSlots() {
- return new HashSet(executorToSlot.values());
+ return new HashSet<>(executorToSlot.values());
}
/**
* Assign the slot to executors.
- * @param slot
- * @param executors
*/
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
for (ExecutorDetails executor : executors) {
@@ -62,10 +60,9 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
/**
* Release the slot occupied by this assignment.
- * @param slot
*/
public void unassignBySlot(WorkerSlot slot) {
- List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
+ List<ExecutorDetails> executors = new ArrayList<>();
for (ExecutorDetails executor : this.executorToSlot.keySet()) {
WorkerSlot ws = this.executorToSlot.get(executor);
if (ws.equals(slot)) {
@@ -80,9 +77,8 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
}
/**
- * Does this slot occupied by this assignment?
* @param slot
- * @return
+ * @return true if slot is occupied by this assignment
*/
public boolean isSlotOccupied(WorkerSlot slot) {
return this.executorToSlot.containsValue(slot);
@@ -101,8 +97,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
}
/**
- * Return the executors covered by this assignments
- * @return
+ * @return the executors covered by this assignments
*/
public Set<ExecutorDetails> getExecutors() {
return this.executorToSlot.keySet();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
index 65e6e9b..a748e11 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
@@ -44,7 +44,7 @@ public class SupervisorDetails {
*/
Set<Integer> allPorts;
/**
- * Map containing a manifest of resources for the node the superivsor resides
+ * Map containing a manifest of resources for the node the supervisor resides
*/
private Map<String, Double> _total_resources;
@@ -58,7 +58,7 @@ public class SupervisorDetails {
if(allPorts!=null) {
setAllPorts(allPorts);
} else {
- this.allPorts = new HashSet();
+ this.allPorts = new HashSet<>();
}
this._total_resources = total_resources;
LOG.debug("Creating a new supervisor ({}-{}) with resources: {}", this.host, this.id, total_resources);
@@ -86,7 +86,7 @@ public class SupervisorDetails {
}
private void setAllPorts(Collection<Number> allPorts) {
- this.allPorts = new HashSet<Integer>();
+ this.allPorts = new HashSet<>();
if(allPorts!=null) {
for(Number n: allPorts) {
this.allPorts.add(n.intValue());
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 443bf3f..0828a73 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -29,10 +29,10 @@ public class Topologies {
Map<String, Map<String, Component>> _allComponents;
public Topologies(Map<String, TopologyDetails> topologies) {
- if(topologies==null) topologies = new HashMap();
- this.topologies = new HashMap<String, TopologyDetails>(topologies.size());
+ if(topologies==null) topologies = new HashMap<>();
+ this.topologies = new HashMap<>(topologies.size());
this.topologies.putAll(topologies);
- this.nameToId = new HashMap<String, String>(topologies.size());
+ this.nameToId = new HashMap<>(topologies.size());
for (Map.Entry<String, TopologyDetails> entry : topologies.entrySet()) {
TopologyDetails topology = entry.getValue();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
index 01fa0df..95aa5c8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -59,7 +59,7 @@ public class TopologyDetails {
public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
this(topologyId, topologyConf, topology, numWorkers);
- this.executorToComponent = new HashMap<ExecutorDetails, String>(0);
+ this.executorToComponent = new HashMap<>(0);
if (executorToComponents != null) {
this.executorToComponent.putAll(executorToComponents);
}
@@ -88,7 +88,7 @@ public class TopologyDetails {
}
public Map<ExecutorDetails, String> selectExecutorToComponent(Collection<ExecutorDetails> executors) {
- Map<ExecutorDetails, String> ret = new HashMap<ExecutorDetails, String>(executors.size());
+ Map<ExecutorDetails, String> ret = new HashMap<>(executors.size());
for (ExecutorDetails executor : executors) {
String compId = this.executorToComponent.get(executor);
if (compId != null) {
@@ -104,7 +104,7 @@ public class TopologyDetails {
}
private void initResourceList() {
- _resourceList = new HashMap<ExecutorDetails, Map<String, Double>>();
+ _resourceList = new HashMap<>();
// Extract bolt memory info
if (this.topology.get_bolts() != null) {
for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
@@ -136,7 +136,7 @@ public class TopologyDetails {
}
//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
for(ExecutorDetails exec : this.getExecutors()) {
- if (_resourceList.containsKey(exec) == false) {
+ if (!_resourceList.containsKey(exec)) {
LOG.debug(
"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
this.getExecutorToComponent().get(exec),
@@ -166,7 +166,7 @@ public class TopologyDetails {
* @return a map of components
*/
public Map<String, Component> getComponents() {
- Map<String, Component> all_comp = new HashMap<String, Component>();
+ Map<String, Component> all_comp = new HashMap<>();
StormTopology storm_topo = this.topology;
// spouts
@@ -174,7 +174,7 @@ public class TopologyDetails {
for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
.get_spouts().entrySet()) {
if (!Utils.isSystemId(spoutEntry.getKey())) {
- Component newComp = null;
+ Component newComp;
if (all_comp.containsKey(spoutEntry.getKey())) {
newComp = all_comp.get(spoutEntry.getKey());
newComp.execs = componentToExecs(newComp.id);
@@ -209,7 +209,7 @@ public class TopologyDetails {
for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
.entrySet()) {
if (!Utils.isSystemId(boltEntry.getKey())) {
- Component newComp = null;
+ Component newComp;
if (all_comp.containsKey(boltEntry.getKey())) {
newComp = all_comp.get(boltEntry.getKey());
newComp.execs = componentToExecs(newComp.id);
@@ -297,7 +297,7 @@ public class TopologyDetails {
* for all tasks in topology topoId.
*/
public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
- Map<ExecutorDetails, Double> ret = new HashMap<ExecutorDetails, Double>();
+ Map<ExecutorDetails, Double> ret = new HashMap<>();
for (ExecutorDetails exec : _resourceList.keySet()) {
ret.put(exec, getTotalMemReqTask(exec));
}
@@ -306,7 +306,6 @@ public class TopologyDetails {
/**
* Get the total CPU requirement for executor
- * @param exec
* @return Double the total about of cpu requirement for executor
*/
public Double getTotalCpuReqTask(ExecutorDetails exec) {
@@ -386,17 +385,11 @@ public class TopologyDetails {
* @return Boolean whether or not a certain ExecutorDetail is included in the _resourceList.
*/
public boolean hasExecInTopo(ExecutorDetails exec) {
- if (_resourceList != null) { // null is possible if the first constructor of TopologyDetails is used
- return _resourceList.containsKey(exec);
- } else {
- return false;
- }
+ return _resourceList != null && _resourceList.containsKey(exec);
}
/**
* add resource requirements for a executor
- * @param exec
- * @param resourceList
*/
public void addResourcesForExec(ExecutorDetails exec, Map<String, Double> resourceList) {
if (hasExecInTopo(exec)) {
@@ -408,10 +401,9 @@ public class TopologyDetails {
/**
* Add default resource requirements for a executor
- * @param exec
*/
public void addDefaultResforExec(ExecutorDetails exec) {
- Map<String, Double> defaultResourceList = new HashMap<String, Double>();
+ Map<String, Double> defaultResourceList = new HashMap<>();
defaultResourceList.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null));
defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
@@ -427,7 +419,7 @@ public class TopologyDetails {
}
/**
- * initalizes the scheduler member variable by extracting what scheduler
+ * initializes the scheduler member variable by extracting what scheduler
* this topology is going to use from topologyConf
*/
private void initConfigs() {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java
index 3053b5b..2e418e9 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/DefaultPool.java
@@ -37,8 +37,8 @@ import backtype.storm.scheduler.WorkerSlot;
*/
public class DefaultPool extends NodePool {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class);
- private Set<Node> _nodes = new HashSet<Node>();
- private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
+ private Set<Node> _nodes = new HashSet<>();
+ private HashMap<String, TopologyDetails> _tds = new HashMap<>();
@Override
public void addTopology(TopologyDetails td) {
@@ -61,8 +61,8 @@ public class DefaultPool extends NodePool {
@Override
public Collection<Node> takeNodes(int nodesNeeded) {
- HashSet<Node> ret = new HashSet<Node>();
- LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+ HashSet<Node> ret = new HashSet<>();
+ LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
for (Node n: sortedNodes) {
if (nodesNeeded <= ret.size()) {
@@ -95,7 +95,7 @@ public class DefaultPool extends NodePool {
public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
int nodesFound = 0;
int slotsFound = 0;
- LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+ LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
for (Node n: sortedNodes) {
if (slotsNeeded <= 0) {
@@ -113,8 +113,8 @@ public class DefaultPool extends NodePool {
@Override
public Collection<Node> takeNodesBySlots(int slotsNeeded) {
- HashSet<Node> ret = new HashSet<Node>();
- LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+ HashSet<Node> ret = new HashSet<>();
+ LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
for (Node n: sortedNodes) {
if (slotsNeeded <= 0) {
@@ -148,8 +148,8 @@ public class DefaultPool extends NodePool {
}
int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
int executorsNotRunning = _cluster.getUnassignedExecutors(td).size();
- LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}",
- new Object[] {slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning});
+ LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}",
+ slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning);
if (slotsToUse <= 0) {
if (executorsNotRunning > 0) {
_cluster.setStatus(topId,"Not fully scheduled (No free slots in default pool) "+executorsNotRunning+" executors not scheduled");
@@ -180,9 +180,9 @@ public class DefaultPool extends NodePool {
RoundRobinSlotScheduler slotSched =
new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
- LinkedList<Node> nodes = new LinkedList<Node>(_nodes);
+ LinkedList<Node> nodes = new LinkedList<>(_nodes);
while (true) {
- Node n = null;
+ Node n;
do {
if (nodes.isEmpty()) {
throw new IllegalStateException("This should not happen, we" +
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java
index c625895..456900b 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/FreePool.java
@@ -35,7 +35,7 @@ import backtype.storm.scheduler.TopologyDetails;
*/
public class FreePool extends NodePool {
private static final Logger LOG = LoggerFactory.getLogger(FreePool.class);
- private Set<Node> _nodes = new HashSet<Node>();
+ private Set<Node> _nodes = new HashSet<>();
private int _totalSlots = 0;
@Override
@@ -63,7 +63,7 @@ public class FreePool extends NodePool {
@Override
public Collection<Node> takeNodes(int nodesNeeded) {
- HashSet<Node> ret = new HashSet<Node>();
+ HashSet<Node> ret = new HashSet<>();
Iterator<Node> it = _nodes.iterator();
while (it.hasNext() && nodesNeeded > ret.size()) {
Node n = it.next();
@@ -86,7 +86,7 @@ public class FreePool extends NodePool {
@Override
public Collection<Node> takeNodesBySlots(int slotsNeeded) {
- HashSet<Node> ret = new HashSet<Node>();
+ HashSet<Node> ret = new HashSet<>();
Iterator<Node> it = _nodes.iterator();
while (it.hasNext() && slotsNeeded > 0) {
Node n = it.next();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java
index dc7eded..25a6f25 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/IsolatedPool.java
@@ -41,9 +41,9 @@ import backtype.storm.scheduler.WorkerSlot;
*/
public class IsolatedPool extends NodePool {
private static final Logger LOG = LoggerFactory.getLogger(IsolatedPool.class);
- private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<String, Set<Node>>();
- private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>();
- private HashSet<String> _isolated = new HashSet<String>();
+ private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<>();
+ private HashMap<String, TopologyDetails> _tds = new HashMap<>();
+ private HashSet<String> _isolated = new HashSet<>();
private int _maxNodes;
private int _usedNodes;
@@ -57,7 +57,7 @@ public class IsolatedPool extends NodePool {
String topId = td.getId();
LOG.debug("Adding in Topology {}", topId);
SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
- Set<Node> assignedNodes = new HashSet<Node>();
+ Set<Node> assignedNodes = new HashSet<>();
if (assignment != null) {
for (WorkerSlot ws: assignment.getSlots()) {
Node n = _nodeIdToNode.get(ws.getNodeId());
@@ -96,7 +96,7 @@ public class IsolatedPool extends NodePool {
LOG.debug("Scheduling topology {}",topId);
Set<Node> allNodes = _topologyIdToNodes.get(topId);
Number nodesRequested = (Number) td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
- int slotsToUse = 0;
+ int slotsToUse;
if (nodesRequested == null) {
slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
} else {
@@ -111,7 +111,7 @@ public class IsolatedPool extends NodePool {
RoundRobinSlotScheduler slotSched =
new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
- LinkedList<Node> sortedNodes = new LinkedList<Node>(allNodes);
+ LinkedList<Node> sortedNodes = new LinkedList<>(allNodes);
Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
LOG.debug("Nodes sorted by free space {}", sortedNodes);
@@ -157,9 +157,9 @@ public class IsolatedPool extends NodePool {
int nodesUsed = _topologyIdToNodes.get(topId).size();
int nodesNeeded = nodesRequested - nodesUsed;
LOG.debug("Nodes... requested {} used {} available from us {} " +
- "avail from other {} needed {}", new Object[] {nodesRequested,
- nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable,
- nodesNeeded});
+ "avail from other {} needed {}", nodesRequested,
+ nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable,
+ nodesNeeded);
if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
_cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. "
+ ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes))
@@ -224,8 +224,8 @@ public class IsolatedPool extends NodePool {
slotsAvailable = NodePool.slotsAvailable(lesserPools);
}
int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
- LOG.debug("Slots... requested {} used {} free {} available {} to be used {}",
- new Object[] {slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse});
+ LOG.debug("Slots... requested {} used {} free {} available {} to be used {}",
+ slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse);
if (slotsToUse <= 0) {
_cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
return 0;
@@ -233,7 +233,7 @@ public class IsolatedPool extends NodePool {
int slotsNeeded = slotsToUse - slotsFree;
int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
LOG.debug("Nodes... new {} used {} max {}",
- new Object[]{numNewNodes, _usedNodes, _maxNodes});
+ numNewNodes, _usedNodes, _maxNodes);
if ((numNewNodes + _usedNodes) > _maxNodes) {
_cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " +
(numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
@@ -249,7 +249,7 @@ public class IsolatedPool extends NodePool {
@Override
public Collection<Node> takeNodes(int nodesNeeded) {
LOG.debug("Taking {} from {}", nodesNeeded, this);
- HashSet<Node> ret = new HashSet<Node>();
+ HashSet<Node> ret = new HashSet<>();
for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
if (!_isolated.contains(entry.getKey())) {
Iterator<Node> it = entry.getValue().iterator();
@@ -293,7 +293,7 @@ public class IsolatedPool extends NodePool {
@Override
public Collection<Node> takeNodesBySlots(int slotsNeeded) {
- HashSet<Node> ret = new HashSet<Node>();
+ HashSet<Node> ret = new HashSet<>();
for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
if (!_isolated.contains(entry.getKey())) {
Iterator<Node> it = entry.getValue().iterator();
@@ -321,9 +321,7 @@ public class IsolatedPool extends NodePool {
int slotsFound = 0;
for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
if (!_isolated.contains(entry.getKey())) {
- Iterator<Node> it = entry.getValue().iterator();
- while (it.hasNext()) {
- Node n = it.next();
+ for (Node n : entry.getValue()) {
if (n.isAlive()) {
nodesFound++;
int totalSlotsFree = n.totalSlots();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
index 320b388..6b77c63 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -44,9 +44,9 @@ public class MultitenantScheduler implements IScheduler {
private Map<String, Number> getUserConf() {
Map<String, Number> ret = (Map<String, Number>)_conf.get(Config.MULTITENANT_SCHEDULER_USER_POOLS);
if (ret == null) {
- ret = new HashMap<String, Number>();
+ ret = new HashMap<>();
} else {
- ret = new HashMap<String, Number>(ret);
+ ret = new HashMap<>(ret);
}
Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
@@ -65,7 +65,7 @@ public class MultitenantScheduler implements IScheduler {
Map<String, Number> userConf = getUserConf();
- Map<String, IsolatedPool> userPools = new HashMap<String, IsolatedPool>();
+ Map<String, IsolatedPool> userPools = new HashMap<>();
for (Map.Entry<String, Number> entry : userConf.entrySet()) {
userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue()));
}
[2/5] storm git commit: STORM-1164. Code cleanup for typos,
warnings and conciseness
Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
index 883c65f..6c2f06b 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java
@@ -40,8 +40,8 @@ import backtype.storm.scheduler.WorkerSlot;
*/
public class Node {
private static final Logger LOG = LoggerFactory.getLogger(Node.class);
- private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>();
- private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
+ private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<>();
+ private Set<WorkerSlot> _freeSlots = new HashSet<>();
private final String _nodeId;
private boolean _isAlive;
@@ -143,7 +143,7 @@ public class Node {
}
Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
if (usedSlots == null) {
- usedSlots = new HashSet<WorkerSlot>();
+ usedSlots = new HashSet<>();
_topIdToUsedSlots.put(topId, usedSlots);
}
usedSlots.add(ws);
@@ -164,7 +164,7 @@ public class Node {
_freeSlots.addAll(entry.getValue());
}
}
- _topIdToUsedSlots = new HashMap<String,Set<WorkerSlot>>();
+ _topIdToUsedSlots = new HashMap<>();
}
/**
@@ -242,10 +242,7 @@ public class Node {
@Override
public boolean equals(Object other) {
- if (other instanceof Node) {
- return _nodeId.equals(((Node)other)._nodeId);
- }
- return false;
+ return other instanceof Node && _nodeId.equals(((Node) other)._nodeId);
}
@Override
@@ -295,13 +292,13 @@ public class Node {
}
public static Map<String, Node> getAllNodesFrom(Cluster cluster) {
- Map<String, Node> nodeIdToNode = new HashMap<String, Node>();
+ Map<String, Node> nodeIdToNode = new HashMap<>();
for (SupervisorDetails sup : cluster.getSupervisors().values()) {
//Node ID and supervisor ID are the same.
String id = sup.getId();
boolean isAlive = !cluster.isBlackListed(id);
LOG.debug("Found a {} Node {} {}",
- new Object[] {isAlive? "living":"dead", id, sup.getAllPorts()});
+ isAlive? "living":"dead", id, sup.getAllPorts());
nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java
index 21d1577..5a46df5 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java
@@ -81,7 +81,7 @@ public abstract class NodePool {
Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent();
SchedulerAssignment assignment = _cluster.getAssignmentById(_topId);
- _nodeToComps = new HashMap<String, Set<String>>();
+ _nodeToComps = new HashMap<>();
if (assignment != null) {
Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot();
@@ -90,14 +90,14 @@ public abstract class NodePool {
String nodeId = entry.getValue().getNodeId();
Set<String> comps = _nodeToComps.get(nodeId);
if (comps == null) {
- comps = new HashSet<String>();
+ comps = new HashSet<>();
_nodeToComps.put(nodeId, comps);
}
comps.add(execToComp.get(entry.getKey()));
}
}
- _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>();
+ _spreadToSchedule = new HashMap<>();
List<String> spreadComps = (List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
if (spreadComps != null) {
for (String comp: spreadComps) {
@@ -105,7 +105,7 @@ public abstract class NodePool {
}
}
- _slots = new LinkedList<Set<ExecutorDetails>>();
+ _slots = new LinkedList<>();
for (int i = 0; i < slotsToUse; i++) {
_slots.add(new HashSet<ExecutorDetails>());
}
@@ -118,7 +118,7 @@ public abstract class NodePool {
_spreadToSchedule.get(entry.getKey()).addAll(entry.getValue());
} else {
for (ExecutorDetails ed: entry.getValue()) {
- LOG.debug("Assigning {} {} to slot {}", new Object[]{entry.getKey(), ed, at});
+ LOG.debug("Assigning {} {} to slot {}", entry.getKey(), ed, at);
_slots.get(at).add(ed);
at++;
if (at >= _slots.size()) {
@@ -151,7 +151,7 @@ public abstract class NodePool {
String nodeId = n.getId();
Set<String> nodeComps = _nodeToComps.get(nodeId);
if (nodeComps == null) {
- nodeComps = new HashSet<String>();
+ nodeComps = new HashSet<>();
_nodeToComps.put(nodeId, nodeComps);
}
for (Entry<String, List<ExecutorDetails>> entry: _spreadToSchedule.entrySet()) {
@@ -251,7 +251,7 @@ public abstract class NodePool {
public static Collection<Node> takeNodesBySlot(int slotsNeeded,NodePool[] pools) {
LOG.debug("Trying to grab {} free slots from {}",slotsNeeded, pools);
- HashSet<Node> ret = new HashSet<Node>();
+ HashSet<Node> ret = new HashSet<>();
for (NodePool pool: pools) {
Collection<Node> got = pool.takeNodesBySlots(slotsNeeded);
ret.addAll(got);
@@ -266,7 +266,7 @@ public abstract class NodePool {
public static Collection<Node> takeNodes(int nodesNeeded,NodePool[] pools) {
LOG.debug("Trying to grab {} free nodes from {}",nodesNeeded, pools);
- HashSet<Node> ret = new HashSet<Node>();
+ HashSet<Node> ret = new HashSet<>();
for (NodePool pool: pools) {
Collection<Node> got = pool.takeNodes(nodesNeeded);
ret.addAll(got);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
index f98d9ed..b8a96a2 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
@@ -42,7 +42,7 @@ import backtype.storm.scheduler.resource.Component;
import backtype.storm.scheduler.resource.RAS_Node;
public class ResourceAwareStrategy implements IStrategy {
- private Logger LOG = null;
+ private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareStrategy.class);
private Topologies _topologies;
private Cluster _cluster;
//Map key is the supervisor id and the value is the corresponding RAS_Node Object
@@ -63,7 +63,6 @@ public class ResourceAwareStrategy implements IStrategy {
_cluster = cluster;
_nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
_availNodes = this.getAvailNodes();
- this.LOG = LoggerFactory.getLogger(this.getClass());
_clusterInfo = cluster.getNetworkTopography();
LOG.debug(this.getClusterInfo());
}
@@ -71,7 +70,7 @@ public class ResourceAwareStrategy implements IStrategy {
//the returned TreeMap keeps the Components sorted
private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
- TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<Integer, List<ExecutorDetails>>();
+ TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<>();
Integer rank = 0;
for (Component ras_comp : ordered__Component_list) {
retMap.put(rank, new ArrayList<ExecutorDetails>());
@@ -91,9 +90,9 @@ public class ResourceAwareStrategy implements IStrategy {
return null;
}
Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
- Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+ Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
- Collection<ExecutorDetails> scheduledTasks = new ArrayList<ExecutorDetails>();
+ Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
List<Component> spouts = this.getSpouts(_topologies, td);
if (spouts.size() == 0) {
@@ -104,7 +103,7 @@ public class ResourceAwareStrategy implements IStrategy {
Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
- Collection<ExecutorDetails> executorsNotScheduled = new HashSet<ExecutorDetails>(unassignedExecutors);
+ Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
//Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth.
//Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
@@ -145,7 +144,7 @@ public class ResourceAwareStrategy implements IStrategy {
WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
if (targetSlot != null) {
RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
- if(schedulerAssignmentMap.containsKey(targetSlot) == false) {
+ if(!schedulerAssignmentMap.containsKey(targetSlot)) {
schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
}
@@ -175,7 +174,7 @@ public class ResourceAwareStrategy implements IStrategy {
}
private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
- WorkerSlot ws = null;
+ WorkerSlot ws;
// first scheduling
if (this.refNode == null) {
String clus = this.getBestClustering();
@@ -205,7 +204,7 @@ public class ResourceAwareStrategy implements IStrategy {
nodes = this.getAvailableNodes();
}
//First sort nodes by distance
- TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<Double, RAS_Node>();
+ TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
for (RAS_Node n : nodes) {
if(n.getFreeSlots().size()>0) {
if (n.getAvailableMemoryResources() >= taskMem
@@ -262,9 +261,9 @@ public class ResourceAwareStrategy implements IStrategy {
}
private Double distToNode(RAS_Node src, RAS_Node dest) {
- if (src.getId().equals(dest.getId()) == true) {
+ if (src.getId().equals(dest.getId())) {
return 0.0;
- }else if (this.NodeToCluster(src) == this.NodeToCluster(dest)) {
+ } else if (this.NodeToCluster(src) == this.NodeToCluster(dest)) {
return 0.5;
} else {
return 1.0;
@@ -283,7 +282,7 @@ public class ResourceAwareStrategy implements IStrategy {
}
private List<RAS_Node> getAvailableNodes() {
- LinkedList<RAS_Node> nodes = new LinkedList<RAS_Node>();
+ LinkedList<RAS_Node> nodes = new LinkedList<>();
for (String clusterId : _clusterInfo.keySet()) {
nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
}
@@ -291,7 +290,7 @@ public class ResourceAwareStrategy implements IStrategy {
}
private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
- List<RAS_Node> retList = new ArrayList<RAS_Node>();
+ List<RAS_Node> retList = new ArrayList<>();
for (String node_id : _clusterInfo.get(clus)) {
retList.add(_availNodes.get(this
.NodeHostnameToId(node_id)));
@@ -301,7 +300,7 @@ public class ResourceAwareStrategy implements IStrategy {
private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
- List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
+ List<WorkerSlot> workers = new LinkedList<>();
for(RAS_Node node : nodes) {
workers.addAll(node.getFreeSlots());
}
@@ -309,7 +308,7 @@ public class ResourceAwareStrategy implements IStrategy {
}
private List<WorkerSlot> getAvailableWorker() {
- List<WorkerSlot> workers = new LinkedList<WorkerSlot>();
+ List<WorkerSlot> workers = new LinkedList<>();
for (String clusterId : _clusterInfo.keySet()) {
workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
}
@@ -333,18 +332,18 @@ public class ResourceAwareStrategy implements IStrategy {
private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
// Since queue is a interface
Queue<Component> ordered__Component_list = new LinkedList<Component>();
- HashMap<String, Component> visited = new HashMap<String, Component>();
+ HashMap<String, Component> visited = new HashMap<>();
/* start from each spout that is not visited, each does a breadth-first traverse */
for (Component spout : spouts) {
if (!visited.containsKey(spout.id)) {
- Queue<Component> queue = new LinkedList<Component>();
+ Queue<Component> queue = new LinkedList<>();
queue.offer(spout);
while (!queue.isEmpty()) {
Component comp = queue.poll();
visited.put(comp.id, comp);
ordered__Component_list.add(comp);
- List<String> neighbors = new ArrayList<String>();
+ List<String> neighbors = new ArrayList<>();
neighbors.addAll(comp.children);
neighbors.addAll(comp.parents);
for (String nbID : neighbors) {
@@ -360,7 +359,7 @@ public class ResourceAwareStrategy implements IStrategy {
}
private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
- List<Component> spouts = new ArrayList<Component>();
+ List<Component> spouts = new ArrayList<>();
for (Component c : topologies.getAllComponents().get(td.getId())
.values()) {
if (c.type == Component.ComponentType.SPOUT) {
@@ -413,7 +412,7 @@ public class ResourceAwareStrategy implements IStrategy {
/**
* Checks whether we can schedule an Executor exec on the worker slot ws
- * Only considers memory currenlty. May include CPU in the future
+ * Only considers memory currently. May include CPU in the future
* @param exec
* @param ws
* @param td
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
index ac3fb53..8062b4e 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -21,21 +21,18 @@ import backtype.storm.Config;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.Subject;
-import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import backtype.storm.security.INimbusCredentialPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Set;
import java.util.HashSet;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
public class AuthUtils {
private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
@@ -72,11 +69,11 @@ public class AuthUtils {
/**
* Construct a principal to local plugin
- * @param conf storm configuration
+ * @param storm_conf storm configuration
* @return the plugin
*/
public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) {
- IPrincipalToLocal ptol = null;
+ IPrincipalToLocal ptol;
try {
String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN);
Class klass = Class.forName(ptol_klassName);
@@ -90,11 +87,11 @@ public class AuthUtils {
/**
* Construct a group mapping service provider plugin
- * @param conf storm configuration
+ * @param storm_conf storm configuration
* @return the plugin
*/
public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map storm_conf) {
- IGroupMappingServiceProvider gmsp = null;
+ IGroupMappingServiceProvider gmsp;
try {
String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
Class klass = Class.forName(gmsp_klassName);
@@ -107,13 +104,13 @@ public class AuthUtils {
}
/**
- * Get all of the configured Credential Renwer Plugins.
- * @param storm_conf the storm configuration to use.
+ * Get all of the configured Credential Renewer Plugins.
+ * @param conf the storm configuration to use.
* @return the configured credential renewers.
*/
public static Collection<ICredentialsRenewer> GetCredentialRenewers(Map conf) {
try {
- Set<ICredentialsRenewer> ret = new HashSet<ICredentialsRenewer>();
+ Set<ICredentialsRenewer> ret = new HashSet<>();
Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_CREDENTIAL_RENEWERS);
if (clazzes != null) {
for (String clazz : clazzes) {
@@ -135,7 +132,7 @@ public class AuthUtils {
*/
public static Collection<INimbusCredentialPlugin> getNimbusAutoCredPlugins(Map conf) {
try {
- Set<INimbusCredentialPlugin> ret = new HashSet<INimbusCredentialPlugin>();
+ Set<INimbusCredentialPlugin> ret = new HashSet<>();
Collection<String> clazzes = (Collection<String>)conf.get(Config.NIMBUS_AUTO_CRED_PLUGINS);
if (clazzes != null) {
for (String clazz : clazzes) {
@@ -157,7 +154,7 @@ public class AuthUtils {
*/
public static Collection<IAutoCredentials> GetAutoCredentials(Map storm_conf) {
try {
- Set<IAutoCredentials> autos = new HashSet<IAutoCredentials>();
+ Set<IAutoCredentials> autos = new HashSet<>();
Collection<String> clazzes = (Collection<String>)storm_conf.get(Config.TOPOLOGY_AUTO_CREDENTIALS);
if (clazzes != null) {
for (String clazz : clazzes) {
@@ -216,11 +213,9 @@ public class AuthUtils {
/**
* Construct a transport plugin per storm configuration
- * @param conf storm configuration
- * @return
*/
public static ITransportPlugin GetTransportPlugin(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
- ITransportPlugin transportPlugin = null;
+ ITransportPlugin transportPlugin;
try {
String transport_plugin_klassName = type.getTransportPlugin(storm_conf);
Class klass = Class.forName(transport_plugin_klassName);
@@ -234,7 +229,7 @@ public class AuthUtils {
private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf,
String klassName) {
- IHttpCredentialsPlugin plugin = null;
+ IHttpCredentialsPlugin plugin;
try {
Class klass = Class.forName(klassName);
plugin = (IHttpCredentialsPlugin)klass.newInstance();
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
index e2469e5..9c81cdf 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/DefaultHttpCredentialsPlugin.java
@@ -28,8 +28,6 @@ import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.security.auth.ReqContext;
-
public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultHttpCredentialsPlugin.class);
@@ -50,7 +48,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
*/
@Override
public String getUserName(HttpServletRequest req) {
- Principal princ = null;
+ Principal princ;
if (req != null && (princ = req.getUserPrincipal()) != null) {
String userName = princ.getName();
if (userName != null && !userName.isEmpty()) {
@@ -83,7 +81,7 @@ public class DefaultHttpCredentialsPlugin implements IHttpCredentialsPlugin {
userName = doAsUser;
}
- Set<Principal> principals = new HashSet<Principal>();
+ Set<Principal> principals = new HashSet<>();
if(userName != null) {
Principal p = new SingleUserPrincipal(userName);
principals.add(p);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java b/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java
index 729d744..9f95101 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/DefaultPrincipalToLocal.java
@@ -28,7 +28,6 @@ import java.security.Principal;
public class DefaultPrincipalToLocal implements IPrincipalToLocal {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
*/
public void prepare(Map storm_conf) {}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java
index d592bb7..ff1e2ba 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IAuthorizer.java
@@ -32,7 +32,7 @@ import java.util.Map;
public interface IAuthorizer {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ * @param storm_conf Storm configuration
*/
void prepare(Map storm_conf);
@@ -40,7 +40,7 @@ public interface IAuthorizer {
* permit() method is invoked for each incoming Thrift request.
* @param context request context includes info about
* @param operation operation name
- * @param topology_storm configuration of targeted topology
+ * @param topology_conf configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context, String operation, Map topology_conf);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java b/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java
index 3eaf6c4..9a6f02e 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ICredentialsRenewer.java
@@ -18,11 +18,10 @@
package backtype.storm.security.auth;
-import java.util.Collection;
import java.util.Map;
/**
- * Provides a way to renew credentials on behelf of a user.
+ * Provides a way to renew credentials on behalf of a user.
*/
public interface ICredentialsRenewer {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java
index a012ce4..0b57eca 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IHttpCredentialsPlugin.java
@@ -21,8 +21,6 @@ package backtype.storm.security.auth;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
-import backtype.storm.security.auth.ReqContext;
-
/**
* Interface for handling credentials in an HttpServletRequest
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java b/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java
index fca3d37..e938d39 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/IPrincipalToLocal.java
@@ -28,7 +28,7 @@ import java.security.Principal;
public interface IPrincipalToLocal {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ * @param storm_conf Storm configuration
*/
void prepare(Map storm_conf);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
index 5ba2557..ba09fad 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ITransportPlugin.java
@@ -18,9 +18,7 @@
package backtype.storm.security.auth;
import java.io.IOException;
-import java.security.Principal;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import javax.security.auth.login.Configuration;
@@ -29,8 +27,6 @@ import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import backtype.storm.security.auth.ThriftConnectionType;
-
/**
* Interface for Thrift Transport plugin
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java b/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java
index 35c7788..1f67c14 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/KerberosPrincipalToLocal.java
@@ -28,7 +28,7 @@ public class KerberosPrincipalToLocal implements IPrincipalToLocal {
/**
* Invoked once immediately after construction
- * @param conf Storm configuration
+ * @param storm_conf Storm configuration
*/
public void prepare(Map storm_conf) {}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java b/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
index 2f18982..31aeef9 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ReqContext.java
@@ -22,8 +22,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.net.InetAddress;
import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.security.AccessControlContext;
import java.security.AccessController;
@@ -44,12 +42,8 @@ public class ReqContext {
private Map _storm_conf;
private Principal realPrincipal;
- private static final Logger LOG = LoggerFactory.getLogger(ReqContext.class);
-
-
/**
- * Get a request context associated with current thread
- * @return
+ * @return a request context associated with current thread
*/
public static ReqContext context() {
return ctxt.get();
@@ -132,8 +126,7 @@ public class ReqContext {
}
/**
- * Returns true if this request is an impersonation request.
- * @return
+ * @return true if this request is an impersonation request.
*/
public boolean isImpersonating() {
return this.realPrincipal != null;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
index 7013cd4..92004fa 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
@@ -45,10 +45,6 @@ import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.security.auth.ThriftConnectionType;
/**
* Base class for SASL authentication plugin.
@@ -57,7 +53,6 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
protected ThriftConnectionType type;
protected Map storm_conf;
protected Configuration login_conf;
- private static final Logger LOG = LoggerFactory.getLogger(SaslTransportPlugin.class);
@Override
public void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
@@ -95,7 +90,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
/**
* All subclass must implement this method
- * @return
+ * @return server transport factory
* @throws IOException
*/
protected abstract TTransportFactory getServerTransportFactory() throws IOException;
@@ -162,11 +157,8 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
public boolean equals(Object o) {
if (this == o) {
return true;
- } else if (o == null || getClass() != o.getClass()) {
- return false;
- } else {
- return (name.equals(((User) o).name));
}
+ return !(o == null || getClass() != o.getClass()) && (name.equals(((User) o).name));
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
index 2abcdae..7a0a6f2 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/SimpleTransportPlugin.java
@@ -45,8 +45,6 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.security.auth.ThriftConnectionType;
-
/**
* Simple transport for Thrift plugin.
*
@@ -146,12 +144,12 @@ public class SimpleTransportPlugin implements ITransportPlugin {
if (s == null) {
final String user = (String)storm_conf.get("debug.simple.transport.user");
if (user != null) {
- HashSet<Principal> principals = new HashSet<Principal>();
+ HashSet<Principal> principals = new HashSet<>();
principals.add(new Principal() {
public String getName() { return user; }
public String toString() { return user; }
});
- s = new Subject(true, principals, new HashSet<Object>(), new HashSet<Object>());
+ s = new Subject(true, principals, new HashSet<>(), new HashSet<>());
}
}
req_context.setSubject(s);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java b/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java
index 6af17fa..0cadba6 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/SingleUserPrincipal.java
@@ -33,10 +33,7 @@ public class SingleUserPrincipal implements Principal {
@Override
public boolean equals(Object another) {
- if (another instanceof SingleUserPrincipal) {
- return _userName.equals(((SingleUserPrincipal)another)._userName);
- }
- return false;
+ return another instanceof SingleUserPrincipal && _userName.equals(((SingleUserPrincipal) another)._userName);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java b/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
index f547868..9729671 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/TBackoffConnect.java
@@ -19,7 +19,6 @@
package backtype.storm.security.auth;
import java.io.IOException;
-import java.util.Random;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java b/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
index 9f77ab9..8b3d4c5 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ThriftClient.java
@@ -24,14 +24,10 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import backtype.storm.utils.Utils;
import backtype.storm.Config;
-import backtype.storm.security.auth.TBackoffConnect;
-public class ThriftClient {
- private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
+public class ThriftClient {
private TTransport _transport;
protected TProtocol _protocol;
private String _host;
@@ -90,8 +86,6 @@ public class ThriftClient {
//construct a transport plugin
ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _conf, login_conf);
- final TTransport underlyingTransport = socket;
-
//TODO get this from type instead of hardcoding to Nimbus.
//establish client-server transport via plugin
//do retries if the connect fails
@@ -100,7 +94,7 @@ public class ThriftClient {
Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_TIMES)),
Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
- _transport = connectionRetry.doConnectWithRetry(transportPlugin, underlyingTransport, _host, _asUser);
+ _transport = connectionRetry.doConnectWithRetry(transportPlugin, socket, _host, _asUser);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java b/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java
index 64243ce..fdbdc7c 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/ThriftServer.java
@@ -53,12 +53,10 @@ public class ThriftServer {
}
/**
- * Is ThriftServer listening to requests?
- * @return
+ * @return true if ThriftServer is listening to requests?
*/
public boolean isServing() {
- if (_server == null) return false;
- return _server.isServing();
+ return _server != null && _server.isServing();
}
public void serve() {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index 5e83b9f..4a9b379 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -28,7 +28,6 @@ import java.util.Set;
import backtype.storm.Config;
import backtype.storm.security.auth.ReqContext;
-import backtype.storm.security.auth.authorizer.DRPCAuthorizerBase;
import backtype.storm.security.auth.AuthUtils;
import backtype.storm.security.auth.IPrincipalToLocal;
import backtype.storm.utils.Utils;
@@ -54,7 +53,7 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
public AclFunctionEntry(Collection<String> clientUsers,
String invocationUser) {
this.clientUsers = (clientUsers != null) ?
- new HashSet<String>(clientUsers) : new HashSet<String>();
+ new HashSet<>(clientUsers) : new HashSet<String>();
this.invocationUser = invocationUser;
}
}
@@ -68,7 +67,7 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
//change is atomic
long now = System.currentTimeMillis();
if ((now - 5000) > _lastUpdate || _acl == null) {
- Map<String,AclFunctionEntry> acl = new HashMap<String,AclFunctionEntry>();
+ Map<String,AclFunctionEntry> acl = new HashMap<>();
Map conf = Utils.findAndReadConfigFile(_aclFileName);
if (conf.containsKey(Config.DRPC_AUTHORIZER_ACL)) {
Map<String,Map<String,?>> confAcl =
@@ -88,7 +87,7 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
}
} else if (!_permitWhenMissingFunctionEntry) {
LOG.warn("Requiring explicit ACL entries, but none given. " +
- "Therefore, all operiations will be denied.");
+ "Therefore, all operations will be denied.");
}
_acl = acl;
_lastUpdate = System.currentTimeMillis();
@@ -100,8 +99,8 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
public void prepare(Map conf) {
Boolean isStrict =
(Boolean) conf.get(Config.DRPC_AUTHORIZER_ACL_STRICT);
- _permitWhenMissingFunctionEntry =
- (isStrict != null && !isStrict) ? true : false;
+ _permitWhenMissingFunctionEntry =
+ (isStrict != null && !isStrict);
_aclFileName = (String) conf.get(Config.DRPC_AUTHORIZER_ACL_FILENAME);
_ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
index 8d61492..d1d6f87 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
@@ -19,19 +19,14 @@ package backtype.storm.security.auth.authorizer;
import java.util.Map;
-import backtype.storm.Config;
import backtype.storm.security.auth.IAuthorizer;
import backtype.storm.security.auth.ReqContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* An authorization implementation that denies everything, for testing purposes
*/
public class DenyAuthorizer implements IAuthorizer {
- private static final Logger LOG = LoggerFactory.getLogger(DenyAuthorizer.class);
-
+
/**
* Invoked once immediately after construction
* @param conf Storm configuration
@@ -41,9 +36,9 @@ public class DenyAuthorizer implements IAuthorizer {
/**
* permit() method is invoked for each incoming Thrift request
- * @param contrext request context
+ * @param context request context
* @param operation operation name
- * @param topology_storm configuration of targeted topology
+ * @param topology_conf configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context, String operation, Map topology_conf) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
index 0cfd488..07e6447 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/ImpersonationAuthorizer.java
@@ -38,7 +38,7 @@ public class ImpersonationAuthorizer implements IAuthorizer {
@Override
public void prepare(Map conf) {
- userImpersonationACL = new HashMap<String, ImpersonationACL>();
+ userImpersonationACL = new HashMap<>();
Map<String, Map<String, List<String>>> userToHostAndGroup = (Map<String, Map<String, List<String>>>) conf.get(Config.NIMBUS_IMPERSONATION_ACL);
@@ -67,7 +67,7 @@ public class ImpersonationAuthorizer implements IAuthorizer {
String userBeingImpersonated = _ptol.toLocal(context.principal());
InetAddress remoteAddress = context.remoteAddress();
- LOG.info("user = {}, principal = {} is attmepting to impersonate user = {} for operation = {} from host = {}",
+ LOG.info("user = {}, principal = {} is attempting to impersonate user = {} for operation = {} from host = {}",
impersonatingUser, impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress);
/**
@@ -83,8 +83,8 @@ public class ImpersonationAuthorizer implements IAuthorizer {
ImpersonationACL principalACL = userImpersonationACL.get(impersonatingPrincipal);
ImpersonationACL userACL = userImpersonationACL.get(impersonatingUser);
- Set<String> authorizedHosts = new HashSet<String>();
- Set<String> authorizedGroups = new HashSet<String>();
+ Set<String> authorizedHosts = new HashSet<>();
+ Set<String> authorizedGroups = new HashSet<>();
if (principalACL != null) {
authorizedHosts.addAll(principalACL.authorizedHosts);
@@ -127,7 +127,7 @@ public class ImpersonationAuthorizer implements IAuthorizer {
return true;
}
- Set<String> groups = null;
+ Set<String> groups;
try {
groups = _groupMappingProvider.getGroups(userBeingImpersonated);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
index c8008f1..ab5bd4b 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
@@ -19,18 +19,13 @@ package backtype.storm.security.auth.authorizer;
import java.util.Map;
-import backtype.storm.Config;
import backtype.storm.security.auth.IAuthorizer;
import backtype.storm.security.auth.ReqContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* A no-op authorization implementation that illustrate info available for authorization decisions.
*/
public class NoopAuthorizer implements IAuthorizer {
- private static final Logger LOG = LoggerFactory.getLogger(NoopAuthorizer.class);
/**
* Invoked once immediately after construction
@@ -43,7 +38,7 @@ public class NoopAuthorizer implements IAuthorizer {
* permit() method is invoked for each incoming Thrift request
* @param context request context includes info about
* @param operation operation name
- * @param topology_storm configuration of targeted topology
+ * @param topology_conf configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
public boolean permit(ReqContext context, String operation, Map topology_conf) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
index a2549a5..0063f92 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
@@ -42,9 +42,9 @@ import org.slf4j.LoggerFactory;
public class SimpleACLAuthorizer implements IAuthorizer {
private static final Logger LOG = LoggerFactory.getLogger(SimpleACLAuthorizer.class);
- protected Set<String> _userCommands = new HashSet<String>(Arrays.asList("submitTopology", "fileUpload", "getNimbusConf", "getClusterInfo"));
- protected Set<String> _supervisorCommands = new HashSet<String>(Arrays.asList("fileDownload"));
- protected Set<String> _topoCommands = new HashSet<String>(Arrays.asList(
+ protected Set<String> _userCommands = new HashSet<>(Arrays.asList("submitTopology", "fileUpload", "getNimbusConf", "getClusterInfo"));
+ protected Set<String> _supervisorCommands = new HashSet<>(Arrays.asList("fileDownload"));
+ protected Set<String> _topoCommands = new HashSet<>(Arrays.asList(
"killTopology",
"rebalance",
"activate",
@@ -79,10 +79,10 @@ public class SimpleACLAuthorizer implements IAuthorizer {
*/
@Override
public void prepare(Map conf) {
- _admins = new HashSet<String>();
- _supervisors = new HashSet<String>();
- _nimbusUsers = new HashSet<String>();
- _nimbusGroups = new HashSet<String>();
+ _admins = new HashSet<>();
+ _supervisors = new HashSet<>();
+ _nimbusUsers = new HashSet<>();
+ _nimbusGroups = new HashSet<>();
if (conf.containsKey(Config.NIMBUS_ADMINS)) {
_admins.addAll((Collection<String>)conf.get(Config.NIMBUS_ADMINS));
@@ -113,7 +113,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
public boolean permit(ReqContext context, String operation, Map topology_conf) {
String principal = context.principal().getName();
String user = _ptol.toLocal(context.principal());
- Set<String> userGroups = new HashSet<String>();
+ Set<String> userGroups = new HashSet<>();
if (_groupMappingProvider != null) {
try {
@@ -145,7 +145,7 @@ public class SimpleACLAuthorizer implements IAuthorizer {
return true;
}
- Set<String> topoGroups = new HashSet<String>();
+ Set<String> topoGroups = new HashSet<>();
if (topology_conf.containsKey(Config.TOPOLOGY_GROUPS) && topology_conf.get(Config.TOPOLOGY_GROUPS) != null) {
topoGroups.addAll((Collection<String>)topology_conf.get(Config.TOPOLOGY_GROUPS));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
index 21ee9a6..5731f06 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleWhitelistAuthorizer.java
@@ -26,15 +26,11 @@ import java.util.Collection;
import backtype.storm.security.auth.IAuthorizer;
import backtype.storm.security.auth.ReqContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* An authorization implementation that simply checks a whitelist of users that
* are allowed to use the cluster.
*/
public class SimpleWhitelistAuthorizer implements IAuthorizer {
- private static final Logger LOG = LoggerFactory.getLogger(SimpleWhitelistAuthorizer.class);
public static final String WHITELIST_USERS_CONF = "storm.auth.simple-white-list.users";
protected Set<String> users;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
index 3caacaa..420326c 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/digest/ClientCallbackHandler.java
@@ -46,8 +46,6 @@ public class ClientCallbackHandler implements CallbackHandler {
* Constructor based on a JAAS configuration
*
* For digest, you should have a pair of user name and password defined.
- *
- * @param configuration
* @throws IOException
*/
public ClientCallbackHandler(Configuration configuration) throws IOException {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
index ad642d8..09d6f78 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -18,10 +18,8 @@
package backtype.storm.security.auth.digest;
import java.io.IOException;
-import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.Configuration;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSaslServerTransport;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java b/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
index 1788dab..e80072c 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/digest/ServerCallbackHandler.java
@@ -26,7 +26,6 @@ import backtype.storm.security.auth.SaslTransportPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
@@ -40,7 +39,7 @@ import javax.security.sasl.RealmCallback;
import backtype.storm.security.auth.AuthUtils;
/**
- * SASL server side collback handler
+ * SASL server side callback handler
*/
public class ServerCallbackHandler implements CallbackHandler {
private static final String USER_PREFIX = "user_";
@@ -48,7 +47,7 @@ public class ServerCallbackHandler implements CallbackHandler {
private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
private String userName;
- private final Map<String,String> credentials = new HashMap<String,String>();
+ private final Map<String,String> credentials = new HashMap<>();
public ServerCallbackHandler(Configuration configuration) throws IOException {
if (configuration==null) return;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java b/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
index 933a125..54193da 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/GzipThriftSerializationDelegate.java
@@ -17,7 +17,6 @@
*/
package backtype.storm.serialization;
-import java.io.IOException;
import java.util.Map;
import backtype.storm.utils.Utils;
import org.apache.thrift.TBase;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java b/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java
index 4e68658..09cc422 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/ITupleDeserializer.java
@@ -18,7 +18,6 @@
package backtype.storm.serialization;
import backtype.storm.tuple.Tuple;
-import java.io.IOException;
public interface ITupleDeserializer {
Tuple deserialize(byte[] ser);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java b/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
index 5a5e3a4..07b9cd5 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
@@ -21,10 +21,7 @@ import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
-import backtype.storm.utils.WritableUtils;
import com.esotericsoftware.kryo.io.Input;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java b/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
index 209ae53..418068e 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
@@ -21,7 +21,6 @@ import backtype.storm.utils.ListDelegate;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -35,7 +34,7 @@ public class KryoValuesDeserializer {
}
public List<Object> deserializeFrom(Input input) {
- ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
+ ListDelegate delegate = _kryo.readObject(input, ListDelegate.class);
return delegate.getDelegate();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java b/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java
index 2a829d1..417f102 100644
--- a/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java
+++ b/storm-core/src/jvm/backtype/storm/serialization/SerializationFactory.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.StormTopology;
import backtype.storm.serialization.types.ArrayListSerializer;
-import backtype.storm.serialization.types.ListDelegateSerializer;
import backtype.storm.serialization.types.HashMapSerializer;
import backtype.storm.serialization.types.HashSetSerializer;
import backtype.storm.transactional.TransactionAttempt;
@@ -130,17 +129,17 @@ public class SerializationFactory {
}
public static class IdDictionary {
- Map<String, Map<String, Integer>> streamNametoId = new HashMap<String, Map<String, Integer>>();
- Map<String, Map<Integer, String>> streamIdToName = new HashMap<String, Map<Integer, String>>();
+ Map<String, Map<String, Integer>> streamNametoId = new HashMap<>();
+ Map<String, Map<Integer, String>> streamIdToName = new HashMap<>();
public IdDictionary(StormTopology topology) {
- List<String> componentNames = new ArrayList<String>(topology.get_spouts().keySet());
+ List<String> componentNames = new ArrayList<>(topology.get_spouts().keySet());
componentNames.addAll(topology.get_bolts().keySet());
componentNames.addAll(topology.get_state_spouts().keySet());
for(String name: componentNames) {
ComponentCommon common = Utils.getComponentCommon(topology, name);
- List<String> streams = new ArrayList<String>(common.get_streams().keySet());
+ List<String> streams = new ArrayList<>(common.get_streams().keySet());
streamNametoId.put(name, idify(streams));
streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name)));
}
@@ -156,7 +155,7 @@ public class SerializationFactory {
private static Map<String, Integer> idify(List<String> names) {
Collections.sort(names);
- Map<String, Integer> ret = new HashMap<String, Integer>();
+ Map<String, Integer> ret = new HashMap<>();
int i = 1;
for(String name: names) {
ret.put(name, i);
@@ -204,8 +203,8 @@ public class SerializationFactory {
private static Map<String, String> normalizeKryoRegister(Map conf) {
// TODO: de-duplicate this logic with the code in nimbus
Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER);
- if(res==null) return new TreeMap<String, String>();
- Map<String, String> ret = new HashMap<String, String>();
+ if(res==null) return new TreeMap<>();
+ Map<String, String> ret = new HashMap<>();
if(res instanceof Map) {
ret = (Map<String, String>) res;
} else {
@@ -219,6 +218,6 @@ public class SerializationFactory {
}
//ensure always same order for registrations with TreeMap
- return new TreeMap<String, String>(ret);
+ return new TreeMap<>(ret);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java b/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java
index 54de390..11bdc19 100644
--- a/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/GeneralTopologyContext.java
@@ -100,8 +100,8 @@ public class GeneralTopologyContext implements JSONAware {
*/
public List<Integer> getComponentTasks(String componentId) {
List<Integer> ret = _componentToTasks.get(componentId);
- if(ret==null) return new ArrayList<Integer>();
- else return new ArrayList<Integer>(ret);
+ if(ret==null) return new ArrayList<>();
+ else return new ArrayList<>(ret);
}
/**
@@ -138,14 +138,14 @@ public class GeneralTopologyContext implements JSONAware {
* @return Map from stream id to component id to the Grouping used.
*/
public Map<String, Map<String, Grouping>> getTargets(String componentId) {
- Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
+ Map<String, Map<String, Grouping>> ret = new HashMap<>();
for(String otherComponentId: getComponentIds()) {
Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
for(Map.Entry<GlobalStreamId, Grouping> entry: inputs.entrySet()) {
GlobalStreamId id = entry.getKey();
if(id.get_componentId().equals(componentId)) {
Map<String, Grouping> curr = ret.get(id.get_streamId());
- if(curr==null) curr = new HashMap<String, Grouping>();
+ if(curr==null) curr = new HashMap<>();
curr.put(otherComponentId, entry.getValue());
ret.put(id.get_streamId(), curr);
}
@@ -196,4 +196,4 @@ public class GeneralTopologyContext implements JSONAware {
}
return max;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
index cefa207..e9cdb5b 100644
--- a/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/storm-core/src/jvm/backtype/storm/task/TopologyContext.java
@@ -51,8 +51,8 @@ import org.json.simple.JSONValue;
*/
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
private Integer _taskId;
- private Map<String, Object> _taskData = new HashMap<String, Object>();
- private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
+ private Map<String, Object> _taskData = new HashMap<>();
+ private List<ITaskHook> _hooks = new ArrayList<>();
private Map<String, Object> _executorData;
private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
private clojure.lang.Atom _openOrPrepareWasCalled;
@@ -139,9 +139,8 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
}
/**
- * Gets the component id for this task. The component id maps
+ * @return the component id for this task. The component id maps
* to a component id specified for a Spout or Bolt in the topology definition.
- * @return
*/
public String getThisComponentId() {
return getComponentId(_taskId);
@@ -308,7 +307,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
* @return The IMetric argument unchanged.
*/
public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
- if((Boolean)_openOrPrepareWasCalled.deref() == true) {
+ if((Boolean) _openOrPrepareWasCalled.deref()) {
throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
"IBolt::prepare() or ISpout::open() method.");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java
index 6c9cdc1..cf828fe 100644
--- a/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/BasicBoltExecutor.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BasicBoltExecutor implements IRichBolt {
- public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
+ public static final Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
private IBasicBolt _bolt;
private transient BasicOutputCollector _collector;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java b/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java
index 0e7fd59..26a791e 100644
--- a/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java
+++ b/storm-core/src/jvm/backtype/storm/topology/OutputFieldsGetter.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.Map;
public class OutputFieldsGetter implements OutputFieldsDeclarer {
- private Map<String, StreamInfo> _fields = new HashMap<String, StreamInfo>();
+ private Map<String, StreamInfo> _fields = new HashMap<>();
public void declare(Fields fields) {
declare(false, fields);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index 38b30d7..bccb1bf 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -89,18 +89,18 @@ import org.json.simple.JSONValue;
* the inputs for that component.</p>
*/
public class TopologyBuilder {
- private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
- private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
- private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
+ private Map<String, IRichBolt> _bolts = new HashMap<>();
+ private Map<String, IRichSpout> _spouts = new HashMap<>();
+ private Map<String, ComponentCommon> _commons = new HashMap<>();
// private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
- private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
+ private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<>();
public StormTopology createTopology() {
- Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
- Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
+ Map<String, Bolt> boltSpecs = new HashMap<>();
+ Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
for(String boltId: _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
ComponentCommon common = getComponentCommon(boltId, bolt);
@@ -168,7 +168,7 @@ public class TopologyBuilder {
*
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the basic bolt
- * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
+ * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
@@ -193,7 +193,7 @@ public class TopologyBuilder {
* will be allocated to this component.
*
* @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
- * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around the cluster.
+ * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster.
* @param spout the spout
* @throws IllegalArgumentException if {@code parallelism_hint} is not positive
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
index 3206941..3f35149 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
@@ -18,7 +18,6 @@
package backtype.storm.topology.base;
import backtype.storm.coordination.IBatchBolt;
-import java.util.Map;
public abstract class BaseBatchBolt<T> extends BaseComponent implements IBatchBolt<T> {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java
index 704a95b..e2b3a93 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseTransactionalSpout.java
@@ -18,7 +18,6 @@
package backtype.storm.topology.base;
import backtype.storm.transactional.ITransactionalSpout;
-import java.util.Map;
public abstract class BaseTransactionalSpout<T> extends BaseComponent implements ITransactionalSpout<T> {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java b/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java
index f7ce534..f8f73f6 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/TransactionalSpoutCoordinator.java
@@ -52,7 +52,7 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
private TransactionalState _state;
private RotatingTransactionalState _coordinatorState;
- TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<BigInteger, TransactionStatus>();
+ TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<>();
private SpoutOutputCollector _collector;
private Random _rand;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java b/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
index 3be5d37..02185f4 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
@@ -63,8 +63,8 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
public class Emitter implements ICommitterTransactionalSpout.Emitter {
IOpaquePartitionedTransactionalSpout.Emitter _emitter;
TransactionalState _state;
- TreeMap<BigInteger, Map<Integer, Object>> _cachedMetas = new TreeMap<BigInteger, Map<Integer, Object>>();
- Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
+ TreeMap<BigInteger, Map<Integer, Object>> _cachedMetas = new TreeMap<>();
+ Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<>();
int _index;
int _numTasks;
@@ -84,7 +84,7 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, BatchOutputCollector collector) {
- Map<Integer, Object> metas = new HashMap<Integer, Object>();
+ Map<Integer, Object> metas = new HashMap<>();
_cachedMetas.put(tx.getTransactionId(), metas);
int partitions = _emitter.numPartitions();
Entry<BigInteger, Map<Integer, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
@@ -92,7 +92,7 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
if(entry!=null) {
prevCached = entry.getValue();
} else {
- prevCached = new HashMap<Integer, Object>();
+ prevCached = new HashMap<>();
}
for(int i=_index; i < partitions; i+=_numTasks) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java b/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
index 479dda4..76859cf 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
@@ -67,7 +67,7 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou
class Emitter implements ITransactionalSpout.Emitter<Integer> {
private IPartitionedTransactionalSpout.Emitter _emitter;
private TransactionalState _state;
- private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
+ private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<>();
private int _index;
private int _numTasks;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/tuple/Fields.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Fields.java b/storm-core/src/jvm/backtype/storm/tuple/Fields.java
index 3eed409..b52b798 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Fields.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Fields.java
@@ -30,14 +30,14 @@ import java.io.Serializable;
*/
public class Fields implements Iterable<String>, Serializable {
private List<String> _fields;
- private Map<String, Integer> _index = new HashMap<String, Integer>();
+ private Map<String, Integer> _index = new HashMap<>();
public Fields(String... fields) {
this(Arrays.asList(fields));
}
public Fields(List<String> fields) {
- _fields = new ArrayList<String>(fields.size());
+ _fields = new ArrayList<>(fields.size());
for (String field : fields) {
if (_fields.contains(field))
throw new IllegalArgumentException(
@@ -49,7 +49,7 @@ public class Fields implements Iterable<String>, Serializable {
}
public List<Object> select(Fields selector, List<Object> tuple) {
- List<Object> ret = new ArrayList<Object>(selector.size());
+ List<Object> ret = new ArrayList<>(selector.size());
for(String s: selector) {
ret.add(tuple.get(_index.get(s)));
}
@@ -57,7 +57,7 @@ public class Fields implements Iterable<String>, Serializable {
}
public List<String> toList() {
- return new ArrayList<String>(_fields);
+ return new ArrayList<>(_fields);
}
/**
@@ -98,7 +98,7 @@ public class Fields implements Iterable<String>, Serializable {
}
/**
- * @returns true if this contains the specified name of the field.
+ * @return true if this contains the specified name of the field.
*/
public boolean contains(String field) {
return _index.containsKey(field);
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/tuple/MessageId.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/MessageId.java b/storm-core/src/jvm/backtype/storm/tuple/MessageId.java
index 680af38..554bab6 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/MessageId.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/MessageId.java
@@ -43,7 +43,7 @@ public class MessageId {
}
public static MessageId makeRootId(long id, long val) {
- Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
+ Map<Long, Long> anchorsToIds = new HashMap<>();
anchorsToIds.put(id, val);
return new MessageId(anchorsToIds);
}
@@ -67,11 +67,7 @@ public class MessageId {
@Override
public boolean equals(Object other) {
- if(other instanceof MessageId) {
- return _anchorsToIds.equals(((MessageId) other)._anchorsToIds);
- } else {
- return false;
- }
+ return other instanceof MessageId && _anchorsToIds.equals(((MessageId) other)._anchorsToIds);
}
@Override
@@ -89,7 +85,7 @@ public class MessageId {
public static MessageId deserialize(Input in) throws IOException {
int numAnchors = in.readInt(true);
- Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
+ Map<Long, Long> anchorsToIds = new HashMap<>();
for(int i=0; i<numAnchors; i++) {
anchorsToIds.put(in.readLong(), in.readLong());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index c4ea7c8..19d0cb4 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -18,7 +18,6 @@
package backtype.storm.tuple;
import backtype.storm.generated.GlobalStreamId;
-import java.util.List;
/**
* The tuple is the main data structure in Storm. A tuple is a named list of values,
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 53b63ba..dd31c96 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -237,7 +237,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
return System.identityHashCode(this);
}
- private final Keyword makeKeyword(String name) {
+ private Keyword makeKeyword(String name) {
return Keyword.intern(Symbol.create(name));
}
@@ -250,7 +250,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
} else if(o instanceof String) {
return getValueByField((String) o);
}
- } catch(IllegalArgumentException e) {
+ } catch(IllegalArgumentException ignored) {
}
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
index b2a2a7d..23deb28 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DRPCClient.java
@@ -17,7 +17,6 @@
*/
package backtype.storm.utils;
-import backtype.storm.Config;
import backtype.storm.generated.DRPCExecutionException;
import backtype.storm.generated.DistributedRPC;
import backtype.storm.generated.AuthorizationException;
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 246f9a7..7ad6ae0 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -34,14 +34,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
@@ -255,7 +252,7 @@ public class DisruptorQueue implements IStatefulObject {
public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
this._queueName = PREFIX + queueName;
- WaitStrategy wait = null;
+ WaitStrategy wait;
if (readTimeout <= 0) {
wait = new LiteBlockingWaitStrategy();
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0e4c6f2/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java b/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java
index b20c775..4abbc3e 100644
--- a/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java
+++ b/storm-core/src/jvm/backtype/storm/utils/InprocMessaging.java
@@ -22,7 +22,7 @@ import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
public class InprocMessaging {
- private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<Integer, LinkedBlockingQueue<Object>>();
+ private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<>();
private static final Object _lock = new Object();
private static int port = 1;
@@ -50,7 +50,7 @@ public class InprocMessaging {
private static LinkedBlockingQueue<Object> getQueue(int port) {
synchronized(_lock) {
if(!_queues.containsKey(port)) {
- _queues.put(port, new LinkedBlockingQueue<Object>());
+ _queues.put(port, new LinkedBlockingQueue<>());
}
return _queues.get(port);
}