You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/09 21:12:36 UTC

[1/4] storm git commit: STORM-3125: Refactored and commented code for Slot and ReadClusterState

Repository: storm
Updated Branches:
  refs/heads/master afe35f318 -> daec24841


STORM-3125: Refactored and commented code for Slot and ReadClusterState

(cherry picked from commit 5338491)

STORM-3125: Refactored methods for blob localization

(cherry picked from commit d40397c)

STORM-3125: Refactored methods of state machine for killing workers and
improved tests and metrics.

(cherry picked from commit 5338491)

STORM-3125: Refactored code for DRPC

(cherry picked from commit 3152e6f)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81307a50
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81307a50
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81307a50

Branch: refs/heads/master
Commit: 81307a50528f71c39786cab577edafe7151d18a0
Parents: d880767
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Mon Jun 18 10:44:19 2018 -0500
Committer: Zhengdai Hu <hu...@gmail.com>
Committed: Fri Jul 6 16:52:17 2018 -0500

----------------------------------------------------------------------
 .../daemon/drpc/BlockingOutstandingRequest.java |   3 +-
 .../daemon/supervisor/ReadClusterState.java     |   9 +-
 .../apache/storm/daemon/supervisor/Slot.java    | 306 +++++++++++--------
 .../apache/storm/localizer/AsyncLocalizer.java  |   2 +-
 .../org/apache/storm/localizer/IOFunction.java  |  22 ++
 .../storm/localizer/LocalizedResource.java      |  77 ++---
 .../storm/localizer/LocallyCachedBlob.java      | 115 ++++---
 .../localizer/LocallyCachedTopologyBlob.java    |  16 +-
 .../java/org/apache/storm/utils/EnumUtil.java   |  40 +++
 .../storm/daemon/supervisor/SlotTest.java       |   8 +-
 .../storm/localizer/AsyncLocalizerTest.java     |  40 ++-
 11 files changed, 392 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
index 1426802..e11e363 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
@@ -25,8 +25,7 @@ import org.apache.storm.generated.DRPCRequest;
 import org.apache.storm.utils.WrappedDRPCExecutionException;
 
 public class BlockingOutstandingRequest extends OutstandingRequest {
-    public static final RequestFactory<BlockingOutstandingRequest> FACTORY =
-        (function, request) -> new BlockingOutstandingRequest(function, request);
+    public static final RequestFactory<BlockingOutstandingRequest> FACTORY = BlockingOutstandingRequest::new;
     private Semaphore _sem;
     private volatile String _result = null;
     private volatile DRPCExecutionException _e = null;

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index b617345..c948e31 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -99,15 +99,16 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         }
 
         try {
-            Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf);
+            Collection<String> detachedRunningWorkers = SupervisorUtils.supervisorWorkerIds(superConf);
             for (Slot slot : slots.values()) {
                 String workerId = slot.getWorkerId();
+                // We ignore workers that are still bound to a slot, which is monitored by a supervisor
                 if (workerId != null) {
-                    workers.remove(workerId);
+                    detachedRunningWorkers.remove(workerId);
                 }
             }
-            if (!workers.isEmpty()) {
-                supervisor.killWorkers(workers, launcher);
+            if (!detachedRunningWorkers.isEmpty()) {
+                supervisor.killWorkers(detachedRunningWorkers, launcher);
             }
         } catch (Exception e) {
             LOG.warn("Error trying to clean up old workers", e);

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 59fea1e..0f52f11 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -45,6 +45,7 @@ import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.utils.EnumUtil;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
@@ -56,21 +57,25 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
     private static final Meter numWorkersLaunched =
         StormMetricsRegistry.registerMeter("supervisor:num-workers-launched");
-    private static final Meter numWorkersKilledProcessExit =
-        StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-process-exit");
-    private static final Meter numWorkersKilledMemoryViolation =
-        StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-memory-violation");
-    private static final Meter numWorkersKilledHBTimeout =
-        StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-hb-timeout");
-    private static final Meter numWorkersKilledHBNull =
-        StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-hb-null");
+
+    private enum KillReason {
+        ASSIGNMENT_CHANGED, BLOB_CHANGED, PROCESS_EXIT, MEMORY_VIOLATION, HB_TIMEOUT, HB_NULL;
+
+        @Override
+        public String toString() {
+            return EnumUtil.toMetricName(this);
+        }
+
+    }
+
+    private static final Map<KillReason, Meter> numWorkersKilledFor = EnumUtil.toEnumMap(KillReason.class,
+        killReason -> StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-" + killReason.toString()));
     private static final Meter numForceKill =
         StormMetricsRegistry.registerMeter("supervisor:num-workers-force-kill");
     private static final long ONE_SEC_IN_NANO = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
     private final AtomicReference<LocalAssignment> newAssignment = new AtomicReference<>();
     private final AtomicReference<Set<TopoProfileAction>> profiling = new AtomicReference<>(new HashSet<>());
 
-    ;
     private final BlockingQueue<BlobChanging> changingBlobs = new LinkedBlockingQueue<>();
     private final StaticState staticState;
     private final IStormClusterState clusterState;
@@ -89,54 +94,56 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                 WorkerMetricsProcessor metricsProcessor) throws Exception {
         super("SLOT_" + port);
         this.metricsExec = metricsExec;
-
         this.cachedCurrentAssignments = cachedCurrentAssignments;
         this.clusterState = clusterState;
-        Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap();
+        this.staticState = new StaticState(localizer,
+                ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
+                ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000,
+                ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
+                ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000,
+                containerLauncher,
+                host,
+                port,
+                iSupervisor,
+                localState,
+                this,
+                metricsExec, metricsProcessor);
+
         LocalAssignment currentAssignment = null;
+        Container container = null;
+        LocalAssignment newAssignment = null;
+
+        Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap();
         if (assignments != null) {
             currentAssignment = assignments.get(port);
-        }
-        Container container = null;
-        if (currentAssignment != null) {
-            try {
-                // For now we do not make a transaction when removing a topology assignment from local, an overdue
-                // assignment may be left on local disk.
-                // So we should check if the local disk assignment is valid when initializing:
-                // if topology files does not exist, the worker[possibly alive] will be reassigned if it is timed-out;
-                // if topology files exist but the topology id is invalid, just let Supervisor make a sync;
-                // if topology files exist and topology files is valid, recover the container.
-                if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) {
-                    container = containerLauncher.recoverContainer(port, currentAssignment, localState);
-                } else {
-                    // Make the assignment null to let slot clean up the disk assignment.
+            if (currentAssignment != null) {
+                try {
+                    // For now we do not make a transaction when removing a topology assignment from local, an overdue
+                    // assignment may be left on local disk.
+                    // So we should check if the local disk assignment is valid when initializing:
+                    // if topology files does not exist, the worker[possibly alive] will be reassigned if it is timed-out;
+                    // if topology files exist but the topology id is invalid, just let Supervisor make a sync;
+                    // if topology files exist and topology files is valid, recover the container.
+                    if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) {
+                        container = containerLauncher.recoverContainer(port, currentAssignment, localState);
+                    } else {
+                        // Make the assignment null to let slot clean up the disk assignment.
+                        currentAssignment = null;
+                    }
+                } catch (ContainerRecoveryException e) {
+                    //We could not recover container will be null.
+                }
+
+                newAssignment = currentAssignment;
+                if (container == null) {
                     currentAssignment = null;
+                    //Assigned something but it is not running
                 }
-            } catch (ContainerRecoveryException e) {
-                //We could not recover container will be null.
             }
         }
 
-        LocalAssignment newAssignment = currentAssignment;
-        if (currentAssignment != null && container == null) {
-            currentAssignment = null;
-            //Assigned something but it is not running
-        }
-
-        dynamicState = new DynamicState(currentAssignment, container, newAssignment);
-        staticState = new StaticState(localizer,
-                                      ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
-                                      ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000,
-                                      ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
-                                      ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000,
-                                      containerLauncher,
-                                      host,
-                                      port,
-                                      iSupervisor,
-                                      localState,
-                                      this,
-                                      metricsExec, metricsProcessor);
-        this.newAssignment.set(dynamicState.newAssignment);
+        setNewAssignment(newAssignment);
+        this.dynamicState = new DynamicState(currentAssignment, container, newAssignment);
         if (MachineState.RUNNING == dynamicState.state) {
             //We are running so we should recover the blobs.
             staticState.localizer.recoverRunningTopology(currentAssignment, port, this);
@@ -171,6 +178,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         return false;
     }
 
+    /**
+     * Decide the equivalence of two local assignments, ignoring the order of executors.
+     * This is different from the #equals method.
+     * @param a Local assignment A
+     * @param b Local assignment B
+     * @return True if A and B are equivalent, ignoring the order of the executors
+     */
     @VisibleForTesting
     static boolean equivalent(LocalAssignment a, LocalAssignment b) {
         if (a == null && b == null) {
@@ -229,7 +243,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * @return the next state
      * @throws IOException on any error
      */
-    static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException {
+    private static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException {
         assert (dynamicState.container == null);
 
         if (dynamicState.newAssignment == null) {
@@ -241,64 +255,55 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                            .withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION);
     }
 
-    /**
-     * Kill the current container and start downloading what the new assignment needs, if there is a new assignment.
-     * PRECONDITION: container != null
-     * @param dynamicState current state
-     * @param staticState static data
-     * @return the next state
-     * @throws Exception
-     */
-    static DynamicState killContainerForChangedAssignment(DynamicState dynamicState, StaticState staticState) throws Exception {
-        assert (dynamicState.container != null);
+    private static DynamicState killContainerFor(KillReason reason, DynamicState dynamicState, StaticState staticState)
+            throws Exception {
+        assert dynamicState.container != null;
 
-        staticState.iSupervisor.killedWorker(staticState.port);
-        dynamicState.container.kill();
-        Future<Void> pendingDownload = null;
-        if (dynamicState.newAssignment != null) {
-            pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(dynamicState.newAssignment, staticState.port,
-                                                                                 staticState.changingCallback);
+        //Skip special case if `storm kill_workers` is already invoked
+        Boolean isDead = dynamicState.container.areAllProcessesDead();
+        if (!isDead) {
+            if (reason == KillReason.ASSIGNMENT_CHANGED || reason == KillReason.BLOB_CHANGED) {
+                staticState.iSupervisor.killedWorker(staticState.port);
+            }
+            dynamicState.container.kill();
         }
-        dynamicState = drainAllChangingBlobs(dynamicState);
-        Time.sleep(staticState.killSleepMs);
-        return dynamicState.withPendingLocalization(dynamicState.newAssignment, pendingDownload).withState(MachineState.KILL);
-    }
-
-    /**
-     * Kill the current container, and wait go to the state to inform the localizer that we are ready to go.
-     * PRECONDITION: container != null
-     * @param dynamicState current state
-     * @param staticState static data
-     * @return the next state
-     */
-    private static DynamicState killContainerForChangedBlobs(DynamicState dynamicState, StaticState staticState) throws Exception {
-        assert (dynamicState.container != null);
-
-        staticState.iSupervisor.killedWorker(staticState.port);
-        dynamicState.container.kill();
+        numWorkersKilledFor.get(reason).mark();
 
-        Time.sleep(staticState.killSleepMs);
-        return dynamicState.withState(MachineState.KILL_BLOB_UPDATE);
-    }
-
-    /**
-     * Kill the current container and relaunch it.  (Something odd happened)
-     * PRECONDITION: container != null
-     * @param dynamicState current state
-     * @param staticState static data
-     * @return the next state
-     * @throws Exception
-     */
-    static DynamicState killAndRelaunchContainer(DynamicState dynamicState, StaticState staticState) throws Exception {
-        assert (dynamicState.container != null);
+        DynamicState next;
+        switch (reason) {
+            case ASSIGNMENT_CHANGED:
+                Future<Void> pendingDownload = null;
+                if (dynamicState.newAssignment != null) {
+                    pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(
+                            dynamicState.newAssignment, staticState.port, staticState.changingCallback);
+                }
+                dynamicState = drainAllChangingBlobs(dynamicState);
+                next = dynamicState.withState(MachineState.KILL)
+                        .withPendingLocalization(dynamicState.newAssignment, pendingDownload);
+                break;
+
+            case BLOB_CHANGED:
+                next = dynamicState.withState(MachineState.KILL_BLOB_UPDATE);
+                break;
+
+            case PROCESS_EXIT:
+            case MEMORY_VIOLATION:
+            case HB_TIMEOUT:
+            case HB_NULL:
+                //any stop profile actions that hadn't timed out yet, we should restart after the worker is running again.
+                HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
+                mod.addAll(dynamicState.pendingStopProfileActions);
+                next = dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, Collections.emptySet());
+                break;
 
-        dynamicState.container.kill();
-        Time.sleep(staticState.killSleepMs);
+            default:
+                throw new IllegalArgumentException("Unknown reason for killing a container");
+        }
 
-        //any stop profile actions that hadn't timed out yet, we should restart after the worker is running again.
-        HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
-        mod.addAll(dynamicState.pendingStopProfileActions);
-        return dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, Collections.emptySet());
+        if (!isDead) {
+            Time.sleep(staticState.killSleepMs);
+        }
+        return next;
     }
 
     /**
@@ -309,7 +314,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * @param nextState the next MachineState to go to.
      * @return the next state.
      */
-    static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws
+    private static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws
         Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
@@ -345,7 +350,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     }
 
     /**
-     * Informs the async localizer for all of blobs that the worker is dead.
+     * Informs the async localizer, for all of the changing blobs, that the worker has acknowledged the blob changes.
+     * The worker has stopped as of now.
      *
      * PRECONDITION: container is null
      * PRECONDITION: changingBlobs should only be for the given assignment.
@@ -357,12 +363,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         assert dynamicState.changingBlobs.stream().allMatch((cr) -> forSameTopology(cr.assignment, assignment));
 
         Set<Future<Void>> futures = new HashSet<>(dynamicState.changingBlobs.size());
+
+        // We need to add the new futures to the existing ones
         if (forSameTopology(dynamicState.pendingChangingBlobsAssignment, assignment)) {
-            //We need to add the new futures to the existing ones
             futures.addAll(dynamicState.pendingChangingBlobs);
         }
-        //Otherwise they will just be replaced
 
+        // Acknowledge all changing blobs as futures
         for (BlobChanging rc : dynamicState.changingBlobs) {
             futures.add(rc.latch.countDown());
         }
@@ -397,7 +404,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     }
 
     /**
-     * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state.
+     * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state, when the slot is waiting for
+     * blobs of the pending assignment to be completely downloaded, before the container is launched/relaunched.
      * PRECONDITION: neither pendingLocalization nor pendingDownload is null.
      * PRECONDITION: The slot should be empty
      * @param dynamicState current state
@@ -405,14 +413,16 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * @return the next state
      * @throws Exception on any error
      */
-    static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
+    private static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.pendingLocalization != null);
         assert (dynamicState.pendingDownload != null);
         assert (dynamicState.container == null);
 
         //Ignore changes to scheduling while downloading the topology blobs
         // We don't support canceling the download through the future yet,
-        // so to keep everything in sync, just wait
+        // because pending blobs may be shared by multiple workers and canceling it
+        // may lead to a race condition.
+        // To keep everything in sync, just wait for all workers
         try {
             //Release things that don't need to wait for us to finish downloading.
             dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.pendingLocalization);
@@ -421,11 +431,14 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                 dynamicState = informChangedBlobs(dynamicState, dynamicState.pendingLocalization);
             }
 
+            // Wait until time out
             dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
-            //Downloading of all blobs finished.
+            //Downloading of all blobs finished. This is the precondition for all code below.
             if (!equivalent(dynamicState.newAssignment, dynamicState.pendingLocalization)) {
                 //Scheduling changed
                 staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
+                // Switch to the new assignment even if localization hasn't completed, or go to empty state
+                // if no new assignment.
                 return prepareForNewAssignmentNoWorkersRunning(dynamicState.withPendingChangingBlobs(Collections.emptySet(), null),
                                                                staticState);
             }
@@ -531,7 +544,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * @return the next state
      * @throws Exception on any error
      */
-    static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception {
+    private static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
 
@@ -558,7 +571,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * @return the next state
      * @throws Exception on any error
      */
-    static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception {
+    private static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
 
@@ -625,7 +638,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * @return the next state
      * @throws Exception on any error
      */
-    static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception {
+    private static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
 
@@ -641,7 +654,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             //We were rescheduled while waiting for the worker to come up
             LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
                      dynamicState.newAssignment);
-            return killContainerForChangedAssignment(dynamicState, staticState);
+            return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState);
         }
         dynamicState = updateAssignmentIfNeeded(dynamicState);
 
@@ -649,13 +662,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         if (timeDiffms > staticState.firstHbTimeoutMs) {
             LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container,
                      staticState.firstHbTimeoutMs);
-            return killAndRelaunchContainer(dynamicState, staticState);
+            return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
         }
 
         dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
         if (!dynamicState.changingBlobs.isEmpty()) {
             //Kill the container and restart it
-            return killContainerForChangedBlobs(dynamicState, staticState);
+            return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState);
         }
         Time.sleep(1000);
         return dynamicState;
@@ -669,7 +682,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * @return the next state
      * @throws Exception on any error
      */
-    static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
+    private static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
 
@@ -677,42 +690,38 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
                      dynamicState.newAssignment);
             //Scheduling changed while running...
-            return killContainerForChangedAssignment(dynamicState, staticState);
+            return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState);
         }
         dynamicState = updateAssignmentIfNeeded(dynamicState);
 
         dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
         if (!dynamicState.changingBlobs.isEmpty()) {
             //Kill the container and restart it
-            return killContainerForChangedBlobs(dynamicState, staticState);
+            return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState);
         }
 
         if (dynamicState.container.didMainProcessExit()) {
-            numWorkersKilledProcessExit.mark();
             LOG.warn("SLOT {}: main process has exited", staticState.port);
-            return killAndRelaunchContainer(dynamicState, staticState);
+            return killContainerFor(KillReason.PROCESS_EXIT, dynamicState, staticState);
         }
 
         if (dynamicState.container.isMemoryLimitViolated(dynamicState.currentAssignment)) {
-            numWorkersKilledMemoryViolation.mark();
             LOG.warn("SLOT {}: violated memory limits", staticState.port);
-            return killAndRelaunchContainer(dynamicState, staticState);
+            return killContainerFor(KillReason.MEMORY_VIOLATION, dynamicState, staticState);
         }
 
         LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
         if (hb == null) {
-            numWorkersKilledHBNull.mark();
             LOG.warn("SLOT {}: HB returned as null", staticState.port);
             //This can happen if the supervisor crashed after launching a
             // worker that never came up.
-            return killAndRelaunchContainer(dynamicState, staticState);
+            return killContainerFor(KillReason.HB_NULL, dynamicState, staticState);
         }
 
         long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
         if (timeDiffMs > staticState.hbTimeoutMs) {
-            numWorkersKilledHBTimeout.mark();
             LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, staticState.hbTimeoutMs);
-            return killAndRelaunchContainer(dynamicState, staticState);
+            return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
         }
 
         //The worker is up and running check for profiling requests
@@ -826,6 +835,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         }
     }
 
+    /**
+     * Get the worker ID (nullable) from the CURRENT container if one exists; otherwise return null.
+     * @return workerID
+     */
     public String getWorkerId() {
         String workerId = null;
         Container c = dynamicState.container;
@@ -945,7 +958,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         this.join();
     }
 
-    static enum MachineState {
+    enum MachineState {
         EMPTY,
         RUNNING,
         WAITING_FOR_WORKER_START,
@@ -996,19 +1009,46 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
 
     static class DynamicState {
         public final MachineState state;
+
+        /**
+         * Latest assignment assigned to this worker, updated asynchronously.
+         */
         public final LocalAssignment newAssignment;
+
+        /**
+         * Assignment the worker is running on, when the state machine enters this state.
+         */
         public final LocalAssignment currentAssignment;
-        public final Container container;
+
+        /**
+         * Assignment assigned to the worker and waiting for blob download.
+         * This is the same as newAssignment if no newer assignment has arrived before
+         * localization completes.
+         */
         public final LocalAssignment pendingLocalization;
+
+        public final Container container;
+
+        /**
+         * Signals that blobs have been downloaded for the first time.
+         */
         public final Future<Void> pendingDownload;
         public final Set<TopoProfileAction> profileActions;
         public final Set<TopoProfileAction> pendingStopProfileActions;
+
+        /**
+         * Blobs that are changed and need to be synced.
+         */
         public final Set<BlobChanging> changingBlobs;
         public final LocalAssignment pendingChangingBlobsAssignment;
+
+        /**
+         * Signals that acknowledged changing blobs have been updated.
+         */
         public final Set<Future<Void>> pendingChangingBlobs;
 
         /**
-         * The last time that WAITING_FOR_WORKER_START, KILL, or KILL_AND_RELAUNCH were entered into.
+         * The time at which the machine entered the current state.
          */
         public final long startTime;
 
@@ -1044,7 +1084,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                             final Set<BlobChanging> changingBlobs,
                             final Set<Future<Void>> pendingChangingBlobs, final LocalAssignment pendingChaningBlobsAssignment) {
             assert pendingChangingBlobs != null;
-            assert !(pendingChangingBlobs.isEmpty() ^ (pendingChaningBlobsAssignment == null));
+            assert pendingChangingBlobs.isEmpty() == (pendingChaningBlobsAssignment == null);
             this.state = state;
             this.newAssignment = newAssignment;
             this.currentAssignment = currentAssignment;
@@ -1099,6 +1139,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             return withPendingLocalization(this.pendingLocalization, pendingDownload);
         }
 
+        /**
+         * Transition to the given state. Notice that it's possible to transition to
+         * the same state.
+         * @param state The state to transition into
+         * @return New dynamicState
+         */
         public DynamicState withState(final MachineState state) {
             long newStartTime = Time.currentTimeMillis();
             return new DynamicState(state, this.newAssignment,

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index a2b9079..9d91aa0 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -252,7 +252,7 @@ public class AsyncLocalizer implements AutoCloseable {
                                 long remoteVersion = blob.getRemoteVersion(blobStore);
                                 if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
                                     try {
-                                        long newVersion = blob.downloadToTempLocation(blobStore);
+                                        long newVersion = blob.fetchUnzipToTemp(blobStore);
                                         blob.informAllOfChangeAndWaitForConsensus();
                                         blob.commitNewVersion(newVersion);
                                         blob.informAllChangeComplete();

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/IOFunction.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/IOFunction.java b/storm-server/src/main/java/org/apache/storm/localizer/IOFunction.java
new file mode 100644
index 0000000..e8d8171
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/localizer/IOFunction.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IOFunction<T, R> {
+    R apply(T t) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index caeca77..6f1c8d7 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.localizer;
 
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.FileOutputStream;
@@ -43,12 +45,12 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.ShellUtils;
@@ -56,10 +58,6 @@ import org.apache.storm.utils.WrappedAuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-
-import org.apache.storm.utils.ConfigUtils;
-
 /**
  * Represents a resource that is localized on the supervisor. A localized resource has a .current symlink to the current version file which
  * is named filename.{current version}. There is also a filename.version which contains the latest version.
@@ -85,7 +83,7 @@ public class LocalizedResource extends LocallyCachedBlob {
     private final Path baseDir;
     private final Path versionFilePath;
     private final Path symlinkPath;
-    private final boolean uncompressed;
+    private final boolean shouldUncompress;
     private final IAdvancedFSOps fsOps;
     private final String user;
     private final Map<String, Object> conf;
@@ -93,18 +91,18 @@ public class LocalizedResource extends LocallyCachedBlob {
     // size of the resource
     private long size = -1;
 
-    LocalizedResource(String key, Path localBaseDir, boolean uncompressed, IAdvancedFSOps fsOps, Map<String, Object> conf,
+    LocalizedResource(String key, Path localBaseDir, boolean shouldUncompress, IAdvancedFSOps fsOps, Map<String, Object> conf,
                       String user) {
-        super(key + (uncompressed ? " archive" : " file"), key);
+        super(key + (shouldUncompress ? " archive" : " file"), key);
         Path base = getLocalUserFileCacheDir(localBaseDir, user);
-        this.baseDir = uncompressed ? getCacheDirForArchives(base) : getCacheDirForFiles(base);
+        this.baseDir = shouldUncompress ? getCacheDirForArchives(base) : getCacheDirForFiles(base);
         this.conf = conf;
         this.symLinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
         this.user = user;
         this.fsOps = fsOps;
         versionFilePath = constructVersionFileName(baseDir, key);
         symlinkPath = constructBlobCurrentSymlinkName(baseDir, key);
-        this.uncompressed = uncompressed;
+        this.shouldUncompress = shouldUncompress;
         //Set the size in case we are recovering an already downloaded object
         setSize();
     }
@@ -241,48 +239,39 @@ public class LocalizedResource extends LocallyCachedBlob {
     }
 
     @Override
-    public long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException {
+    public long fetchUnzipToTemp(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException {
         String key = getKey();
         ReadableBlobMeta meta = store.getBlobMeta(key);
         if (!ServerUtils.canUserReadBlob(meta, user, conf)) {
             throw new WrappedAuthorizationException(user + " does not have READ access to " + key);
         }
-        long version;
-        Path downloadFile;
-        Path finalLocation;
-        try (InputStreamWithMeta in = store.getBlob(key)) {
-            version = in.getVersion();
-            finalLocation = constructBlobWithVersionFileName(baseDir, getKey(), version);
-            if (uncompressed) {
+
+        DownloadMeta downloadMeta = fetch(store, key, v -> {
+                Path path = shouldUncompress ? tmpOutputLocation() : constructBlobWithVersionFileName(baseDir, getKey(), v);
                 // we need to download to temp file and then unpack into the one requested
-                downloadFile = tmpOutputLocation();
-            } else {
-                downloadFile = finalLocation;
-            }
-            byte[] buffer = new byte[1024];
-            int len;
-            LOG.debug("Downloading {} to {}", key, downloadFile);
-            Path parent = downloadFile.getParent();
-            if (!Files.exists(parent)) {
-                //There is a race here that we can still lose
-                try {
-                    Files.createDirectory(parent);
-                } catch (FileAlreadyExistsException e) {
-                    //Ignored
-                }
-            }
-            try (FileOutputStream out = new FileOutputStream(downloadFile.toFile())) {
-                while ((len = in.read(buffer)) >= 0) {
-                    out.write(buffer, 0, len);
+                Path parent = path.getParent();
+                if (!Files.exists(parent)) {
+                    //There is a race here that we can still lose
+                    try {
+                        Files.createDirectory(parent);
+                    } catch (FileAlreadyExistsException e) {
+                        //Ignored
+                    }
                 }
-            }
-        }
-        if (uncompressed) {
+                return path;
+            },
+            FileOutputStream::new
+        );
+
+        Path finalLocation = downloadMeta.getDownloadPath();
+        if (shouldUncompress) {
+            Path downloadFile = finalLocation;
+            finalLocation = constructBlobWithVersionFileName(baseDir, getKey(), downloadMeta.getVersion());
             ServerUtils.unpack(downloadFile.toFile(), finalLocation.toFile(), symLinksDisabled);
             LOG.debug("Uncompressed {} to: {}", downloadFile, finalLocation);
         }
         setBlobPermissions(conf, user, finalLocation);
-        return version;
+        return downloadMeta.getVersion();
     }
 
     @Override
@@ -416,7 +405,7 @@ public class LocalizedResource extends LocallyCachedBlob {
         Path fileWithVersion = getFilePathWithVersion();
         Path currentSymLink = getCurrentSymlinkPath();
 
-        if (uncompressed) {
+        if (shouldUncompress) {
             if (Files.exists(fileWithVersion)) {
                 // this doesn't follow symlinks, which is what we want
                 FileUtils.deleteDirectory(fileWithVersion.toFile());
@@ -444,14 +433,14 @@ public class LocalizedResource extends LocallyCachedBlob {
     public boolean equals(Object other) {
         if (other instanceof LocalizedResource) {
             LocalizedResource l = (LocalizedResource) other;
-            return getKey().equals(l.getKey()) && uncompressed == l.uncompressed && baseDir.equals(l.baseDir);
+            return getKey().equals(l.getKey()) && shouldUncompress == l.shouldUncompress && baseDir.equals(l.baseDir);
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        return getKey().hashCode() + Boolean.hashCode(uncompressed) + baseDir.hashCode();
+        return getKey().hashCode() + Boolean.hashCode(shouldUncompress) + baseDir.hashCode();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index d112ee5..abffd07 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.localizer;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.Files;
@@ -23,10 +24,9 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+
 import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.utils.Time;
@@ -59,23 +59,40 @@ public abstract class LocallyCachedBlob {
         this.blobKey = blobKey;
     }
 
-    protected static long downloadToTempLocation(ClientBlobStore store, String key, long currentVersion, IAdvancedFSOps fsOps,
-                                                 Function<Long, Path> getTempPath)
-        throws KeyNotFoundException, AuthorizationException, IOException {
+    /**
+     * Helper function to download blob from blob store.
+     * @param store Blob store to fetch blobs from
+     * @param key Key to retrieve blobs
+     * @param pathSupplier A function that supplies the download destination of a blob. It guarantees the validity
+     *                     of path or throws {@link IOException}
+     * @param outStreamSupplier A function that supplies the {@link OutputStream} object
+     * @return The metadata of the download session, including blob's version and download destination
+     * @throws KeyNotFoundException Thrown if the key used to retrieve the blob is invalid
+     * @throws AuthorizationException Thrown if the caller is not authorized to retrieve the blob
+     * @throws IOException Thrown if any IO error occurs
+     */
+    protected DownloadMeta fetch(ClientBlobStore store, String key,
+                                 IOFunction<Long, Path> pathSupplier,
+                                 IOFunction<File, OutputStream> outStreamSupplier)
+            throws KeyNotFoundException, AuthorizationException, IOException {
+
         try (InputStreamWithMeta in = store.getBlob(key)) {
             long newVersion = in.getVersion();
+            long currentVersion = getLocalVersion();
             if (newVersion == currentVersion) {
                 LOG.warn("The version did not change, but going to download again {} {}", currentVersion, key);
             }
-            Path tmpLocation = getTempPath.apply(newVersion);
-            long totalRead = 0;
+
             //Make sure the parent directory is there and ready to go
-            fsOps.forceMkdir(tmpLocation.getParent());
-            try (OutputStream outStream = fsOps.getOutputStream(tmpLocation.toFile())) {
+            Path downloadPath = pathSupplier.apply(newVersion);
+            LOG.debug("Downloading {} to {}", key, downloadPath);
+
+            long totalRead = 0;
+            try (OutputStream out = outStreamSupplier.apply(downloadPath.toFile())) {
                 byte[] buffer = new byte[4096];
-                int read = 0;
-                while ((read = in.read(buffer)) > 0) {
-                    outStream.write(buffer, 0, read);
+                int read;
+                while ((read = in.read(buffer)) >= 0) {
+                    out.write(buffer, 0, read);
                     totalRead += read;
                 }
             }
@@ -83,33 +100,7 @@ public abstract class LocallyCachedBlob {
             if (totalRead != expectedSize) {
                 throw new IOException("We expected to download " + expectedSize + " bytes but found we got " + totalRead);
             }
-
-            return newVersion;
-        }
-    }
-
-    /**
-     * Get the size of p in bytes.
-     * @param p the path to read.
-     * @return the size of p in bytes.
-     */
-    protected static long getSizeOnDisk(Path p) throws IOException {
-        if (!Files.exists(p)) {
-            return 0;
-        } else if (Files.isRegularFile(p)) {
-            return Files.size(p);
-        } else {
-            //We will not follow sym links
-            return Files.walk(p)
-                        .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS))
-                        .mapToLong((subp) -> {
-                            try {
-                                return Files.size(subp);
-                            } catch (IOException e) {
-                                LOG.warn("Could not get the size of ");
-                            }
-                            return 0;
-                        }).sum();
+            return new DownloadMeta(downloadPath, newVersion);
         }
     }
 
@@ -131,7 +122,7 @@ public abstract class LocallyCachedBlob {
      * @param store the store to us to download the data.
      * @return the version that was downloaded.
      */
-    public abstract long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException;
+    public abstract long fetchUnzipToTemp(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException;
 
     /**
      * Commit the new version and make it available for the end user.
@@ -163,6 +154,31 @@ public abstract class LocallyCachedBlob {
     public abstract long getSizeOnDisk();
 
     /**
+     * Get the size of p in bytes.
+     * @param p the path to read.
+     * @return the size of p in bytes.
+     */
+    protected static long getSizeOnDisk(Path p) throws IOException {
+        if (!Files.exists(p)) {
+            return 0;
+        } else if (Files.isRegularFile(p)) {
+            return Files.size(p);
+        } else {
+            //We will not follow sym links
+            return Files.walk(p)
+                    .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS))
+                    .mapToLong((subp) -> {
+                        try {
+                            return Files.size(subp);
+                        } catch (IOException e) {
+                            LOG.warn("Could not get the size of {}", subp);
+                        }
+                        return 0;
+                    }).sum();
+        }
+    }
+
+    /**
      * Updates the last updated time.  This should be called when references are added or removed.
      */
     protected synchronized void touch() {
@@ -254,4 +270,23 @@ public abstract class LocallyCachedBlob {
     }
 
     public abstract boolean isFullyDownloaded();
+
+    static class DownloadMeta {
+        private final Path downloadPath;
+        private final long version;
+
+        public DownloadMeta(Path downloadPath, long version) {
+            this.downloadPath = downloadPath;
+            this.version = version;
+        }
+
+        public Path getDownloadPath() {
+            return downloadPath;
+        }
+
+        public long getVersion() {
+            return version;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
index b430f0b..9058c30 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
@@ -123,7 +123,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
     }
 
     @Override
-    public long downloadToTempLocation(ClientBlobStore store)
+    public long fetchUnzipToTemp(ClientBlobStore store)
         throws IOException, KeyNotFoundException, AuthorizationException {
         if (isLocalMode && type == TopologyBlobType.TOPO_JAR) {
             LOG.debug("DOWNLOADING LOCAL JAR to TEMP LOCATION... {}", topologyId);
@@ -148,17 +148,21 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
         }
 
 
-        long newVersion = downloadToTempLocation(store, type.getKey(topologyId), version, fsOps,
-                                                 (version) -> topologyBasicBlobsRootDir.resolve(type.getTempFileName(version)));
+        DownloadMeta downloadMeta = fetch(store, type.getKey(topologyId),
+            v -> {
+                Path path = topologyBasicBlobsRootDir.resolve(type.getTempFileName(v));
+                fsOps.forceMkdir(path.getParent());
+                return path;
+            }, fsOps::getOutputStream);
 
-        Path tmpLocation = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion));
+        Path tmpLocation = downloadMeta.getDownloadPath();
 
         if (type.needsExtraction()) {
-            Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(newVersion));
+            Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(downloadMeta.getVersion()));
             extractDirFromJar(tmpLocation.toAbsolutePath().toString(), ServerConfigUtils.RESOURCES_SUBDIR,
                               extractionDest);
         }
-        return newVersion;
+        return downloadMeta.getVersion();
     }
 
     protected void extractDirFromJar(String jarpath, String dir, Path dest) throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/utils/EnumUtil.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/EnumUtil.java b/storm-server/src/main/java/org/apache/storm/utils/EnumUtil.java
new file mode 100644
index 0000000..0b1d723
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/utils/EnumUtil.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.util.EnumMap;
+import java.util.function.Function;
+
+public class EnumUtil {
+    public static String toMetricName(Enum type) {
+        return type.name().toLowerCase().replace('_', '-');
+    }
+
+    /**
+     * Create an Enum map with given lambda mapper.
+     * @param klass the Enum class
+     * @param mapper The mapper producing value with key (enum constant)
+     * @param <T> An Enum class
+     * @param <U> Mapped class
+     * @return An Enum map
+     */
+    public static <T extends Enum<T>, U> EnumMap<T, U> toEnumMap(Class<T> klass, Function<? super T, ? extends U> mapper) {
+        EnumMap<T, U> map = new EnumMap<>(klass);
+        for (T elem : klass.getEnumConstants()) {
+            map.put(elem, mapper.apply(elem));
+        }
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 282c139..6f09f3f 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -244,7 +244,7 @@ public class SlotTest {
             LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs() - 10);
             LSWorkerHeartbeat goodhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs());
             when(container.readHeartbeat()).thenReturn(oldhb, oldhb, goodhb, goodhb);
-            when(container.areAllProcessesDead()).thenReturn(false, true);
+            when(container.areAllProcessesDead()).thenReturn(false, false, true);
 
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
@@ -289,7 +289,7 @@ public class SlotTest {
             Container cContainer = mock(Container.class);
             LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs());
             when(cContainer.readHeartbeat()).thenReturn(chb);
-            when(cContainer.areAllProcessesDead()).thenReturn(false, true);
+            when(cContainer.areAllProcessesDead()).thenReturn(false, false, true);
 
             String nTopoId = "NEW";
             List<ExecutorInfo> nExecList = mkExecutorInfoList(1, 2, 3, 4, 5);
@@ -382,7 +382,7 @@ public class SlotTest {
             Container cContainer = mock(Container.class);
             LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs());
             when(cContainer.readHeartbeat()).thenReturn(chb);
-            when(cContainer.areAllProcessesDead()).thenReturn(false, true);
+            when(cContainer.areAllProcessesDead()).thenReturn(false, false, true);
 
             AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             BlobChangingCallback cb = mock(BlobChangingCallback.class);
@@ -523,7 +523,7 @@ public class SlotTest {
             Container cContainer = mock(Container.class);
             LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs());
             when(cContainer.readHeartbeat()).thenReturn(chb);
-            when(cContainer.areAllProcessesDead()).thenReturn(false, true);
+            when(cContainer.areAllProcessesDead()).thenReturn(false, false, true);
             AsyncLocalizer localizer = mock(AsyncLocalizer.class);
             Container nContainer = mock(Container.class);
             LocalState state = mock(LocalState.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 938301f..a919dd3 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -122,19 +122,19 @@ public class AsyncLocalizerTest {
         doReturn(jarBlob).when(bl).getTopoJar(topoId);
         when(jarBlob.getLocalVersion()).thenReturn(-1L);
         when(jarBlob.getRemoteVersion(any())).thenReturn(100L);
-        when(jarBlob.downloadToTempLocation(any())).thenReturn(100L);
+        when(jarBlob.fetchUnzipToTemp(any())).thenReturn(100L);
 
         LocallyCachedTopologyBlob codeBlob = mock(LocallyCachedTopologyBlob.class);
         doReturn(codeBlob).when(bl).getTopoCode(topoId);
         when(codeBlob.getLocalVersion()).thenReturn(-1L);
         when(codeBlob.getRemoteVersion(any())).thenReturn(200L);
-        when(codeBlob.downloadToTempLocation(any())).thenReturn(200L);
+        when(codeBlob.fetchUnzipToTemp(any())).thenReturn(200L);
 
         LocallyCachedTopologyBlob confBlob = mock(LocallyCachedTopologyBlob.class);
         doReturn(confBlob).when(bl).getTopoConf(topoId);
         when(confBlob.getLocalVersion()).thenReturn(-1L);
         when(confBlob.getRemoteVersion(any())).thenReturn(300L);
-        when(confBlob.downloadToTempLocation(any())).thenReturn(300L);
+        when(confBlob.fetchUnzipToTemp(any())).thenReturn(300L);
 
         ReflectionUtils origRU = ReflectionUtils.setInstance(mockedRU);
         ServerUtils origUtils = ServerUtils.setInstance(mockedU);
@@ -145,19 +145,19 @@ public class AsyncLocalizerTest {
             Future<Void> f = bl.requestDownloadBaseTopologyBlobs(pna, null);
             f.get(20, TimeUnit.SECONDS);
 
-            verify(jarBlob).downloadToTempLocation(any());
+            verify(jarBlob).fetchUnzipToTemp(any());
             verify(jarBlob).informAllOfChangeAndWaitForConsensus();
             verify(jarBlob).commitNewVersion(100L);
             verify(jarBlob).informAllChangeComplete();
             verify(jarBlob).cleanupOrphanedData();
 
-            verify(codeBlob).downloadToTempLocation(any());
+            verify(codeBlob).fetchUnzipToTemp(any());
             verify(codeBlob).informAllOfChangeAndWaitForConsensus();
             verify(codeBlob).commitNewVersion(200L);
             verify(codeBlob).informAllChangeComplete();
             verify(codeBlob).cleanupOrphanedData();
 
-            verify(confBlob).downloadToTempLocation(any());
+            verify(confBlob).fetchUnzipToTemp(any());
             verify(confBlob).informAllOfChangeAndWaitForConsensus();
             verify(confBlob).commitNewVersion(300L);
             verify(confBlob).informAllChangeComplete();
@@ -436,7 +436,7 @@ public class AsyncLocalizerTest {
 
             when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
                                                                                          FileInputStream(archiveFile.getAbsolutePath()),
-                                                                                     0));
+                                                                                     0, archiveFile.length()));
 
             long timeBefore = Time.currentTimeMillis();
             Time.advanceTime(10);
@@ -602,10 +602,8 @@ public class AsyncLocalizerTest {
             when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(0));
             when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(0));
 
-            List<LocalResource> keys = Arrays.asList(new LocalResource[]{
-                new LocalResource(key1, false, false),
-                new LocalResource(key2, false, false), new LocalResource(key3, false, false)
-            });
+            List<LocalResource> keys = Arrays.asList(new LocalResource(key1, false, false),
+                    new LocalResource(key2, false, false), new LocalResource(key3, false, false));
             File user1Dir = localizer.getLocalUserFileCacheDir(user1);
             assertTrue("failed to create user dir", user1Dir.mkdirs());
 
@@ -746,7 +744,8 @@ public class AsyncLocalizerTest {
         ReadableBlobMeta rbm = new ReadableBlobMeta();
         rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
         when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
-        when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(1));
+        //thenReturn always returns the same object, which is already consumed by the time User3 tries to getBlob!
+        when(mockblobstore.getBlob(key1)).thenAnswer((i) -> new TestInputStreamWithMeta(1));
         when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(1));
         when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(1));
 
@@ -795,6 +794,13 @@ public class AsyncLocalizerTest {
         assertTrue("blob not created", keyFile3.exists());
         assertTrue("blob not created", keyFile1user3.exists());
 
+        //Should assert file size
+        assertEquals("size doesn't match", 34, lrsrc.getSizeOnDisk());
+        assertEquals("size doesn't match", 34, lrsrc2.getSizeOnDisk());
+        assertEquals("size doesn't match", 34, lrsrc3.getSizeOnDisk());
+        //This was 0 byte in test
+        assertEquals("size doesn't match", 34, lrsrc1_user3.getSizeOnDisk());
+
         ConcurrentMap<String, LocalizedResource> lrsrcSet = localizer.getUserFiles().get(user1);
         assertEquals("local resource set size wrong", 1, lrsrcSet.size());
         ConcurrentMap<String, LocalizedResource> lrsrcSet2 = localizer.getUserFiles().get(user2);
@@ -941,16 +947,20 @@ public class AsyncLocalizerTest {
 
     class TestInputStreamWithMeta extends InputStreamWithMeta {
         private final long version;
+        private final long fileLength;
         private InputStream iostream;
 
         public TestInputStreamWithMeta(long version) {
-            iostream = IOUtils.toInputStream("some test data for my input stream");
+            final String DEFAULT_DATA = "some test data for my input stream";
+            iostream = IOUtils.toInputStream(DEFAULT_DATA);
             this.version = version;
+            this.fileLength = DEFAULT_DATA.length();
         }
 
-        public TestInputStreamWithMeta(InputStream istream, long version) {
+        public TestInputStreamWithMeta(InputStream istream, long version, long fileLength) {
             iostream = istream;
             this.version = version;
+            this.fileLength = fileLength;
         }
 
         @Override
@@ -975,7 +985,7 @@ public class AsyncLocalizerTest {
 
         @Override
         public long getFileLength() {
-            return 0;
+            return fileLength;
         }
     }
 }


[4/4] storm git commit: Merge branch 'STORM-3125' of https://github.com/zd-project/storm into STORM-3125-3126-3127-merge

Posted by ka...@apache.org.
Merge branch 'STORM-3125' of https://github.com/zd-project/storm into STORM-3125-3126-3127-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/daec2484
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/daec2484
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/daec2484

Branch: refs/heads/master
Commit: daec24841501c911b06fc10a5c98956e40fc1cd4
Parents: afe35f3 11383c2
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jul 10 06:12:23 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 10 06:12:23 2018 +0900

----------------------------------------------------------------------
 .../daemon/drpc/BlockingOutstandingRequest.java |   3 +-
 .../daemon/supervisor/ReadClusterState.java     |   9 +-
 .../apache/storm/daemon/supervisor/Slot.java    | 306 +++++++++++--------
 .../storm/daemon/supervisor/Supervisor.java     |   3 +-
 .../apache/storm/localizer/AsyncLocalizer.java  |   4 +-
 .../org/apache/storm/localizer/IOFunction.java  |  22 ++
 .../storm/localizer/LocalizedResource.java      |  77 ++---
 .../storm/localizer/LocallyCachedBlob.java      | 115 ++++---
 .../localizer/LocallyCachedTopologyBlob.java    |  16 +-
 .../java/org/apache/storm/utils/EnumUtil.java   |  40 +++
 .../storm/daemon/supervisor/SlotTest.java       |   8 +-
 .../storm/localizer/AsyncLocalizerTest.java     |  40 ++-
 12 files changed, 394 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/daec2484/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------


[2/4] storm git commit: STORM-3126: Refactored methods for kill_workers to avoid unnecessary force kill.

Posted by ka...@apache.org.
STORM-3126: Refactored methods for kill_workers to avoid
unnecessary force kill.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/216ed738
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/216ed738
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/216ed738

Branch: refs/heads/master
Commit: 216ed738e5243d552ca20d2c9b13fb99c56e33e6
Parents: 81307a5
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Fri Jul 6 16:51:18 2018 -0500
Committer: Zhengdai Hu <hu...@gmail.com>
Committed: Fri Jul 6 16:52:20 2018 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/storm/daemon/supervisor/Supervisor.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/216ed738/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index f275835..b8bb4aa 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -498,15 +498,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
         for (Killable k : containers) {
             try {
-                k.forceKill();
                 long start = Time.currentTimeMillis();
                 while (!k.areAllProcessesDead()) {
                     if ((Time.currentTimeMillis() - start) > 10_000) {
                         throw new RuntimeException("Giving up on killing " + k
                                                    + " after " + (Time.currentTimeMillis() - start) + " ms");
                     }
-                    Time.sleep(100);
                     k.forceKill();
+                    Time.sleep(100);
                 }
                 k.cleanUp();
             } catch (Exception e) {


[3/4] storm git commit: STORM-3127: Refactor AsyncLocalizer to avoid potential race condition

Posted by ka...@apache.org.
STORM-3127: Refactor AsyncLocalizer to avoid potential race condition

(cherry picked from commit 706029a)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11383c2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11383c2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11383c2a

Branch: refs/heads/master
Commit: 11383c2ae08bbadb10857bf0a9dd1eb69b7d2ccc
Parents: 216ed73
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Tue Jun 26 09:58:57 2018 -0500
Committer: Zhengdai Hu <hu...@gmail.com>
Committed: Fri Jul 6 16:52:20 2018 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/localizer/AsyncLocalizer.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/11383c2a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 9d91aa0..19a8cd3 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -531,9 +531,9 @@ public class AsyncLocalizer implements AutoCloseable {
                 // go off to blobstore and get it
                 // assume dir passed in exists and has correct permission
                 LOG.debug("fetching blob: {}", key);
+                lrsrc.addReference(pna, localResource.needsCallback() ? cb : null);
                 futures.add(downloadOrUpdate(lrsrc));
                 results.add(lrsrc);
-                lrsrc.addReference(pna, localResource.needsCallback() ? cb : null);
             }
 
             for (CompletableFuture<?> futureRsrc : futures) {