You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/02/11 20:10:55 UTC
[12/15] storm git commit: Addressing comments.
Addressing comments.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f3e8348e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f3e8348e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f3e8348e
Branch: refs/heads/master
Commit: f3e8348e8ff82b06a14db3513eb9d90a24e5652d
Parents: 235d6e7
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Wed Feb 10 13:15:24 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Wed Feb 10 13:15:24 2016 -0600
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 2 +-
storm-core/src/clj/org/apache/storm/testing.clj | 6 +--
.../storm/logging/ThriftAccessLogger.java | 15 +++---
.../serialization/SerializationFactory.java | 17 +++++-
.../src/jvm/org/apache/storm/utils/Time.java | 2 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 54 +++++++-------------
.../test/clj/org/apache/storm/nimbus_test.clj | 4 +-
.../clj/org/apache/storm/supervisor_test.clj | 2 +-
8 files changed, 49 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 3dc2ee5..eb1ec1e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -86,7 +86,7 @@
(defn get-storm-id [storm-cluster-state storm-name]
(let [active-storms (.active-storms storm-cluster-state)
pred (reify IPredicate (test [this x] (= storm-name (:storm-name (.storm-base storm-cluster-state x nil)))))]
- (Utils/findFirst pred active-storms)
+ (Utils/findOne pred active-storms)
))
(defn topology-bases [storm-cluster-state]
http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 3d7ce44..9a487af 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -205,7 +205,7 @@
(defn get-supervisor [cluster-map supervisor-id]
(let [pred (reify IPredicate (test [this x] (= (.get-id x) supervisor-id)))]
- (Utils/findFirst pred @(:supervisors cluster-map))))
+ (Utils/findOne pred @(:supervisors cluster-map))))
(defn remove-first
[pred aseq]
@@ -218,8 +218,8 @@
(let [finder-fn #(= (.get-id %) supervisor-id)
pred (reify IPredicate (test [this x] (= (.get-id x) supervisor-id)))
supervisors @(:supervisors cluster-map)
- sup (Utils/findFirst pred
- supervisors)]
+ sup (Utils/findOne pred
+ supervisors)]
;; tmp-dir will be taken care of by shutdown
(reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
(.shutdown sup)))
http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java b/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java
index c55c7da..9befb52 100644
--- a/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java
+++ b/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java
@@ -22,14 +22,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThriftAccessLogger {
- private static final Logger LOG = LoggerFactory.getLogger(ThriftAccessLogger.class);
- public void log(String logMessage) {
- LOG.info(logMessage);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftAccessLogger.class);
- public static void logAccess(Integer requestId, InetAddress remoteAddress, Principal principal, String operation) {
- new ThriftAccessLogger().log(
- String.format("Request ID: {} access from: {} principal: {} operation: {}",
- requestId, remoteAddress, principal, operation));
- }
+ public static void logAccess(Integer requestId, InetAddress remoteAddress,
+ Principal principal, String operation) {
+ LOG.info("Request ID: {} access from: {} principal: {} operation: {}",
+ requestId, remoteAddress, principal, operation);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
index 23dd443..4007138 100644
--- a/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
@@ -132,6 +132,21 @@ public class SerializationFactory {
Map<String, Map<String, Integer>> streamNametoId = new HashMap<>();
Map<String, Map<Integer, String>> streamIdToName = new HashMap<>();
+ /**
+ * "{:a 1 :b 2} -> {1 :a 2 :b}"
+ *
+ * Note: Only one key wins if there are duplicate values.
+ * Which key wins is indeterminate:
+ * "{:a 1 :b 1} -> {1 :a} *or* {1 :b}"
+ */
+ private static <K, V> Map<V, K> simpleReverseMap(Map<K, V> map) {
+ Map<V, K> ret = new HashMap<V, K>();
+ for (Map.Entry<K, V> entry : map.entrySet()) {
+ ret.put(entry.getValue(), entry.getKey());
+ }
+ return ret;
+ }
+
public IdDictionary(StormTopology topology) {
List<String> componentNames = new ArrayList<>(topology.get_spouts().keySet());
componentNames.addAll(topology.get_bolts().keySet());
@@ -141,7 +156,7 @@ public class SerializationFactory {
ComponentCommon common = Utils.getComponentCommon(topology, name);
List<String> streams = new ArrayList<>(common.get_streams().keySet());
streamNametoId.put(name, idify(streams));
- streamIdToName.put(name, Utils.simpleReverseMap(streamNametoId.get(name)));
+ streamIdToName.put(name, simpleReverseMap(streamNametoId.get(name)));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index 3ebceb5..fd01fb8 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -122,7 +122,7 @@ public class Time {
}
public static long deltaMs(long timeInMilliseconds) {
- return System.currentTimeMillis() - timeInMilliseconds;
+ return Time.currentTimeMillis() - timeInMilliseconds;
}
public static void advanceTime(long ms) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 5f24f8d..5b8bc32 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -109,7 +109,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
@@ -119,8 +118,6 @@ import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
-import java.security.Principal;
-import org.apache.storm.logging.ThriftAccessLogger;
public class Utils {
// A singleton instance allows us to mock delegated static methods in our
@@ -941,7 +938,6 @@ public class Utils {
private static void unTarUsingJava(File inFile, File untarDir,
boolean gzipped) throws IOException {
InputStream inputStream = null;
- TarArchiveInputStream tis = null;
try {
if (gzipped) {
inputStream = new BufferedInputStream(new GZIPInputStream(
@@ -949,14 +945,16 @@ public class Utils {
} else {
inputStream = new BufferedInputStream(new FileInputStream(inFile));
}
- tis = new TarArchiveInputStream(inputStream);
- for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
- unpackEntries(tis, entry, untarDir);
- entry = tis.getNextTarEntry();
+ try (TarArchiveInputStream tis = new TarArchiveInputStream(inputStream)) {
+ for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
+ unpackEntries(tis, entry, untarDir);
+ entry = tis.getNextTarEntry();
+ }
}
} finally {
- tis.close();
- inputStream.close();
+ if(inputStream != null) {
+ inputStream.close();
+ }
}
}
@@ -1427,7 +1425,6 @@ public class Utils {
return topologyInfo;
}
-
/**
* A cheap way to deterministically convert a number to a positive value. When the input is
* positive, the original value is returned. When the input number is negative, the returned
@@ -1632,7 +1629,7 @@ public class Utils {
* @param coll The Collection of items to search through.
* @return The first matching value in coll, or null if nothing matches.
*/
- public static <T> T findFirst (IPredicate<T> pred, Collection<T> coll) {
+ public static <T> T findOne (IPredicate<T> pred, Collection<T> coll) {
if(coll == null) {
return null;
}
@@ -1644,11 +1641,11 @@ public class Utils {
return null;
}
- public static <T, U> T findFirst (IPredicate<T> pred, Map<U, T> map) {
+ public static <T, U> T findOne (IPredicate<T> pred, Map<U, T> map) {
if(map == null) {
return null;
}
- return findFirst(pred, (Set<T>)map.entrySet());
+ return findOne(pred, (Set<T>)map.entrySet());
}
public static String localHostname () throws UnknownHostException {
@@ -1702,29 +1699,14 @@ public class Utils {
}
/**
- * "{:a 1 :b 2} -> {1 :a 2 :b}"
- *
- * Note: Only one key wins if there are duplicate values.
- * Which key wins is indeterminate:
- * "{:a 1 :b 1} -> {1 :a} *or* {1 :b}"
- */
- public static <K, V> Map<V, K> simpleReverseMap(Map<K, V> map) {
- Map<V, K> ret = new HashMap<V, K>();
- for (Map.Entry<K, V> entry : map.entrySet()) {
- ret.put(entry.getValue(), entry.getKey());
- }
- return ret;
- }
-
- /**
* "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
*
* Example usage in java:
* Map<Integer, String> tasks;
* Map<String, List<Integer>> componentTasks = Utils.reverse_map(tasks);
*
- * @param map
- * @return
+ * @param map to reverse
+ * @return a reversed map
*/
public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
@@ -1745,8 +1727,11 @@ public class Utils {
}
/**
- * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+ * "[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 :c}"
+ * Reverses an assoc-list style Map like reverseMap(Map...)
*
+ * @param listSeq to reverse
+ * @return a reversed map
*/
public static HashMap reverseMap(List listSeq) {
HashMap<Object, List<Object>> rtn = new HashMap();
@@ -1830,9 +1815,9 @@ public class Utils {
execCommand("kill", "-" + signum, pid);
}
} catch (ExecuteException e) {
- LOG.info("Error when trying to kill " + pid + ". Process is probably already dead.");
+ LOG.info("Error when trying to kill {}. Process is probably already dead.", pid);
} catch (IOException e) {
- LOG.info("IOException Error when trying to kill " + pid + ".");
+ LOG.info("IOException Error when trying to kill {}.", pid);
throw e;
}
}
@@ -1919,7 +1904,6 @@ public class Utils {
* @param targetDir the parent directory of the link's target
* @param targetFilename the file name of the links target
* @param filename the file name of the link
- * @return the path of the link if it did not exist, otherwise null
* @throws IOException
*/
public static void createSymlink(String dir, String targetDir,
http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 12c5c94..70cb885 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -830,8 +830,8 @@
(check-executor-distribution slot-executors2 [2 2 2 3])
(check-consistency cluster "test")
- (bind common (first (Utils/findFirst (proxy [IPredicate] []
- (test [[k v]] (= 3 (count v)))) slot-executors2)))
+ (bind common (first (Utils/findOne (proxy [IPredicate] []
+ (test [[k v]] (= 3 (count v)))) slot-executors2)))
(is (not-nil? common))
(is (= (slot-executors2 common) (slot-executors common)))
http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 19b7441..9c31ddf 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -51,7 +51,7 @@
(when executors [storm-id executors])
))
pred (reify IPredicate (test [this x] (not-nil? x)))
- ret (Utils/findFirst pred slot-assigns)]
+ ret (Utils/findOne pred slot-assigns)]
(when-not ret
(throw (RuntimeException. "Could not find assignment for worker")))
ret