You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/01/28 18:49:44 UTC
[08/23] storm git commit: To continue update callers to
read-storm-config
To continue update callers to read-storm-config
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b90a63d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b90a63d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b90a63d
Branch: refs/heads/master
Commit: 0b90a63d5ac7075524e9fabc7a49bda47b41b946
Parents: 6ce9841
Author: zhuol <zh...@yahoo-inc.com>
Authored: Wed Jan 13 16:45:39 2016 -0600
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Wed Jan 20 09:55:24 2016 -0600
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/common.clj | 4 +-
.../clj/org/apache/storm/daemon/executor.clj | 4 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 10 ++--
.../clj/org/apache/storm/daemon/supervisor.clj | 4 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 6 +--
.../jvm/org/apache/storm/utils/ConfigUtils.java | 52 ++++++++++++++------
.../test/clj/org/apache/storm/cluster_test.clj | 4 +-
7 files changed, 54 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index dd761a5..77791f4 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -18,7 +18,7 @@
(:import [org.apache.storm.generated StormTopology
InvalidTopologyException GlobalStreamId]
[org.apache.storm.utils ThriftTopologyUtils])
- (:import [org.apache.storm.utils Utils])
+ (:import [org.apache.storm.utils Utils ConfigUtils])
(:import [org.apache.storm.task WorkerTopologyContext])
(:import [org.apache.storm Constants])
(:import [org.apache.storm.metric SystemBolt])
@@ -88,7 +88,7 @@
))
(defn validate-distributed-mode! [conf]
- (if (local-mode? conf)
+ (if (ConfigUtils/isLocalMode conf)
(throw
(IllegalArgumentException. "Cannot start server in local mode!"))))
http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8052835..590379d 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -28,7 +28,7 @@
(:import [org.apache.storm.grouping CustomStreamGrouping])
(:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector])
(:import [org.apache.storm.generated GlobalStreamId])
- (:import [org.apache.storm.utils Utils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
+ (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread])
(:import [com.lmax.disruptor InsufficientCapacityException])
(:import [org.apache.storm.serialization KryoTupleSerializer])
(:import [org.apache.storm.daemon Shutdownable])
@@ -256,7 +256,7 @@
:context (ClusterStateContext. DaemonType/WORKER))
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
- :stats (mk-executor-stats <> (sampling-rate storm-conf))
+ :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))
:interval->task->metric-registry (HashMap.)
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id storm-conf)
http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index e3bdd5f..163f301 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -38,7 +38,7 @@
(:import [org.apache.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
(:import [org.apache.storm.nimbus NimbusInfo])
- (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils TupleUtils ThriftTopologyUtils
+ (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils
BufferFileInputStream BufferInputStream])
(:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
@@ -235,7 +235,7 @@
(defn- get-key-list-from-id
[conf id]
(log-debug "set keys id = " id "set = " #{(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)})
- (if (local-mode? conf)
+ (if (ConfigUtils/isLocalMode conf)
[(master-stormcode-key id) (master-stormconf-key id)]
[(master-stormcode-key id) (master-stormjar-key id) (master-stormconf-key id)]))
@@ -488,7 +488,7 @@
(defn- wait-for-desired-code-replication [nimbus conf storm-id]
(let [min-replication-count (conf TOPOLOGY-MIN-REPLICATION-COUNT)
max-replication-wait-time (conf TOPOLOGY-MAX-REPLICATION-WAIT-TIME-SEC)
- current-replication-count-jar (if (not (local-mode? conf))
+ current-replication-count-jar (if (not (ConfigUtils/isLocalMode conf))
(atom (get-blob-replication-count (master-stormjar-key storm-id) nimbus))
(atom min-replication-count))
current-replication-count-code (atom (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
@@ -504,12 +504,12 @@
(sleep-secs 1)
(log-debug "waiting for desired replication to be achieved.
min-replication-count = " min-replication-count " max-replication-wait-time = " max-replication-wait-time
- (if (not (local-mode? conf))"current-replication-count for jar key = " @current-replication-count-jar)
+ (if (not (ConfigUtils/isLocalMode conf))"current-replication-count for jar key = " @current-replication-count-jar)
"current-replication-count for code key = " @current-replication-count-code
"current-replication-count for conf key = " @current-replication-count-conf
" total-wait-time " @total-wait-time)
(swap! total-wait-time inc)
- (if (not (local-mode? conf))
+ (if (not (ConfigUtils/isLocalMode conf))
(reset! current-replication-count-conf (get-blob-replication-count (master-stormconf-key storm-id) nimbus)))
(reset! current-replication-count-code (get-blob-replication-count (master-stormcode-key storm-id) nimbus))
(reset! current-replication-count-jar (get-blob-replication-count (master-stormjar-key storm-id) nimbus))))
http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index e4b44b0..c01ea62 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -16,7 +16,7 @@
(ns org.apache.storm.daemon.supervisor
(:import [java.io File IOException FileOutputStream])
(:import [org.apache.storm.scheduler ISupervisor]
- [org.apache.storm.utils LocalState Time Utils]
+ [org.apache.storm.utils LocalState Time Utils ConfigUtils]
[org.apache.storm.daemon Shutdownable]
[org.apache.storm Constants]
[org.apache.storm.cluster ClusterStateContext DaemonType]
@@ -352,7 +352,7 @@
stormcodepath (supervisor-stormcode-path stormroot)
stormconfpath (supervisor-stormconf-path stormroot)]
(and (every? exists-file? [stormroot stormconfpath stormcodepath])
- (or (local-mode? conf)
+ (or (ConfigUtils/isLocalMode conf)
(exists-file? stormjarpath)))))
(defn get-worker-assignment-helper-msg
http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index 9607d77..aea4372 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -25,7 +25,7 @@
(:import [java.util.concurrent Executors]
[org.apache.storm.hooks IWorkerHook BaseWorkerHook])
(:import [java.util ArrayList HashMap])
- (:import [org.apache.storm.utils Utils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
+ (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
(:import [org.apache.storm.grouping LoadMapping])
(:import [org.apache.storm.messaging TransportFactory])
(:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
@@ -575,11 +575,11 @@
(defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]
(log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
" and conf " conf)
- (if-not (local-mode? conf)
+ (if-not (ConfigUtils/isLocalMode conf)
(redirect-stdio-to-slf4j!))
;; because in local mode, its not a separate
;; process. supervisor will register it in this case
- (when (= :distributed (cluster-mode conf))
+ (when (= :distributed (ConfigUtils/clusterMode conf))
(let [pid (process-pid)]
(touch (worker-pid-path conf worker-id pid))
(spit (worker-artifacts-pid-path conf storm-id port) pid)))
http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 67440ac..4b8df65 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -19,6 +19,7 @@
package org.apache.storm.utils;
import org.apache.storm.Config;
+import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Utils;
@@ -26,15 +27,7 @@ import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.io.BufferedWriter;
-import java.io.BufferedReader;
+import java.io.*;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.lang.reflect.Field;
@@ -102,7 +95,7 @@ public class ConfigUtils {
}
- public static boolean localMode(Map conf) {
+ public static boolean isLocalMode(Map conf) {
String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
if (mode != null) {
if ("local".equals(mode)) {
@@ -123,18 +116,49 @@ public class ConfigUtils {
throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
}
- // public static mkStatsSampler // depends on Utils.evenSampler() TODO
+ // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we have to do
- // public static readDefaultConfig // depends on Utils.clojurifyStructure and Utils.readDefaultConfig // not necessary indeed
+ // public static readDefaultConfig // depends on Utils.clojurifyStructure and Utils.readDefaultConfig // TODO
// validate-configs-with-schemas is just a wrapper of ConfigValidation.validateFields(conf) TODO
+ //For testing only
+ // for java
+ // try (SetMockedStormConfig mocked = new SetMockedStormConfig(conf)) {
+ // run test ...
+ // }
+ //
+ // for clojure
+ // (let [something (SetMockedStormConfig. conf)]
+ // (try
+ // run test ...
+ // (finally (.close something))))
+ public static class SetMockedStormConfig implements Closeable {
+ public SetMockedStormConfig(Map conf) {
+ mockedStormConfig = conf;
+ }
+
+ @Override
+ public void close() {
+ mockedStormConfig = null;
+ }
+ }
+ private static Map mockedStormConfig = null;
public static Map readStormConfig() {
- return Utils.readStormConfig();
+ if (mockedStormConfig != null) return mockedStormConfig;
+ Map conf = Utils.readStormConfig();
+ ConfigValidation.validateFields(conf);
+ return conf; // TODO, should this be clojurify-sturecture and then return? Otherwise, the clj files who call it fail
+ }
+
+ public static Map readYamlConfig(String name, boolean mustExist) {
+ Map conf = Utils.findAndReadConfigFile(name, mustExist);
+ ConfigValidation.validateFields(conf);
+ return conf;
}
public static Map readYamlConfig(String name) {
- return Utils.findAndReadConfigFile(name, true);
+ return readYamlConfig(name, true);
}
public static String absoluteStormLocalDir(Map conf) {
http://git-wip-us.apache.org/repos/asf/storm/blob/0b90a63d/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index e082768..7834b54 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -22,7 +22,7 @@
(:import [org.mockito Mockito])
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
- (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo])
+ (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils])
(:import [org.apache.storm.cluster ClusterState])
(:require [org.apache.storm [zookeeper :as zk]])
(:require [conjure.core])
@@ -31,7 +31,7 @@
(:use [org.apache.storm cluster config util testing thrift log]))
(defn mk-config [zk-port]
- (merge (read-storm-config)
+ (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-PORT zk-port
STORM-ZOOKEEPER-SERVERS ["localhost"]}))