You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/09/28 07:21:38 UTC

[4/9] storm git commit: STORM-2084: Refactor localization to combine files together

STORM-2084: Refactor localization to combine files together


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78cb243c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78cb243c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78cb243c

Branch: refs/heads/master
Commit: 78cb243c4bc9aaeaffd6f1c76915ac20016b32e7
Parents: 66ff5fd
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 15 13:29:40 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 21 15:59:24 2017 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |    2 +-
 .../src/jvm/org/apache/storm/StormTimer.java    |    2 +-
 .../org/apache/storm/testing4j_test.clj         |   44 +-
 .../daemon/supervisor/ReadClusterState.java     |    4 +-
 .../apache/storm/daemon/supervisor/Slot.java    |   24 +-
 .../storm/daemon/supervisor/Supervisor.java     |   53 +-
 .../daemon/supervisor/SupervisorUtils.java      |   33 -
 .../daemon/supervisor/timer/UpdateBlobs.java    |  111 --
 .../org/apache/storm/event/EventManagerImp.java |    2 +-
 .../apache/storm/localizer/AsyncLocalizer.java  | 1030 +++++++++++++++---
 .../org/apache/storm/localizer/ILocalizer.java  |   70 --
 .../localizer/LocalDownloadedResource.java      |   42 +-
 .../org/apache/storm/localizer/Localizer.java   |  695 ------------
 .../org/apache/storm/utils/ServerUtils.java     |   18 +-
 .../storm/daemon/supervisor/SlotTest.java       |   24 +-
 .../storm/localizer/AsyncLocalizerTest.java     |  699 +++++++++++-
 .../apache/storm/localizer/LocalizerTest.java   |  682 ------------
 17 files changed, 1627 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index c6ef390..679a74b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -123,7 +123,7 @@ supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore"
 supervisor.blobstore.download.thread.count: 5
 supervisor.blobstore.download.max_retries: 3
 supervisor.localizer.cache.target.size.mb: 10240
-supervisor.localizer.cleanup.interval.ms: 600000
+supervisor.localizer.cleanup.interval.ms: 30000
 
 nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore"
 nimbus.blobstore.expiration.secs: 600

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
index 4f6a7d5..b2e2b4a 100644
--- a/storm-client/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -225,7 +225,7 @@ public class StormTimer implements AutoCloseable {
      */
 
     @Override
-    public void close() throws Exception {
+    public void close() throws InterruptedException {
         if (this.task.isActive()) {
             this.task.setActive(false);
             this.task.interrupt();

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 87e1fc0..1b12928 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -65,13 +65,13 @@
   (reify TestJob
        (^void run [this ^ILocalCluster cluster]
          (let [topology (Thrift/buildTopology
-                         {"1" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
+                         {"spout" (Thrift/prepareSpoutDetails (TestWordSpout. true) (Integer. 3))}
                          {"2" (Thrift/prepareBoltDetails
-                                {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                                {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
                                  (Thrift/prepareFieldsGrouping ["word"])}
                                 (TestWordCounter.) (Integer. 4))
                           "3" (Thrift/prepareBoltDetails
-                                {(GlobalStreamId. "1" Utils/DEFAULT_STREAM_ID)
+                                {(GlobalStreamId. "spout" Utils/DEFAULT_STREAM_ID)
                                  (Thrift/prepareGlobalGrouping)}
                                 (TestGlobalCount.))
                           "4" (Thrift/prepareBoltDetails
@@ -79,7 +79,7 @@
                                  (Thrift/prepareGlobalGrouping)}
                                 (TestAggregatesCounter.))})
                mocked-sources (doto (MockedSources.)
-                                (.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
+                                (.addMockData "spout" (into-array Values [(Values. (into-array ["nathan"]))
                                                                       (Values. (into-array ["bob"]))
                                                                       (Values. (into-array ["joey"]))
                                                                       (Values. (into-array ["nathan"]))])
@@ -93,7 +93,7 @@
                                                  topology
                                                  complete-topology-param)]
            (is (Testing/multiseteq [["nathan"] ["bob"] ["joey"] ["nathan"]]
-                           (Testing/readTuples results "1")))
+                           (Testing/readTuples results "spout")))
            (is (Testing/multiseteq [["nathan" (int 1)] ["nathan" (int 2)] ["bob" (int 1)] ["joey" (int 1)]]
                            (Testing/readTuples results "2")))
            (is (= [[1] [2] [3] [4]]
@@ -102,18 +102,36 @@
                   (Testing/readTuples results "4")))
            ))))
 
-(deftest test-complete-topology
-  (doseq [zmq-on? [true false]
-          :let [daemon-conf (doto (Config.)
-                              (.put STORM-LOCAL-MODE-ZMQ zmq-on?))
-                mk-cluster-param (doto (MkClusterParam.)
-                                   (.setSupervisors (int 4))
-                                   (.setDaemonConf daemon-conf))]]
+(deftest test-complete-topology-netty-simulated
+  (let [daemon-conf (doto (Config.)
+                            (.put STORM-LOCAL-MODE-ZMQ true))
+        mk-cluster-param (doto (MkClusterParam.)
+                                 (.setSupervisors (int 4))
+                                 (.setDaemonConf daemon-conf))]
     (Testing/withSimulatedTimeLocalCluster
-      mk-cluster-param complete-topology-testjob )
+      mk-cluster-param complete-topology-testjob)))
+
+(deftest test-complete-topology-netty
+  (let [daemon-conf (doto (Config.)
+                            (.put STORM-LOCAL-MODE-ZMQ true))
+        mk-cluster-param (doto (MkClusterParam.)
+                                 (.setSupervisors (int 4))
+                                 (.setDaemonConf daemon-conf))]
     (Testing/withLocalCluster
       mk-cluster-param complete-topology-testjob)))
 
+(deftest test-complete-topology-local
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                                 (.setSupervisors (int 4)))]
+    (Testing/withLocalCluster
+      mk-cluster-param complete-topology-testjob)))
+
+(deftest test-complete-topology-local-simulated
+  (let [mk-cluster-param (doto (MkClusterParam.)
+                                 (.setSupervisors (int 4)))]
+    (Testing/withSimulatedTimeLocalCluster
+      mk-cluster-param complete-topology-testjob)))
+
 (deftest test-with-tracked-cluster
   (Testing/withTrackedCluster
    (reify TestJob

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index e346a09..d68e512 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -41,7 +41,7 @@ import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
@@ -60,7 +60,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
     private final AtomicInteger readRetry = new AtomicInteger(0);
     private final String assignmentId;
     private final ISupervisor iSuper;
-    private final ILocalizer localizer;
+    private final AsyncLocalizer localizer;
     private final ContainerLauncher launcher;
     private final String host;
     private final LocalState localState;

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index d221b71..6533d15 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -43,7 +43,7 @@ import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.localizer.AsyncLocalizer;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
@@ -79,7 +79,7 @@ public class Slot extends Thread implements AutoCloseable {
     }
     
     static class StaticState {
-        public final ILocalizer localizer;
+        public final AsyncLocalizer localizer;
         public final long hbTimeoutMs;
         public final long firstHbTimeoutMs;
         public final long killSleepMs;
@@ -90,10 +90,10 @@ public class Slot extends Thread implements AutoCloseable {
         public final ISupervisor iSupervisor;
         public final LocalState localState;
         
-        StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
-                long killSleepMs, long monitorFreqMs,
-                ContainerLauncher containerLauncher, String host, int port,
-                ISupervisor iSupervisor, LocalState localState) {
+        StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
+                    long killSleepMs, long monitorFreqMs,
+                    ContainerLauncher containerLauncher, String host, int port,
+                    ISupervisor iSupervisor, LocalState localState) {
             this.localizer = localizer;
             this.hbTimeoutMs = hbTimeoutMs;
             this.firstHbTimeoutMs = firstHbTimeoutMs;
@@ -684,12 +684,12 @@ public class Slot extends Thread implements AutoCloseable {
     private volatile DynamicState dynamicState;
     private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;
     
-    public Slot(ILocalizer localizer, Map<String, Object> conf, 
-            ContainerLauncher containerLauncher, String host,
-            int port, LocalState localState,
-            IStormClusterState clusterState,
-            ISupervisor iSupervisor,
-            AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
+    public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
+                ContainerLauncher containerLauncher, String host,
+                int port, LocalState localState,
+                IStormClusterState clusterState,
+                ISupervisor iSupervisor,
+                AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
         super("SLOT_"+port);
 
         this.cachedCurrentAssignments = cachedCurrentAssignments;

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 1f8d4c3..08d32f1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.File;
@@ -25,7 +26,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -39,19 +39,15 @@ import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.DaemonCommon;
 import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
 import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
-import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
 import org.apache.storm.event.EventManager;
 import org.apache.storm.event.EventManagerImp;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.localizer.AsyncLocalizer;
-import org.apache.storm.localizer.ILocalizer;
-import org.apache.storm.localizer.Localizer;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.LocalState;
@@ -78,8 +74,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
     private final StormTimer heartbeatTimer;
     private final StormTimer eventTimer;
-    private final StormTimer blobUpdateTimer;
-    private final Localizer localizer;
     private final AsyncLocalizer asyncLocalizer;
     private EventManager eventManager;
     private ReadClusterState readState;
@@ -110,10 +104,11 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             throw Utils.wrapInRuntime(e);
         }
 
+        this.currAssignment = new AtomicReference<>(new HashMap<>());
+
         try {
             this.localState = ServerConfigUtils.supervisorState(conf);
-            this.localizer = ServerUtils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
-            this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
+            this.asyncLocalizer = new AsyncLocalizer(conf, currAssignment, localState.getLocalAssignmentsMap());
         } catch (IOException e) {
             throw Utils.wrapInRuntime(e);
         }
@@ -126,13 +121,9 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             throw Utils.wrapInRuntime(e);
         }
 
-        this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
-
         this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
 
         this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
-
-        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
     }
     
     public String getId() {
@@ -178,12 +169,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
         return currAssignment;
     }
-
-    public Localizer getLocalizer() {
-        return localizer;
-    }
     
-    ILocalizer getAsyncLocalizer() {
+    AsyncLocalizer getAsyncLocalizer() {
         return asyncLocalizer;
     }
     
@@ -199,8 +186,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         String path = ServerConfigUtils.supervisorTmpDir(conf);
         FileUtils.cleanDirectory(new File(path));
 
-        Localizer localizer = getLocalizer();
-
         SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
         hb.run();
         // should synchronize supervisor so it doesn't launch anything after being down (optimization)
@@ -209,36 +194,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
 
         this.eventManager = new EventManagerImp(false);
         this.readState = new ReadClusterState(this);
-        
-        Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
-        Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
-        if (portToAssignments != null) {
-            Map<String, LocalAssignment> assignments = new HashMap<>();
-            for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
-                assignments.put(la.get_topology_id(), la);
-            }
-            for (String topoId : downloadedTopoIds) {
-                LocalAssignment la = assignments.get(topoId);
-                if (la != null) {
-                    SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
-                } else {
-                    LOG.warn("Could not find an owner for topo {}", topoId);
-                }
-            }
-        }
-        // do this after adding the references so we don't try to clean things being used
-        localizer.startCleaner();
 
-        UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
+        asyncLocalizer.start();
 
         if ((Boolean) conf.get(DaemonConfig.SUPERVISOR_ENABLE)) {
             // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
             // to date even if callbacks don't all work exactly right
             eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));
 
-            // Blob update thread. Starts with 30 seconds delay, every 30 seconds
-            blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));
-
             // supervisor health check
             eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
         }
@@ -282,15 +245,13 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             this.active = false;
             heartbeatTimer.close();
             eventTimer.close();
-            blobUpdateTimer.close();
             if (eventManager != null) {
                 eventManager.close();
             }
             if (readState != null) {
                 readState.close();
             }
-            asyncLocalizer.shutdown();
-            localizer.shutdown();
+            asyncLocalizer.close();
             getStormClusterState().disconnect();
         } catch (Exception e) {
             LOG.error("Error Shutting down", e);

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 09c2b5d..33574c3 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -19,9 +19,7 @@ package org.apache.storm.daemon.supervisor;
 
 import org.apache.storm.Config;
 import org.apache.storm.generated.LSWorkerHeartbeat;
-import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
@@ -33,14 +31,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class SupervisorUtils {
 
@@ -95,34 +90,6 @@ public class SupervisorUtils {
         return localResourceList;
     }
 
-    /**
-     * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the
-     * cache on restart.
-     * 
-     * @param localizer
-     * @param stormId
-     * @param conf
-     */
-    static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf, String user) throws IOException {
-        Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
-        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
-        if (blobstoreMap != null) {
-            localizer.addReferences(localresources, user, topoName);
-        }
-    }
-
-    public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
-        Set<String> stormIds = new HashSet<>();
-        String path = ConfigUtils.supervisorStormDistRoot(conf);
-        Collection<String> rets = ConfigUtils.readDirContents(path);
-        for (String ret : rets) {
-            stormIds.add(URLDecoder.decode(ret));
-        }
-        return stormIds;
-    }
-
     public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
         String workerRoot = ConfigUtils.workerRoot(conf);
         return ConfigUtils.readDirContents(workerRoot);

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
deleted file mode 100644
index b5dbf57..0000000
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor.timer;
-
-import java.util.HashMap;
-import org.apache.storm.Config;
-import org.apache.storm.daemon.supervisor.Supervisor;
-import org.apache.storm.daemon.supervisor.SupervisorUtils;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.localizer.LocalResource;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.NimbusLeaderNotFoundException;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The
- * Runnable is intended to be run periodically by a timer, created elsewhere.
- */
-public class UpdateBlobs implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);
-
-    private Supervisor supervisor;
-
-    public UpdateBlobs(Supervisor supervisor) {
-        this.supervisor = supervisor;
-    }
-
-    @Override
-    public void run() {
-        try {
-            Map<String, Object> conf = supervisor.getConf();
-            Set<String> downloadedStormIds = SupervisorUtils.readDownloadedTopologyIds(conf);
-            AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisor.getCurrAssignment();
-            Map<String, LocalAssignment> assignedStormIds = new HashMap<>();
-            for (LocalAssignment localAssignment : newAssignment.get().values()) {
-                assignedStormIds.put(localAssignment.get_topology_id(), localAssignment);
-            }
-            for (String stormId : downloadedStormIds) {
-                LocalAssignment la = assignedStormIds.get(stormId);
-                if (la != null) {
-                    if (la.get_owner() == null) {
-                        //We got a case where the local assignment is not up to date, no point in going on...
-                        LOG.warn("The blobs will not be updated for {} until the local assignment is updated...", stormId);
-                    } else {
-                        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-                        LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
-                        updateBlobsForTopology(conf, stormId, supervisor.getLocalizer(), la.get_owner());
-                    }
-                }
-            }
-        } catch (Exception e) {
-            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
-                LOG.error("Network error while updating blobs, will retry again later", e);
-            } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
-                LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
-            } else {
-                throw Utils.wrapInRuntime(e);
-            }
-        }
-    }
-
-    /**
-     * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded.
-     * 
-     * @param conf
-     * @param stormId
-     * @param localizer
-     * @throws IOException
-     */
-    private void updateBlobsForTopology(Map<String, Object> conf, String stormId, Localizer localizer, String user) throws IOException {
-        Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
-        try {
-            localizer.updateBlobs(localresources, user);
-        } catch (AuthorizationException authExp) {
-            LOG.error("AuthorizationException error", authExp);
-        } catch (KeyNotFoundException knf) {
-            LOG.error("KeyNotFoundException error", knf);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/78cb243c/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
index 0a64370..6b9d4f1 100644
--- a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
+++ b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
@@ -73,7 +73,7 @@ public class EventManagerImp implements EventManager {
         runner.start();
     }
 
-    public void proccessInc() {
+    private void proccessInc() {
         processed.incrementAndGet();
     }