You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to commits@storm.apache.org by ka...@apache.org on 2019/03/15 18:42:17 UTC

[storm] branch master updated: STORM-3319: Fix failing assertion in Slot

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 947c7b2  STORM-3319: Fix failing assertion in Slot
     new 18ead35  Merge branch 'STORM-3319' of https://github.com/srdo/storm into STORM-3319-merging
947c7b2 is described below

commit 947c7b22acf4cdf4f95001b26e700a590d675220
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Sun Jan 20 03:15:03 2019 +0100

    STORM-3319: Fix failing assertion in Slot
---
 .../org/apache/storm/daemon/supervisor/Slot.java   | 99 +++++++++++++++++-----
 .../org/apache/storm/localizer/AsyncLocalizer.java | 10 +--
 .../apache/storm/localizer/LocalizedResource.java  |  2 +-
 .../apache/storm/localizer/LocallyCachedBlob.java  | 54 +++++++-----
 .../storm/localizer/LocallyCachedTopologyBlob.java |  2 +-
 .../apache/storm/daemon/supervisor/SlotTest.java   | 80 +++++++++++++++++
 .../apache/storm/localizer/AsyncLocalizerTest.java | 12 +--
 7 files changed, 198 insertions(+), 61 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 8b1b2f5..74c3e3d 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -128,13 +128,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                     }
                 } catch (ContainerRecoveryException e) {
                     //We could not recover container will be null.
+                    currentAssignment = null;
                 }
 
                 newAssignment = currentAssignment;
-                if (container == null) {
-                    currentAssignment = null;
-                    //Assigned something but it is not running
-                }
             }
         }
 
@@ -155,7 +152,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     //In some cases the new LocalAssignment may be equivalent to the old, but
     // It is not equal.  In those cases we want to update the current assignment to
     // be the same as the new assignment
+    //PRECONDITION: The new and current assignments must be equivalent
     private static DynamicState updateAssignmentIfNeeded(DynamicState dynamicState) {
+        assert equivalent(dynamicState.newAssignment, dynamicState.currentAssignment);
         if (dynamicState.newAssignment != null
             && !dynamicState.newAssignment.equals(dynamicState.currentAssignment)) {
             dynamicState =
@@ -244,7 +243,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      */
     private static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException {
         assert (dynamicState.container == null);
+        assert dynamicState.currentAssignment == null;
 
+        //We're either going to empty, or starting fresh blob download. Either way, the changing blob notifications are outdated.
+        dynamicState = drainAllChangingBlobs(dynamicState);
         if (dynamicState.newAssignment == null) {
             return dynamicState.withState(MachineState.EMPTY);
         }
@@ -276,7 +278,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                     pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(
                             dynamicState.newAssignment, staticState.port, staticState.changingCallback);
                 }
-                dynamicState = drainAllChangingBlobs(dynamicState);
                 next = dynamicState.withState(MachineState.KILL)
                         .withPendingLocalization(dynamicState.newAssignment, pendingDownload);
                 break;
@@ -330,10 +331,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
 
     /**
      * Drop all of the changingBlobs and pendingChangingBlobs.
+     * 
+     * PRECONDITION: container is null
      * @param dynamicState current state.
      * @return the next state.
      */
     private static DynamicState drainAllChangingBlobs(DynamicState dynamicState) {
+        assert dynamicState.container == null;
         if (!dynamicState.changingBlobs.isEmpty()) {
             for (BlobChanging rc : dynamicState.changingBlobs) {
                 rc.latch.countDown();
@@ -416,6 +420,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         assert (dynamicState.pendingLocalization != null);
         assert (dynamicState.pendingDownload != null);
         assert (dynamicState.container == null);
+        assert dynamicState.currentAssignment == null;
 
         //Ignore changes to scheduling while downloading the topology blobs
         // We don't support canceling the download through the future yet,
@@ -435,8 +440,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                 staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
                 // Switch to the new assignment even if localization hasn't completed, or go to empty state
                 // if no new assignment.
-                return prepareForNewAssignmentNoWorkersRunning(dynamicState.withPendingChangingBlobs(Collections.emptySet(), null),
-                                                               staticState);
+                return prepareForNewAssignmentNoWorkersRunning(dynamicState
+                    .withPendingLocalization(null, null),
+                    staticState);
             }
 
             // Wait until time out
@@ -449,7 +455,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                                    .withPendingLocalization(null, null);
             }
 
-            dynamicState = updateAssignmentIfNeeded(dynamicState);
             staticState.slotMetrics.numWorkersLaunched.mark();
             Container c =
                 staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState);
@@ -471,7 +476,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
             // we wait for 3 seconds
             Time.sleepSecs(3);
-            return dynamicState.withState(MachineState.EMPTY);
+            //Try again, or go to empty if assignment has been nulled
+            return prepareForNewAssignmentNoWorkersRunning(dynamicState
+                .withPendingLocalization(null, null),
+                staticState);
         }
     }
 
@@ -492,6 +500,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         assert dynamicState.container == null;
         assert dynamicState.pendingChangingBlobsAssignment != null;
         assert !dynamicState.pendingChangingBlobs.isEmpty();
+        assert dynamicState.pendingDownload == null;
+        assert dynamicState.pendingLocalization == null;
 
         if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
             //We were rescheduled while waiting for the resources to be updated,
@@ -502,8 +512,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                 staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port);
             }
             staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment, staticState.port);
-            return prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null, null)
-                                                                       .withPendingChangingBlobs(Collections.emptySet(), null),
+            return prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null, null),
                                                            staticState);
         }
 
@@ -546,6 +555,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     private static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
+        assert dynamicState.pendingChangingBlobs.isEmpty();
+        assert dynamicState.pendingChangingBlobsAssignment == null;
 
         if (dynamicState.container.areAllProcessesDead()) {
             LOG.info("SLOT {} all processes are dead...", staticState.port);
@@ -572,6 +583,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     private static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
+        assert dynamicState.pendingChangingBlobs.isEmpty();
+        assert dynamicState.pendingChangingBlobsAssignment == null;
+        assert dynamicState.pendingLocalization == null;
+        assert dynamicState.pendingDownload == null;
 
         if (dynamicState.container.areAllProcessesDead()) {
             if (equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
@@ -604,6 +619,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     private static DynamicState handleKillBlobUpdate(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
+        assert dynamicState.pendingChangingBlobs.isEmpty();
+        assert dynamicState.pendingChangingBlobsAssignment == null;
+        assert dynamicState.pendingDownload == null;
+        assert dynamicState.pendingLocalization == null;
 
         //Release things that don't need to wait for us
         dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
@@ -638,6 +657,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     private static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
+        assert dynamicState.pendingChangingBlobs.isEmpty();
+        assert dynamicState.pendingChangingBlobsAssignment == null;
+        assert dynamicState.pendingDownload == null;
+        assert dynamicState.pendingLocalization == null;
 
         LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
         if (hb != null) {
@@ -682,6 +705,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     private static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
         assert (dynamicState.container != null);
         assert (dynamicState.currentAssignment != null);
+        assert dynamicState.pendingChangingBlobs.isEmpty();
+        assert dynamicState.pendingChangingBlobsAssignment == null;
+        assert dynamicState.pendingDownload == null;
+        assert dynamicState.pendingLocalization == null;
 
         if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
             LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
@@ -777,8 +804,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     }
 
     static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticState) throws InterruptedException, IOException {
-        assert dynamicState.changingBlobs.isEmpty();
+        assert dynamicState.currentAssignment == null;
+        assert dynamicState.container == null;
+        assert dynamicState.pendingChangingBlobs.isEmpty();
         assert dynamicState.pendingChangingBlobsAssignment == null;
+        assert dynamicState.pendingDownload == null;
+        assert dynamicState.pendingLocalization == null;
         if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
             return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState);
         }
@@ -796,7 +827,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         return dynamicState;
     }
 
-    public MachineState getMachineState() {
+    MachineState getMachineState() {
         return dynamicState.state;
     }
 
@@ -810,12 +841,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
 
     @Override
     public void blobChanging(LocalAssignment assignment, int port, LocallyCachedBlob blob, GoodToGo go) {
-        assert port == staticState.port : "got a callaback that is not for us " + port + " != " + staticState.port;
+        assert port == staticState.port : "got a callback that is not for us " + port + " != " + staticState.port;
         //This is called async so lets assume that it is something we care about
         try {
             changingBlobs.put(new BlobChanging(assignment, blob, go.getLatch()));
         } catch (InterruptedException e) {
-            throw new RuntimeException("This should not have happend, but it did (the queue is unbounded)", e);
+            throw new RuntimeException("This should not have happened, but it did (the queue is unbounded)", e);
         }
     }
 
@@ -957,13 +988,37 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     }
 
     enum MachineState {
+        /**
+         * Slot is empty, and no worker is running.
+         */
         EMPTY,
+        /**
+         * Slot is running a worker.
+         */
         RUNNING,
+        /**
+         * Slot has launched a worker, and is waiting for it to heartbeat.
+         */
         WAITING_FOR_WORKER_START,
+        /**
+         * Slot has just killed its worker, and is now waiting for it to die so it can be relaunched in the same container.
+         */
         KILL_AND_RELAUNCH,
+        /**
+         * Slot has just killed its worker, and is now waiting for it to die so the container can be deleted. 
+         */
         KILL,
+        /**
+         * Slot has just killed its worker, and is now waiting for it to die so the localizer can update a blob.
+         */
         KILL_BLOB_UPDATE,
+        /**
+         * The slot is empty, and is waiting for blobs to download before the worker can be launched.
+         */
         WAITING_FOR_BLOB_LOCALIZATION,
+        /**
+         * The slot is empty, and is waiting for blobs to be updated before the worker can be (re)launched.
+         */
         WAITING_FOR_BLOB_UPDATE;
 
         @Override
@@ -1043,14 +1098,17 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         public final Set<TopoProfileAction> pendingStopProfileActions;
 
         /**
-         * Blobs that are changed and need to be synced.
+         * Blobs that are changed and need to be synced. The localizer notifies the Slot about changing blobs on every state step.
+         * Blob updates are blocked until the Slot unblocks them, at which point they go in {@link #pendingChangingBlobs}.
+         * Updates are blocked until the Slot worker is dead, since blobs may otherwise be actively used.
          */
         public final Set<BlobChanging> changingBlobs;
-        public final LocalAssignment pendingChangingBlobsAssignment;
-
+        
         /**
-         * Signals that acknowledged changing blobs have been updated.
+         * The assignment {@link #pendingChangingBlobs} belongs to.
          */
+        public final LocalAssignment pendingChangingBlobsAssignment;
+
         public final Set<Future<Void>> pendingChangingBlobs;
 
         /**
@@ -1197,6 +1255,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
                                     this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics);
         }
 
+        /**
+         * Set the blocked changing blobs. This is an input from the outside, and should never be called by the state machine steps.
+         */
         public DynamicState withChangingBlobs(Set<BlobChanging> changingBlobs) {
             if (changingBlobs == this.changingBlobs) {
                 return this;
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 28bb2dd..f1bc79b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
 public class AsyncLocalizer implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
 
-    private static final CompletableFuture<Void> ALL_DONE_FUTURE = new CompletableFuture<>();
+    private static final CompletableFuture<Void> ALL_DONE_FUTURE = CompletableFuture.completedFuture(null);
     private static final int ATTEMPTS_INTERVAL_TIME = 100;
 
     private final Timer singleBlobLocalizationDuration;
@@ -78,10 +78,6 @@ public class AsyncLocalizer implements AutoCloseable {
     private final Timer blobLocalizationDuration;
     private final Meter numBlobUpdateVersionChanged;
 
-    static {
-        ALL_DONE_FUTURE.complete(null);
-    }
-
     // track resources - user to resourceSet
     //ConcurrentHashMap is explicitly used everywhere in this class because it uses locks to guarantee atomicity for compute and
     // computeIfAbsent where as ConcurrentMap allows for a retry of the function passed in, and would require the function to have
@@ -275,9 +271,7 @@ public class AsyncLocalizer implements AutoCloseable {
                                     Timer.Context t = singleBlobLocalizationDuration.time();
                                     try {
                                         long newVersion = blob.fetchUnzipToTemp(blobStore);
-                                        blob.informAllOfChangeAndWaitForConsensus();
-                                        blob.commitNewVersion(newVersion);
-                                        blob.informAllChangeComplete();
+                                        blob.informReferencesAndCommitNewVersion(newVersion);
                                         t.stop();
                                     } finally {
                                         blob.cleanupOrphanedData();
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index 541c075..ea3bdac 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -276,7 +276,7 @@ public class LocalizedResource extends LocallyCachedBlob {
     }
 
     @Override
-    public void commitNewVersion(long version) throws IOException {
+    protected void commitNewVersion(long version) throws IOException {
         String key = getKey();
         LOG.info("Blob: {} updated to version {} from version {}", key, version, getLocalVersion());
         Path localVersionFile = versionFilePath;
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 131b269..c948240 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -23,8 +23,10 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
@@ -44,11 +46,10 @@ public abstract class LocallyCachedBlob {
     // A callback that does nothing.
     private static final BlobChangingCallback NOOP_CB = (assignment, port, resource, go) -> {
     };
-    private final Map<PortAndAssignment, BlobChangingCallback> references = new HashMap<>();
+    private final ConcurrentHashMap<PortAndAssignment, BlobChangingCallback> references = new ConcurrentHashMap<>();
     private final String blobDescription;
     private final String blobKey;
-    private long lastUsed = Time.currentTimeMillis();
-    private CompletableFuture<Void> doneUpdating = null;
+    private AtomicLong lastUsed = new AtomicLong(Time.currentTimeMillis());
 
     private final Histogram fetchingRate;
 
@@ -144,7 +145,7 @@ public abstract class LocallyCachedBlob {
      * PRECONDITION: this can only be called with a lock on this instance held.
      * @param version the version of the blob to commit.
      */
-    public abstract void commitNewVersion(long version) throws IOException;
+    protected abstract void commitNewVersion(long version) throws IOException;
 
     /**
      * Clean up any temporary files.  This will be called after updating a blob, either successfully or if an error has occured.
@@ -195,22 +196,22 @@ public abstract class LocallyCachedBlob {
     /**
      * Updates the last updated time.  This should be called when references are added or removed.
      */
-    protected synchronized void touch() {
-        lastUsed = Time.currentTimeMillis();
-        LOG.debug("Setting {} ts to {}", blobKey, lastUsed);
+    protected void touch() {
+        lastUsed.set(Time.currentTimeMillis());
+        LOG.debug("Setting {} ts to {}", blobKey, lastUsed.get());
     }
 
     /**
      * Get the last time that this used for LRU calculations.
      */
-    public synchronized long getLastUsed() {
-        return lastUsed;
+    public long getLastUsed() {
+        return lastUsed.get();
     }
 
     /**
      * Return true if this blob is actively being used, else false (meaning it can be deleted, but might not be).
      */
-    public synchronized boolean isUsed() {
+    public boolean isUsed() {
         return !references.isEmpty();
     }
 
@@ -219,7 +220,7 @@ public abstract class LocallyCachedBlob {
      * @param pna the slot and assignment that are using this blob.
      * @param cb an optional callback indicating that they want to know/synchronize when a blob is updated.
      */
-    public synchronized void addReference(final PortAndAssignment pna, BlobChangingCallback cb) {
+    public void addReference(final PortAndAssignment pna, BlobChangingCallback cb) {
         LOG.debug("Adding reference {}", pna);
         if (cb == null) {
             cb = NOOP_CB;
@@ -233,7 +234,7 @@ public abstract class LocallyCachedBlob {
      * Removes a reservation for this blob from a given slot and assignemnt.
      * @param pna the slot + assignment that no longer needs this blob.
      */
-    public synchronized void removeReference(final PortAndAssignment pna) {
+    public void removeReference(final PortAndAssignment pna) {
         LOG.debug("Removing reference {}", pna);
         if (references.remove(pna) == null) {
             LOG.warn("{} had no reservation for {}", pna, blobDescription);
@@ -242,13 +243,26 @@ public abstract class LocallyCachedBlob {
     }
 
     /**
+     * Inform all of the callbacks that a change is going to happen and then wait for them to all get back that it is OK to make that
+     * change. Commit the new version once all callbacks are ready. Finally inform all callbacks that the commit is complete.
+     */
+    public synchronized void informReferencesAndCommitNewVersion(long newVersion) throws IOException {
+        CompletableFuture<Void> doneUpdating = informAllOfChangeAndWaitForConsensus();
+        commitNewVersion(newVersion);
+        doneUpdating.complete(null);
+    }
+    
+    /**
      * Inform all of the callbacks that a change is going to happen and then wait for
      * them to all get back that it is OK to make that change.
+     * 
+     * @return A future to complete when the change is committed
      */
-    public synchronized void informAllOfChangeAndWaitForConsensus() {
-        CountDownLatch cdl = new CountDownLatch(references.size());
-        doneUpdating = new CompletableFuture<>();
-        for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : references.entrySet()) {
+    private CompletableFuture<Void> informAllOfChangeAndWaitForConsensus() {
+        HashMap<PortAndAssignment, BlobChangingCallback> refsCopy = new HashMap<>(references);
+        CountDownLatch cdl = new CountDownLatch(refsCopy.size());
+        CompletableFuture<Void> doneUpdating = new CompletableFuture<>();
+        for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : refsCopy.entrySet()) {
             GoodToGo gtg = new GoodToGo(cdl, doneUpdating);
             try {
                 PortAndAssignment pna = entry.getKey();
@@ -264,13 +278,7 @@ public abstract class LocallyCachedBlob {
             //Interrupted is thrown when we are shutting down.
             // So just ignore it for now...
         }
-    }
-
-    /**
-     * Inform all of the callbacks that the change to the blob is complete.
-     */
-    public synchronized void informAllChangeComplete() {
-        doneUpdating.complete(null);
+        return doneUpdating;
     }
 
     /**
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
index 48ca064..3b019f4 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
@@ -204,7 +204,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
     }
 
     @Override
-    public void commitNewVersion(long newVersion) throws IOException {
+    protected void commitNewVersion(long newVersion) throws IOException {
         //This is not atomic (so if something bad happens in the middle we need to be able to recover
         Path tempLoc = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion));
         Path dest = topologyBasicBlobsRootDir.resolve(type.getFileName());
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 21a9bc2..9dfe2d7 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -12,6 +12,9 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -52,6 +55,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
+import java.util.concurrent.ExecutionException;
 import org.apache.storm.metric.StormMetricsRegistry;
 
 public class SlotTest {
@@ -231,7 +235,83 @@ public class SlotTest {
             assertTrue(Time.currentTimeMillis() > 2000);
         }
     }
+    
+    @Test
+    public void testErrorHandlingWhenLocalizationFails() throws Exception {
+        try (SimulatedTime t = new SimulatedTime(1010)) {
+            int port = 8080;
+            String topoId = "NEW";
+            List<ExecutorInfo> execList = mkExecutorInfoList(1, 2, 3, 4, 5);
+            LocalAssignment newAssignment =
+                mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
+
+            AsyncLocalizer localizer = mock(AsyncLocalizer.class);
+            BlobChangingCallback cb = mock(BlobChangingCallback.class);
+            Container container = mock(Container.class);
+            LocalState state = mock(LocalState.class);
+            ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+            when(containerLauncher.launchContainer(port, newAssignment, state)).thenReturn(container);
+            LSWorkerHeartbeat hb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs());
+            when(container.readHeartbeat()).thenReturn(hb, hb);
+
+            @SuppressWarnings("unchecked")
+            CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
+            CompletableFuture<Void> secondBlobFuture = mock(CompletableFuture.class);
+            when(secondBlobFuture.get(anyLong(), any())).thenThrow(new ExecutionException(new RuntimeException("Localization failure")));
+            CompletableFuture<Void> thirdBlobFuture = mock(CompletableFuture.class);
+            when(localizer.requestDownloadTopologyBlobs(newAssignment, port, cb))
+                .thenReturn(blobFuture)
+                .thenReturn(secondBlobFuture)
+                .thenReturn(thirdBlobFuture);
 
+            ISupervisor iSuper = mock(ISupervisor.class);
+            SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
+                                                      containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+            DynamicState dynamicState = new DynamicState(null, null, null, slotMetrics)
+                .withNewAssignment(newAssignment);
+
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
+            verify(localizer).requestDownloadTopologyBlobs(newAssignment, port, cb);
+            assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state);
+            assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload);
+            assertEquals(newAssignment, nextState.pendingLocalization);
+            assertEquals(0, Time.currentTimeMillis());
+
+            //Assignment has changed
+            nextState = Slot.stateMachineStep(nextState.withNewAssignment(null), staticState);
+            assertThat(nextState.state, is(MachineState.EMPTY));
+            assertThat(nextState.pendingChangingBlobs, is(Collections.emptySet()));
+            assertThat(nextState.pendingChangingBlobsAssignment, nullValue());
+            assertThat(nextState.pendingLocalization, nullValue());
+            assertThat(nextState.pendingDownload, nullValue());
+            
+            clearInvocations(localizer);
+            nextState = Slot.stateMachineStep(dynamicState.withNewAssignment(newAssignment), staticState);
+            verify(localizer).requestDownloadTopologyBlobs(newAssignment, port, cb);
+            assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state);
+            assertSame("pendingDownload not set properly", secondBlobFuture, nextState.pendingDownload);
+            assertEquals(newAssignment, nextState.pendingLocalization);
+            
+            //Error occurs, but assignment has not changed
+            clearInvocations(localizer);
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            verify(localizer).requestDownloadTopologyBlobs(newAssignment, port, cb);
+            assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state);
+            assertSame("pendingDownload not set properly", thirdBlobFuture, nextState.pendingDownload);
+            assertEquals(newAssignment, nextState.pendingLocalization);
+            assertThat(Time.currentTimeMillis(), greaterThan(3L));
+
+            nextState = Slot.stateMachineStep(nextState, staticState);
+            verify(thirdBlobFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(containerLauncher).launchContainer(port, newAssignment, state);
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state);
+            assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+        }
+    }
 
     @Test
     public void testRelaunch() throws Exception {
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index eef3205..8228702 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -149,21 +149,15 @@ public class AsyncLocalizerTest {
             f.get(20, TimeUnit.SECONDS);
 
             verify(jarBlob).fetchUnzipToTemp(any());
-            verify(jarBlob).informAllOfChangeAndWaitForConsensus();
-            verify(jarBlob).commitNewVersion(100L);
-            verify(jarBlob).informAllChangeComplete();
+            verify(jarBlob).informReferencesAndCommitNewVersion(100L);
             verify(jarBlob).cleanupOrphanedData();
 
             verify(codeBlob).fetchUnzipToTemp(any());
-            verify(codeBlob).informAllOfChangeAndWaitForConsensus();
-            verify(codeBlob).commitNewVersion(200L);
-            verify(codeBlob).informAllChangeComplete();
+            verify(codeBlob).informReferencesAndCommitNewVersion(200L);
             verify(codeBlob).cleanupOrphanedData();
 
             verify(confBlob).fetchUnzipToTemp(any());
-            verify(confBlob).informAllOfChangeAndWaitForConsensus();
-            verify(confBlob).commitNewVersion(300L);
-            verify(confBlob).informAllChangeComplete();
+            verify(confBlob).informReferencesAndCommitNewVersion(300L);
             verify(confBlob).cleanupOrphanedData();
         } finally {
             bl.close();