You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:44 UTC

[08/23] storm git commit: To continue update callers to read-storm-config

To continue update callers to read-storm-config


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b90a63d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b90a63d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b90a63d

Branch: refs/heads/master
Commit: 0b90a63d5ac7075524e9fabc7a49bda47b41b946
Parents: 6ce9841
Author: zhuol <zh...@yahoo-inc.com>
Authored: Wed Jan 13 16:45:39 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  4 +-
 .../clj/org/apache/storm/daemon/executor.clj    |  4 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 10 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  |  4 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  6 +--
 .../jvm/org/apache/storm/utils/ConfigUtils.java | 52 ++++++++++++++------
 .../test/clj/org/apache/storm/cluster_test.clj  |  4 +-
 7 files changed, 54 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index dd761a5..77791f4 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -18,7 +18,7 @@
   (:import [org.apache.storm.generated StormTopology
             InvalidTopologyException GlobalStreamId]
            [org.apache.storm.utils ThriftTopologyUtils])
-  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.utils Utils ConfigUtils])
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
   (:import [org.apache.storm.metric SystemBolt])
@@ -88,7 +88,7 @@
     ))
 
 (defn validate-distributed-mode! [conf]
-  (if (local-mode? conf)
+  (if (ConfigUtils/isLocalMode conf)
       (throw
         (IllegalArgumentException. "Cannot start server in local mode!"))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8052835..590379d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -28,7 +28,7 @@
   (:import [org.apache.storm.grouping CustomStreamGrouping])
   (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
   (:import [org.apache.storm.generated GlobalStreamId])
-  (:import [org.apache.storm.utils Utils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
+  (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
   (:import [com.lmax.disruptor InsufficientCapacityException])
   (:import [org.apache.storm.serialization KryoTupleSerializer])
   (:import [org.apache.storm.daemon Shutdownable])
@@ -256,7 +256,7 @@
                                                           :context (ClusterStateContext. DaemonType/WORKER))
      :type executor-type
      ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
-     :stats (mk-executor-stats <> (sampling-rate storm-conf))
+     :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))
      :interval->task->metric-registry (HashMap.)
      :task->component (:task->component worker)
      :stream->component->grouper (outbound-components worker-context component-id storm-conf)

http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index e3bdd5f..163f301 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -38,7 +38,7 @@
   (:import [org.apache.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
             Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
   (:import [org.apache.storm.nimbus NimbusInfo])
-  (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils TupleUtils ThriftTopologyUtils
+  (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils
             BufferFileInputStream BufferInputStream])
   (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
             ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
@@ -235,7 +235,7 @@
 (defn- get-key-list-from-id
   [conf id]
   (log-debug "set keys id = " id "set = " #{(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)})
-  (if (local-mode? conf)
+  (if (ConfigUtils/isLocalMode conf)
     [(master-stormcode-key id) (master-stormconf-key id)]
     [(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)]))
 
@@ -488,7 +488,7 @@
 (defn- wait-for-desired-code-replication [nimbus conf storm-id]
   (let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
         max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
-        current-replication-count-jar (if (not (local-mode? conf))
+        current-replication-count-jar (if (not (ConfigUtils/isLocalMode conf))
                                         (atom (get-blob-replication-count (master-stormjar-key storm-id) nimbus))
                                         (atom min-replication-count))
         current-replication-count-code (atom (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
@@ -504,12 +504,12 @@
         (sleep-secs 1)
         (log-debug "waiting for desired replication to be achieved.
           min-replication-count = " min-replication-count  " max-replication-wait-time = " max-replication-wait-time
-          (if (not (local-mode? conf))"current-replication-count for jar key = " @current-replication-count-jar)
+          (if (not (ConfigUtils/isLocalMode conf))"current-replication-count for jar key = " @current-replication-count-jar)
           "current-replication-count for code key = " @current-replication-count-code
           "current-replication-count for conf key = " @current-replication-count-conf
           " total-wait-time " @total-wait-time)
         (swap! total-wait-time inc)
-        (if (not (local-mode? conf))
+        (if (not (ConfigUtils/isLocalMode conf))
           (reset! current-replication-count-conf  (get-blob-replication-count (master-stormconf-key storm-id) nimbus)))
         (reset! current-replication-count-code  (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
         (reset! current-replication-count-jar  (get-blob-replication-count (master-stormjar-key storm-id) nimbus))))

http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index e4b44b0..c01ea62 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -16,7 +16,7 @@
 (ns org.apache.storm.daemon.supervisor
   (:import [java.io File IOException FileOutputStream])
   (:import [org.apache.storm.scheduler ISupervisor]
-           [org.apache.storm.utils LocalState Time Utils]
+           [org.apache.storm.utils LocalState Time Utils ConfigUtils]
            [org.apache.storm.daemon Shutdownable]
            [org.apache.storm Constants]
            [org.apache.storm.cluster ClusterStateContext DaemonType]
@@ -352,7 +352,7 @@
         stormcodepath (supervisor-stormcode-path stormroot)
         stormconfpath (supervisor-stormconf-path stormroot)]
     (and (every? exists-file? [stormroot stormconfpath stormcodepath])
-         (or (local-mode? conf)
+         (or (ConfigUtils/isLocalMode conf)
              (exists-file? stormjarpath)))))
 
 (defn get-worker-assignment-helper-msg

http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 9607d77..aea4372 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -25,7 +25,7 @@
   (:import [java.util.concurrent Executors]
            [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
   (:import [java.util ArrayList HashMap])
-  (:import [org.apache.storm.utils Utils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
+  (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
   (:import [org.apache.storm.grouping LoadMapping])
   (:import [org.apache.storm.messaging TransportFactory])
   (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
@@ -575,11 +575,11 @@
 (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
   (log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
                " and conf " conf)
-  (if-not (local-mode? conf)
+  (if-not (ConfigUtils/isLocalMode conf)
     (redirect-stdio-to-slf4j!))
   ;; because in local mode, its not a separate
   ;; process. supervisor will register it in this case
-  (when (= :distributed (cluster-mode conf))
+  (when (= :distributed (ConfigUtils/clusterMode conf))
     (let [pid (process-pid)]
       (touch (worker-pid-path conf worker-id pid))
       (spit (worker-artifacts-pid-path conf storm-id port) pid)))

http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 67440ac..4b8df65 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -19,6 +19,7 @@
 package org.apache.storm.utils;
 
 import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Utils;
@@ -26,15 +27,7 @@ import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.io.BufferedWriter;
-import java.io.BufferedReader;
+import java.io.*;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.lang.reflect.Field;
@@ -102,7 +95,7 @@ public class ConfigUtils {
 
     }
 
-    public static boolean localMode(Map conf) {
+    public static boolean isLocalMode(Map conf) {
         String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
         if (mode != null) {
             if ("local".equals(mode)) {
@@ -123,18 +116,49 @@ public class ConfigUtils {
         throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
     }
 
-    // public static mkStatsSampler // depends on Utils.evenSampler() TODO
+    // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is something we have to do
 
-    // public static readDefaultConfig // depends on Utils.clojurifyStructure and Utils.readDefaultConfig // not necessary indeed
+    // public static readDefaultConfig // depends on Utils.clojurifyStructure and Utils.readDefaultConfig // TODO
 
     // validate-configs-with-schemas is just a wrapper of ConfigValidation.validateFields(conf) TODO
 
+    //For testing only
+    // for java
+    // try (SetMockedStormConfig mocked = new SetMockedStormConfig(conf)) {
+    //    run test ...
+    // }
+    //
+    // for clojure
+    // (let [something (SetMockedStormConfig. conf)]
+    //   (try
+    //     run test ...
+    //     (finally (.close something))))
+    public static class SetMockedStormConfig implements Closeable {
+        public SetMockedStormConfig(Map conf) {
+            mockedStormConfig = conf;
+        }
+
+        @Override
+        public void close() {
+            mockedStormConfig = null;
+        }
+    }
+    private static Map mockedStormConfig = null;
     public static Map readStormConfig() {
-        return Utils.readStormConfig();
+        if (mockedStormConfig != null) return mockedStormConfig;
+        Map conf = Utils.readStormConfig();
+        ConfigValidation.validateFields(conf);
+        return conf; // TODO: should this be passed through clojurify-structure before returning? Otherwise, the clj files that call it fail
+    }
+
+    public static Map readYamlConfig(String name, boolean mustExist) {
+        Map conf = Utils.findAndReadConfigFile(name, mustExist);
+        ConfigValidation.validateFields(conf);
+        return conf;
     }
 
     public static Map readYamlConfig(String name) {
-        return Utils.findAndReadConfigFile(name, true);
+        return  readYamlConfig(name, true);
     }
 
     public static String absoluteStormLocalDir(Map conf) {

http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index e082768..7834b54 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -22,7 +22,7 @@
   (:import [org.mockito Mockito])
   (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
-  (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo])
+  (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils])
   (:import [org.apache.storm.cluster ClusterState])
   (:require [org.apache.storm [zookeeper :as zk]])
   (:require [conjure.core])
@@ -31,7 +31,7 @@
   (:use [org.apache.storm cluster config util testing thrift log]))
 
 (defn mk-config [zk-port]
-  (merge (read-storm-config)
+  (merge (clojurify-structure (ConfigUtils/readStormConfig))
          {STORM-ZOOKEEPER-PORT zk-port
           STORM-ZOOKEEPER-SERVERS ["localhost"]}))