You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/02/11 20:10:55 UTC

[12/15] storm git commit: Addressing comments.

Addressing comments.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f3e8348e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f3e8348e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f3e8348e

Branch: refs/heads/master
Commit: f3e8348e8ff82b06a14db3513eb9d90a24e5652d
Parents: 235d6e7
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Wed Feb 10 13:15:24 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Wed Feb 10 13:15:24 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  2 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  6 +--
 .../storm/logging/ThriftAccessLogger.java       | 15 +++---
 .../serialization/SerializationFactory.java     | 17 +++++-
 .../src/jvm/org/apache/storm/utils/Time.java    |  2 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 54 +++++++-------------
 .../test/clj/org/apache/storm/nimbus_test.clj   |  4 +-
 .../clj/org/apache/storm/supervisor_test.clj    |  2 +-
 8 files changed, 49 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 3dc2ee5..eb1ec1e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -86,7 +86,7 @@
 (defn get-storm-id [storm-cluster-state storm-name]
   (let [active-storms (.active-storms storm-cluster-state)
         pred  (reify IPredicate (test [this x] (= storm-name (:storm-name (.storm-base storm-cluster-state x nil)))))]
-    (Utils/findFirst pred active-storms)
+    (Utils/findOne pred active-storms)
     ))
 
 (defn topology-bases [storm-cluster-state]

http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 3d7ce44..9a487af 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -205,7 +205,7 @@
 
 (defn get-supervisor [cluster-map supervisor-id]
   (let [pred  (reify IPredicate (test [this x] (= (.get-id x) supervisor-id)))]
-    (Utils/findFirst pred @(:supervisors cluster-map))))
+    (Utils/findOne pred @(:supervisors cluster-map))))
 
 (defn remove-first
   [pred aseq]
@@ -218,8 +218,8 @@
   (let [finder-fn #(= (.get-id %) supervisor-id)
         pred  (reify IPredicate (test [this x] (= (.get-id x) supervisor-id)))
         supervisors @(:supervisors cluster-map)
-        sup (Utils/findFirst pred
-                        supervisors)]
+        sup (Utils/findOne pred
+                           supervisors)]
     ;; tmp-dir will be taken care of by shutdown
     (reset! (:supervisors cluster-map) (remove-first finder-fn supervisors))
     (.shutdown sup)))

http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java b/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java
index c55c7da..9befb52 100644
--- a/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java
+++ b/storm-core/src/jvm/org/apache/storm/logging/ThriftAccessLogger.java
@@ -22,14 +22,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ThriftAccessLogger {
-  private static final Logger LOG = LoggerFactory.getLogger(ThriftAccessLogger.class);
-  public void log(String logMessage) {
-    LOG.info(logMessage);
-  }
+    private static final Logger LOG = LoggerFactory.getLogger(ThriftAccessLogger.class);
 
-  public static void logAccess(Integer requestId, InetAddress remoteAddress, Principal principal, String operation) {
-    new ThriftAccessLogger().log(
-      String.format("Request ID: {} access from: {} principal: {} operation: {}",
-                    requestId, remoteAddress, principal, operation));
-  } 
+    public static void logAccess(Integer requestId, InetAddress remoteAddress,
+                                 Principal principal, String operation) {
+        LOG.info("Request ID: {} access from: {} principal: {} operation: {}",
+                 requestId, remoteAddress, principal, operation);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
index 23dd443..4007138 100644
--- a/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java
@@ -132,6 +132,21 @@ public class SerializationFactory {
         Map<String, Map<String, Integer>> streamNametoId = new HashMap<>();
         Map<String, Map<Integer, String>> streamIdToName = new HashMap<>();
 
+        /**
+         * "{:a 1  :b 2} -> {1 :a  2 :b}"
+         *
+         * Note: Only one key wins if there are duplicate values.
+         *       Which key wins is indeterminate:
+         * "{:a 1  :b 1} -> {1 :a} *or* {1 :b}"
+         */
+        private static <K, V> Map<V, K> simpleReverseMap(Map<K, V> map) {
+            Map<V, K> ret = new HashMap<V, K>();
+            for (Map.Entry<K, V> entry : map.entrySet()) {
+                ret.put(entry.getValue(), entry.getKey());
+            }
+            return ret;
+        }
+
         public IdDictionary(StormTopology topology) {
             List<String> componentNames = new ArrayList<>(topology.get_spouts().keySet());
             componentNames.addAll(topology.get_bolts().keySet());
@@ -141,7 +156,7 @@ public class SerializationFactory {
                 ComponentCommon common = Utils.getComponentCommon(topology, name);
                 List<String> streams = new ArrayList<>(common.get_streams().keySet());
                 streamNametoId.put(name, idify(streams));
-                streamIdToName.put(name, Utils.simpleReverseMap(streamNametoId.get(name)));
+                streamIdToName.put(name, simpleReverseMap(streamNametoId.get(name)));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index 3ebceb5..fd01fb8 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -122,7 +122,7 @@ public class Time {
     }
 
     public static long deltaMs(long timeInMilliseconds) {
-        return System.currentTimeMillis() - timeInMilliseconds;
+        return Time.currentTimeMillis() - timeInMilliseconds;
     }
     
     public static void advanceTime(long ms) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 5f24f8d..5b8bc32 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -109,7 +109,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.Vector;
 import java.util.concurrent.Callable;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
@@ -119,8 +118,6 @@ import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
-import java.security.Principal;
-import org.apache.storm.logging.ThriftAccessLogger;
 
 public class Utils {
     // A singleton instance allows us to mock delegated static methods in our
@@ -941,7 +938,6 @@ public class Utils {
     private static void unTarUsingJava(File inFile, File untarDir,
                                        boolean gzipped) throws IOException {
         InputStream inputStream = null;
-        TarArchiveInputStream tis = null;
         try {
             if (gzipped) {
                 inputStream = new BufferedInputStream(new GZIPInputStream(
@@ -949,14 +945,16 @@ public class Utils {
             } else {
                 inputStream = new BufferedInputStream(new FileInputStream(inFile));
             }
-            tis = new TarArchiveInputStream(inputStream);
-            for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
-                unpackEntries(tis, entry, untarDir);
-                entry = tis.getNextTarEntry();
+            try (TarArchiveInputStream tis = new TarArchiveInputStream(inputStream)) {
+                for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
+                    unpackEntries(tis, entry, untarDir);
+                    entry = tis.getNextTarEntry();
+                }
             }
         } finally {
-            tis.close();
-            inputStream.close();
+            if(inputStream != null) {
+                inputStream.close();
+            }
         }
     }
 
@@ -1427,7 +1425,6 @@ public class Utils {
         return topologyInfo;
     }
 
-
     /**
      * A cheap way to deterministically convert a number to a positive value. When the input is
      * positive, the original value is returned. When the input number is negative, the returned
@@ -1632,7 +1629,7 @@ public class Utils {
      * @param coll The Collection of items to search through.
      * @return The first matching value in coll, or null if nothing matches.
      */
-    public static <T> T findFirst (IPredicate<T> pred, Collection<T> coll) {
+    public static <T> T findOne (IPredicate<T> pred, Collection<T> coll) {
         if(coll == null) {
             return null;
         }
@@ -1644,11 +1641,11 @@ public class Utils {
         return null;
     }
 
-    public static <T, U> T findFirst (IPredicate<T> pred, Map<U, T> map) {
+    public static <T, U> T findOne (IPredicate<T> pred, Map<U, T> map) {
         if(map == null) {
             return null;
         }
-        return findFirst(pred, (Set<T>)map.entrySet());
+        return findOne(pred, (Set<T>)map.entrySet());
     }
 
     public static String localHostname () throws UnknownHostException {
@@ -1702,29 +1699,14 @@ public class Utils {
     }
 
     /**
-     * "{:a 1  :b 2} -> {1 :a  2 :b}"
-     *
-     * Note: Only one key wins if there are duplicate values.
-     *       Which key wins is indeterminate:
-     * "{:a 1  :b 1} -> {1 :a} *or* {1 :b}"
-     */
-    public static <K, V> Map<V, K> simpleReverseMap(Map<K, V> map) {
-        Map<V, K> ret = new HashMap<V, K>();
-        for (Map.Entry<K, V> entry : map.entrySet()) {
-            ret.put(entry.getValue(), entry.getKey());
-        }
-        return ret;
-    }
-
-    /**
      * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
      *
      * Example usage in java:
      *  Map<Integer, String> tasks;
      *  Map<String, List<Integer>> componentTasks = Utils.reverse_map(tasks);
      *
-     * @param map
-     * @return
+     * @param map to reverse
+     * @return a reversed map
      */
     public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
         HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
@@ -1745,8 +1727,11 @@ public class Utils {
     }
 
     /**
-     * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+     * "[[:a 1] [:b 1] [:c 2]} -> {1 [:a :b] 2 :c}"
+     * Reverses an assoc-list style Map like reverseMap(Map...)
      *
+     * @param listSeq to reverse
+     * @return a reversed map
      */
     public static HashMap reverseMap(List listSeq) {
         HashMap<Object, List<Object>> rtn = new HashMap();
@@ -1830,9 +1815,9 @@ public class Utils {
                 execCommand("kill", "-" + signum, pid);
             }
         } catch (ExecuteException e) {
-            LOG.info("Error when trying to kill " + pid + ". Process is probably already dead.");
+            LOG.info("Error when trying to kill {}. Process is probably already dead.", pid);
         } catch (IOException e) {
-            LOG.info("IOException Error when trying to kill " + pid + ".");
+            LOG.info("IOException Error when trying to kill {}.", pid);
             throw e;
         }
     }
@@ -1919,7 +1904,6 @@ public class Utils {
      * @param targetDir the parent directory of the link's target
      * @param targetFilename the file name of the links target
      * @param filename the file name of the link
-     * @return the path of the link if it did not exist, otherwise null
      * @throws IOException
      */
     public static void createSymlink(String dir, String targetDir,

http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 12c5c94..70cb885 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -830,8 +830,8 @@
       (check-executor-distribution slot-executors2 [2 2 2 3])
       (check-consistency cluster "test")
 
-      (bind common (first (Utils/findFirst (proxy [IPredicate] []
-                                               (test [[k v]] (= 3 (count v)))) slot-executors2)))
+      (bind common (first (Utils/findOne (proxy [IPredicate] []
+                                           (test [[k v]] (= 3 (count v)))) slot-executors2)))
       (is (not-nil? common))
       (is (= (slot-executors2 common) (slot-executors common)))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f3e8348e/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/supervisor_test.clj b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
index 19b7441..9c31ddf 100644
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@ -51,7 +51,7 @@
                           (when executors [storm-id executors])
                           ))
         pred (reify IPredicate (test [this x] (not-nil? x)))
-        ret (Utils/findFirst pred slot-assigns)]
+        ret (Utils/findOne pred slot-assigns)]
     (when-not ret
       (throw (RuntimeException. "Could not find assignment for worker")))
     ret