You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/09/28 07:21:38 UTC
[4/9] storm git commit: STORM-2084: Refactor localization to combine files together
STORM-2084: Refactor localization to combine files together
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78cb243c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78cb243c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78cb243c
Branch: refs/heads/master
Commit: 78cb243c4bc9aaeaffd6f1c76915ac20016b32e7
Parents: 66ff5fd
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 15 13:29:40 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 21 15:59:24 2017 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
.../src/jvm/org/apache/storm/StormTimer.java | 2 +-
.../org/apache/storm/testing4j_test.clj | 44 +-
.../daemon/supervisor/ReadClusterState.java | 4 +-
.../apache/storm/daemon/supervisor/Slot.java | 24 +-
.../storm/daemon/supervisor/Supervisor.java | 53 +-
.../daemon/supervisor/SupervisorUtils.java | 33 -
.../daemon/supervisor/timer/UpdateBlobs.java | 111 --
.../org/apache/storm/event/EventManagerImp.java | 2 +-
.../apache/storm/localizer/AsyncLocalizer.java | 1030 +++++++++++++++---
.../org/apache/storm/localizer/ILocalizer.java | 70 --
.../localizer/LocalDownloadedResource.java | 42 +-
.../org/apache/storm/localizer/Localizer.java | 695 ------------
.../org/apache/storm/utils/ServerUtils.java | 18 +-
.../storm/daemon/supervisor/SlotTest.java | 24 +-
.../storm/localizer/AsyncLocalizerTest.java | 699 +++++++++++-
.../apache/storm/localizer/LocalizerTest.java | 682 ------------
17 files changed, 1627 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c6ef390..679a74b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -123,7 +123,7 @@ supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
supervisor.blobstore.download.thread.count: 5
supervisor.blobstore.download.max_retries: 3
supervisor.localizer.cache.target.size.mb: 10240
-supervisor.localizer.cleanup.interval.ms: 600000
+supervisor.localizer.cleanup.interval.ms: 30000
nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
nimbus.blobstore.expiration.secs: 600
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
index 4f6a7d5..b2e2b4a 100644
--- a/storm-client/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -225,7 +225,7 @@ public class StormTimer implements AutoCloseable {
*/
@Override
- public void close() throws Exception {
+ public void close() throws InterruptedException {
if (this.task.isActive()) {
this.task.setActive(false);
this.task.interrupt();
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 87e1fc0..1b12928 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -65,13 +65,13 @@
(reify TestJob
(^void run [this ^ILocalCluster cluster]
(let [topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+ {"spout" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
{"2" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
(Thrift/prepareFieldsGrouping ["word"])}
(TestWordCounter.) (Integer. 4))
"3" (Thrift/prepareBoltDetails
- {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+ {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
(Thrift/prepareGlobalGrouping)}
(TestGlobalCount.))
"4" (Thrift/prepareBoltDetails
@@ -79,7 +79,7 @@
(Thrift/prepareGlobalGrouping)}
(TestAggregatesCounter.))})
mocked-sources (doto (MockedSources.)
- (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
+ (.addMockData "spout" (into-array Values [(Values. (into-array ["nathan"]))
(Values. (into-array ["bob"]))
(Values. (into-array ["joey"]))
(Values. (into-array ["nathan"]))])
@@ -93,7 +93,7 @@
topology
complete-topology-param)]
(is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
- (Testing/readTuples results "1")))
+ (Testing/readTuples results "spout")))
(is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
(Testing/readTuples results "2")))
(is (= [[1] [2] [3] [4]]
@@ -102,18 +102,36 @@
(Testing/readTuples results "4")))
))))
-(deftest test-complete-topology
- (doseq [zmq-on? [true false]
- :let [daemon-conf (doto (Config.)
- (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
- mk-cluster-param (doto (MkClusterParam.)
- (.setSupervisors (int 4))
- (.setDaemonConf daemon-conf))]]
+(deftest test-complete-topology-netty-simulated
+ (let [daemon-conf (doto (Config.)
+ (.put STORM-LOCAL-MODE-ZMQ true))
+ mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4))
+ (.setDaemonConf daemon-conf))]
(Testing/withSimulatedTimeLocalCluster
- mk-cluster-param complete-topology-testjob )
+ mk-cluster-param complete-topology-testjob)))
+
+(deftest test-complete-topology-netty
+ (let [daemon-conf (doto (Config.)
+ (.put STORM-LOCAL-MODE-ZMQ true))
+ mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4))
+ (.setDaemonConf daemon-conf))]
(Testing/withLocalCluster
mk-cluster-param complete-topology-testjob)))
+(deftest test-complete-topology-local
+ (let [mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4)))]
+ (Testing/withLocalCluster
+ mk-cluster-param complete-topology-testjob)))
+
+(deftest test-complete-topology-local-simulated
+ (let [mk-cluster-param (doto (MkClusterParam.)
+ (.setSupervisors (int 4)))]
+ (Testing/withSimulatedTimeLocalCluster
+ mk-cluster-param complete-topology-testjob)))
+
(deftest test-with-tracked-cluster
(Testing/withTrackedCluster
(reify TestJob
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index e346a09..d68e512 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -41,7 +41,7 @@ import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
@@ -60,7 +60,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
private final AtomicInteger readRetry = new AtomicInteger(0);
private final String assignmentId;
private final ISupervisor iSuper;
- private final ILocalizer localizer;
+ private final AsyncLocalizer localizer;
private final ContainerLauncher launcher;
private final String host;
private final LocalState localState;
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index d221b71..6533d15 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -43,7 +43,7 @@ import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
@@ -79,7 +79,7 @@ public class Slot extends Thread implements AutoCloseable {
}
static class StaticState {
- public final ILocalizer localizer;
+ public final AsyncLocalizer localizer;
public final long hbTimeoutMs;
public final long firstHbTimeoutMs;
public final long killSleepMs;
@@ -90,10 +90,10 @@ public class Slot extends Thread implements AutoCloseable {
public final ISupervisor iSupervisor;
public final LocalState localState;
- StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
- long killSleepMs, long monitorFreqMs,
- ContainerLauncher containerLauncher, String host, int port,
- ISupervisor iSupervisor, LocalState localState) {
+ StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
+ long killSleepMs, long monitorFreqMs,
+ ContainerLauncher containerLauncher, String host, int port,
+ ISupervisor iSupervisor, LocalState localState) {
this.localizer = localizer;
this.hbTimeoutMs = hbTimeoutMs;
this.firstHbTimeoutMs = firstHbTimeoutMs;
@@ -684,12 +684,12 @@ public class Slot extends Thread implements AutoCloseable {
private volatile DynamicState dynamicState;
private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;
- public Slot(ILocalizer localizer, Map<String, Object> conf,
- ContainerLauncher containerLauncher, String host,
- int port, LocalState localState,
- IStormClusterState clusterState,
- ISupervisor iSupervisor,
- AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
+ public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
+ ContainerLauncher containerLauncher, String host,
+ int port, LocalState localState,
+ IStormClusterState clusterState,
+ ISupervisor iSupervisor,
+ AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
super("SLOT_"+port);
this.cachedCurrentAssignments = cachedCurrentAssignments;
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 1f8d4c3..08d32f1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.daemon.supervisor;
import java.io.File;
@@ -25,7 +26,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
@@ -39,19 +39,15 @@ import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
-import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
import org.apache.storm.event.EventManager;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.AsyncLocalizer;
-import org.apache.storm.localizer.ILocalizer;
-import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.LocalState;
@@ -78,8 +74,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
private final StormTimer heartbeatTimer;
private final StormTimer eventTimer;
- private final StormTimer blobUpdateTimer;
- private final Localizer localizer;
private final AsyncLocalizer asyncLocalizer;
private EventManager eventManager;
private ReadClusterState readState;
@@ -110,10 +104,11 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
throw Utils.wrapInRuntime(e);
}
+ this.currAssignment = new AtomicReference<>(new HashMap<>());
+
try {
this.localState = ServerConfigUtils.supervisorState(conf);
- this.localizer = ServerUtils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
- this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
+ this.asyncLocalizer = new AsyncLocalizer(conf, currAssignment, localState.getLocalAssignmentsMap());
} catch (IOException e) {
throw Utils.wrapInRuntime(e);
}
@@ -126,13 +121,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
throw Utils.wrapInRuntime(e);
}
- this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
-
this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
-
- this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
}
public String getId() {
@@ -178,12 +169,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
return currAssignment;
}
-
- public Localizer getLocalizer() {
- return localizer;
- }
- ILocalizer getAsyncLocalizer() {
+ AsyncLocalizer getAsyncLocalizer() {
return asyncLocalizer;
}
@@ -199,8 +186,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
String path = ServerConfigUtils.supervisorTmpDir(conf);
FileUtils.cleanDirectory(new File(path));
- Localizer localizer = getLocalizer();
-
SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
hb.run();
// should synchronize supervisor so it doesn't launch anything after being down (optimization)
@@ -209,36 +194,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.eventManager = new EventManagerImp(false);
this.readState = new ReadClusterState(this);
-
- Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
- Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
- if (portToAssignments != null) {
- Map<String, LocalAssignment> assignments = new HashMap<>();
- for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
- assignments.put(la.get_topology_id(), la);
- }
- for (String topoId : downloadedTopoIds) {
- LocalAssignment la = assignments.get(topoId);
- if (la != null) {
- SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
- } else {
- LOG.warn("Could not find an owner for topo {}", topoId);
- }
- }
- }
- // do this after adding the references so we don't try to clean things being used
- localizer.startCleaner();
- UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
+ asyncLocalizer.start();
if ((Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
// This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
// to date even if callbacks don't all work exactly right
eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
- // Blob update thread. Starts with 30 seconds delay, every 30 seconds
- blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));
-
// supervisor health check
eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
}
@@ -282,15 +245,13 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
this.active = false;
heartbeatTimer.close();
eventTimer.close();
- blobUpdateTimer.close();
if (eventManager != null) {
eventManager.close();
}
if (readState != null) {
readState.close();
}
- asyncLocalizer.shutdown();
- localizer.shutdown();
+ asyncLocalizer.close();
getStormClusterState().disconnect();
} catch (Exception e) {
LOG.error("Error Shutting down", e);
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 09c2b5d..33574c3 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -19,9 +19,7 @@ package org.apache.storm.daemon.supervisor;
import org.apache.storm.Config;
import org.apache.storm.generated.LSWorkerHeartbeat;
-import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
@@ -33,14 +31,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
public class SupervisorUtils {
@@ -95,34 +90,6 @@ public class SupervisorUtils {
return localResourceList;
}
- /**
- * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the
- * cache on restart.
- *
- * @param localizer
- * @param stormId
- * @param conf
- */
- static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException {
- Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
- Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
- String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
- List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
- if (blobstoreMap != null) {
- localizer.addReferences(localresources, user, topoName);
- }
- }
-
- public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
- Set<String> stormIds = new HashSet<>();
- String path = ConfigUtils.supervisorStormDistRoot(conf);
- Collection<String> rets = ConfigUtils.readDirContents(path);
- for (String ret : rets) {
- stormIds.add(URLDecoder.decode(ret));
- }
- return stormIds;
- }
-
public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
String workerRoot = ConfigUtils.workerRoot(conf);
return ConfigUtils.readDirContents(workerRoot);
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
deleted file mode 100644
index b5dbf57..0000000
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor.timer;
-
-import java.util.HashMap;
-import org.apache.storm.Config;
-import org.apache.storm.daemon.supervisor.Supervisor;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.NimbusLeaderNotFoundException;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
- * Runnable is intended to be run periodically by a timer, created elsewhere.
- */
-public class UpdateBlobs implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
-
- private Supervisor supervisor;
-
- public UpdateBlobs(Supervisor supervisor) {
- this.supervisor = supervisor;
- }
-
- @Override
- public void run() {
- try {
- Map<String, Object> conf = supervisor.getConf();
- Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf);
- AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment();
- Map<String, LocalAssignment> assignedStormIds = new HashMap<>();
- for (LocalAssignment localAssignment : newAssignment.get().values()) {
- assignedStormIds.put(localAssignment.get_topology_id(), localAssignment);
- }
- for (String stormId : downloadedStormIds) {
- LocalAssignment la = assignedStormIds.get(stormId);
- if (la != null) {
- if (la.get_owner() == null) {
- //We got a case where the local assignment is not up to date, no point in going on...
- LOG.warn("The blobs will not be updated for {} until the local assignment is updated...", stormId);
- } else {
- String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
- LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
- updateBlobsForTopology(conf, stormId, supervisor.getLocalizer(), la.get_owner());
- }
- }
- }
- } catch (Exception e) {
- if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
- LOG.error("Network error while updating blobs, will retry again later", e);
- } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
- LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
- } else {
- throw Utils.wrapInRuntime(e);
- }
- }
- }
-
- /**
- * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
- *
- * @param conf
- * @param stormId
- * @param localizer
- * @throws IOException
- */
- private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer, String user) throws IOException {
- Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
- Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
- List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
- try {
- localizer.updateBlobs(localresources, user);
- } catch (AuthorizationException authExp) {
- LOG.error("AuthorizationException error", authExp);
- } catch (KeyNotFoundException knf) {
- LOG.error("KeyNotFoundException error", knf);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
index 0a64370..6b9d4f1 100644
--- a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
+++ b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
@@ -73,7 +73,7 @@ public class EventManagerImp implements EventManager {
runner.start();
}
- public void proccessInc() {
+ private void proccessInc() {
processed.incrementAndGet();
}