You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2019/03/15 18:42:17 UTC
[storm] branch master updated: STORM-3319: Fix failing assertion in Slot
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 947c7b2 STORM-3319: Fix failing assertion in Slot
new 18ead35 Merge branch 'STORM-3319' of https://github.com/srdo/storm into STORM-3319-merging
947c7b2 is described below
commit 947c7b22acf4cdf4f95001b26e700a590d675220
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Sun Jan 20 03:15:03 2019 +0100
STORM-3319: Fix failing assertion in Slot
---
.../org/apache/storm/daemon/supervisor/Slot.java | 99 +++++++++++++++++-----
.../org/apache/storm/localizer/AsyncLocalizer.java | 10 +--
.../apache/storm/localizer/LocalizedResource.java | 2 +-
.../apache/storm/localizer/LocallyCachedBlob.java | 54 +++++++-----
.../storm/localizer/LocallyCachedTopologyBlob.java | 2 +-
.../apache/storm/daemon/supervisor/SlotTest.java | 80 +++++++++++++++++
.../apache/storm/localizer/AsyncLocalizerTest.java | 12 +--
7 files changed, 198 insertions(+), 61 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 8b1b2f5..74c3e3d 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -128,13 +128,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
}
} catch (ContainerRecoveryException e) {
//We could not recover container will be null.
+ currentAssignment = null;
}
newAssignment = currentAssignment;
- if (container == null) {
- currentAssignment = null;
- //Assigned something but it is not running
- }
}
}
@@ -155,7 +152,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
//In some cases the new LocalAssignment may be equivalent to the old, but
// It is not equal. In those cases we want to update the current assignment to
// be the same as the new assignment
+ //PRECONDITION: The new and current assignments must be equivalent
private static DynamicState updateAssignmentIfNeeded(DynamicState dynamicState) {
+ assert equivalent(dynamicState.newAssignment, dynamicState.currentAssignment);
if (dynamicState.newAssignment != null
&& !dynamicState.newAssignment.equals(dynamicState.currentAssignment)) {
dynamicState =
@@ -244,7 +243,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
*/
private static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException {
assert (dynamicState.container == null);
+ assert dynamicState.currentAssignment == null;
+ //We're either going to empty, or starting fresh blob download. Either way, the changing blob notifications are outdated.
+ dynamicState = drainAllChangingBlobs(dynamicState);
if (dynamicState.newAssignment == null) {
return dynamicState.withState(MachineState.EMPTY);
}
@@ -276,7 +278,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(
dynamicState.newAssignment, staticState.port, staticState.changingCallback);
}
- dynamicState = drainAllChangingBlobs(dynamicState);
next = dynamicState.withState(MachineState.KILL)
.withPendingLocalization(dynamicState.newAssignment, pendingDownload);
break;
@@ -330,10 +331,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
/**
* Drop all of the changingBlobs and pendingChangingBlobs.
+ *
+ * PRECONDITION: container is null
* @param dynamicState current state.
* @return the next state.
*/
private static DynamicState drainAllChangingBlobs(DynamicState dynamicState) {
+ assert dynamicState.container == null;
if (!dynamicState.changingBlobs.isEmpty()) {
for (BlobChanging rc : dynamicState.changingBlobs) {
rc.latch.countDown();
@@ -416,6 +420,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
assert (dynamicState.pendingLocalization != null);
assert (dynamicState.pendingDownload != null);
assert (dynamicState.container == null);
+ assert dynamicState.currentAssignment == null;
//Ignore changes to scheduling while downloading the topology blobs
// We don't support canceling the download through the future yet,
@@ -435,8 +440,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
// Switch to the new assignment even if localization hasn't completed, or go to empty state
// if no new assignment.
- return prepareForNewAssignmentNoWorkersRunning(dynamicState.withPendingChangingBlobs(Collections.emptySet(), null),
- staticState);
+ return prepareForNewAssignmentNoWorkersRunning(dynamicState
+ .withPendingLocalization(null, null),
+ staticState);
}
// Wait until time out
@@ -449,7 +455,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
.withPendingLocalization(null, null);
}
- dynamicState = updateAssignmentIfNeeded(dynamicState);
staticState.slotMetrics.numWorkersLaunched.mark();
Container c =
staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState);
@@ -471,7 +476,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
// we wait for 3 seconds
Time.sleepSecs(3);
- return dynamicState.withState(MachineState.EMPTY);
+ //Try again, or go to empty if assignment has been nulled
+ return prepareForNewAssignmentNoWorkersRunning(dynamicState
+ .withPendingLocalization(null, null),
+ staticState);
}
}
@@ -492,6 +500,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
assert dynamicState.container == null;
assert dynamicState.pendingChangingBlobsAssignment != null;
assert !dynamicState.pendingChangingBlobs.isEmpty();
+ assert dynamicState.pendingDownload == null;
+ assert dynamicState.pendingLocalization == null;
if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
//We were rescheduled while waiting for the resources to be updated,
@@ -502,8 +512,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port);
}
staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment, staticState.port);
- return prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null, null)
- .withPendingChangingBlobs(Collections.emptySet(), null),
+ return prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null, null),
staticState);
}
@@ -546,6 +555,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
private static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
+ assert dynamicState.pendingChangingBlobs.isEmpty();
+ assert dynamicState.pendingChangingBlobsAssignment == null;
if (dynamicState.container.areAllProcessesDead()) {
LOG.info("SLOT {} all processes are dead...", staticState.port);
@@ -572,6 +583,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
private static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
+ assert dynamicState.pendingChangingBlobs.isEmpty();
+ assert dynamicState.pendingChangingBlobsAssignment == null;
+ assert dynamicState.pendingLocalization == null;
+ assert dynamicState.pendingDownload == null;
if (dynamicState.container.areAllProcessesDead()) {
if (equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
@@ -604,6 +619,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
private static DynamicState handleKillBlobUpdate(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
+ assert dynamicState.pendingChangingBlobs.isEmpty();
+ assert dynamicState.pendingChangingBlobsAssignment == null;
+ assert dynamicState.pendingDownload == null;
+ assert dynamicState.pendingLocalization == null;
//Release things that don't need to wait for us
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
@@ -638,6 +657,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
private static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
+ assert dynamicState.pendingChangingBlobs.isEmpty();
+ assert dynamicState.pendingChangingBlobsAssignment == null;
+ assert dynamicState.pendingDownload == null;
+ assert dynamicState.pendingLocalization == null;
LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
if (hb != null) {
@@ -682,6 +705,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
private static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
+ assert dynamicState.pendingChangingBlobs.isEmpty();
+ assert dynamicState.pendingChangingBlobsAssignment == null;
+ assert dynamicState.pendingDownload == null;
+ assert dynamicState.pendingLocalization == null;
if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
@@ -777,8 +804,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
}
static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticState) throws InterruptedException, IOException {
- assert dynamicState.changingBlobs.isEmpty();
+ assert dynamicState.currentAssignment == null;
+ assert dynamicState.container == null;
+ assert dynamicState.pendingChangingBlobs.isEmpty();
assert dynamicState.pendingChangingBlobsAssignment == null;
+ assert dynamicState.pendingDownload == null;
+ assert dynamicState.pendingLocalization == null;
if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState);
}
@@ -796,7 +827,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
return dynamicState;
}
- public MachineState getMachineState() {
+ MachineState getMachineState() {
return dynamicState.state;
}
@@ -810,12 +841,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
@Override
public void blobChanging(LocalAssignment assignment, int port, LocallyCachedBlob blob, GoodToGo go) {
- assert port == staticState.port : "got a callaback that is not for us " + port + " != " + staticState.port;
+ assert port == staticState.port : "got a callback that is not for us " + port + " != " + staticState.port;
//This is called async so lets assume that it is something we care about
try {
changingBlobs.put(new BlobChanging(assignment, blob, go.getLatch()));
} catch (InterruptedException e) {
- throw new RuntimeException("This should not have happend, but it did (the queue is unbounded)", e);
+ throw new RuntimeException("This should not have happened, but it did (the queue is unbounded)", e);
}
}
@@ -957,13 +988,37 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
}
enum MachineState {
+ /**
+ * Slot is empty, and no worker is running.
+ */
EMPTY,
+ /**
+ * Slot is running a worker.
+ */
RUNNING,
+ /**
+ * Slot has launched a worker, and is waiting for it to heartbeat.
+ */
WAITING_FOR_WORKER_START,
+ /**
+ * Slot has just killed its worker, and is now waiting for it to die so it can be relaunched in the same container.
+ */
KILL_AND_RELAUNCH,
+ /**
+ * Slot has just killed its worker, and is now waiting for it to die so the container can be deleted.
+ */
KILL,
+ /**
+ * Slot has just killed its worker, and is now waiting for it to die so the localizer can update a blob.
+ */
KILL_BLOB_UPDATE,
+ /**
+ * The slot is empty, and is waiting for blobs to download before the worker can be launched.
+ */
WAITING_FOR_BLOB_LOCALIZATION,
+ /**
+ * The slot is empty, and is waiting for blobs to be updated before the worker can be (re)launched.
+ */
WAITING_FOR_BLOB_UPDATE;
@Override
@@ -1043,14 +1098,17 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
public final Set<TopoProfileAction> pendingStopProfileActions;
/**
- * Blobs that are changed and need to be synced.
+ * Blobs that are changed and need to be synced. The localizer notifies the Slot about changing blobs on every state step.
+ * Blob updates are blocked until the Slot unblocks them, at which point they go in {@link #pendingChangingBlobs}.
+ * Updates are blocked until the Slot worker is dead, since blobs may otherwise be actively used.
*/
public final Set<BlobChanging> changingBlobs;
- public final LocalAssignment pendingChangingBlobsAssignment;
-
+
/**
- * Signals that acknowledged changing blobs have been updated.
+ * The assignment {@link #pendingChangingBlobs} belongs to.
*/
+ public final LocalAssignment pendingChangingBlobsAssignment;
+
public final Set<Future<Void>> pendingChangingBlobs;
/**
@@ -1197,6 +1255,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
this.pendingChangingBlobs, this.pendingChangingBlobsAssignment, this.slotMetrics);
}
+ /**
+ * Set the blocked changing blobs. This is an input from the outside, and should never be called by the state machine steps.
+ */
public DynamicState withChangingBlobs(Set<BlobChanging> changingBlobs) {
if (changingBlobs == this.changingBlobs) {
return this;
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 28bb2dd..f1bc79b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -70,7 +70,7 @@ import org.slf4j.LoggerFactory;
public class AsyncLocalizer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
- private static final CompletableFuture<Void> ALL_DONE_FUTURE = new CompletableFuture<>();
+ private static final CompletableFuture<Void> ALL_DONE_FUTURE = CompletableFuture.completedFuture(null);
private static final int ATTEMPTS_INTERVAL_TIME = 100;
private final Timer singleBlobLocalizationDuration;
@@ -78,10 +78,6 @@ public class AsyncLocalizer implements AutoCloseable {
private final Timer blobLocalizationDuration;
private final Meter numBlobUpdateVersionChanged;
- static {
- ALL_DONE_FUTURE.complete(null);
- }
-
// track resources - user to resourceSet
//ConcurrentHashMap is explicitly used everywhere in this class because it uses locks to guarantee atomicity for compute and
// computeIfAbsent where as ConcurrentMap allows for a retry of the function passed in, and would require the function to have
@@ -275,9 +271,7 @@ public class AsyncLocalizer implements AutoCloseable {
Timer.Context t = singleBlobLocalizationDuration.time();
try {
long newVersion = blob.fetchUnzipToTemp(blobStore);
- blob.informAllOfChangeAndWaitForConsensus();
- blob.commitNewVersion(newVersion);
- blob.informAllChangeComplete();
+ blob.informReferencesAndCommitNewVersion(newVersion);
t.stop();
} finally {
blob.cleanupOrphanedData();
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index 541c075..ea3bdac 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -276,7 +276,7 @@ public class LocalizedResource extends LocallyCachedBlob {
}
@Override
- public void commitNewVersion(long version) throws IOException {
+ protected void commitNewVersion(long version) throws IOException {
String key = getKey();
LOG.info("Blob: {} updated to version {} from version {}", key, version, getLocalVersion());
Path localVersionFile = versionFilePath;
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 131b269..c948240 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -23,8 +23,10 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
@@ -44,11 +46,10 @@ public abstract class LocallyCachedBlob {
// A callback that does nothing.
private static final BlobChangingCallback NOOP_CB = (assignment, port, resource, go) -> {
};
- private final Map<PortAndAssignment, BlobChangingCallback> references = new HashMap<>();
+ private final ConcurrentHashMap<PortAndAssignment, BlobChangingCallback> references = new ConcurrentHashMap<>();
private final String blobDescription;
private final String blobKey;
- private long lastUsed = Time.currentTimeMillis();
- private CompletableFuture<Void> doneUpdating = null;
+ private AtomicLong lastUsed = new AtomicLong(Time.currentTimeMillis());
private final Histogram fetchingRate;
@@ -144,7 +145,7 @@ public abstract class LocallyCachedBlob {
* PRECONDITION: this can only be called with a lock on this instance held.
* @param version the version of the blob to commit.
*/
- public abstract void commitNewVersion(long version) throws IOException;
+ protected abstract void commitNewVersion(long version) throws IOException;
/**
* Clean up any temporary files. This will be called after updating a blob, either successfully or if an error has occured.
@@ -195,22 +196,22 @@ public abstract class LocallyCachedBlob {
/**
* Updates the last updated time. This should be called when references are added or removed.
*/
- protected synchronized void touch() {
- lastUsed = Time.currentTimeMillis();
- LOG.debug("Setting {} ts to {}", blobKey, lastUsed);
+ protected void touch() {
+ lastUsed.set(Time.currentTimeMillis());
+ LOG.debug("Setting {} ts to {}", blobKey, lastUsed.get());
}
/**
* Get the last time that this used for LRU calculations.
*/
- public synchronized long getLastUsed() {
- return lastUsed;
+ public long getLastUsed() {
+ return lastUsed.get();
}
/**
* Return true if this blob is actively being used, else false (meaning it can be deleted, but might not be).
*/
- public synchronized boolean isUsed() {
+ public boolean isUsed() {
return !references.isEmpty();
}
@@ -219,7 +220,7 @@ public abstract class LocallyCachedBlob {
* @param pna the slot and assignment that are using this blob.
* @param cb an optional callback indicating that they want to know/synchronize when a blob is updated.
*/
- public synchronized void addReference(final PortAndAssignment pna, BlobChangingCallback cb) {
+ public void addReference(final PortAndAssignment pna, BlobChangingCallback cb) {
LOG.debug("Adding reference {}", pna);
if (cb == null) {
cb = NOOP_CB;
@@ -233,7 +234,7 @@ public abstract class LocallyCachedBlob {
* Removes a reservation for this blob from a given slot and assignemnt.
* @param pna the slot + assignment that no longer needs this blob.
*/
- public synchronized void removeReference(final PortAndAssignment pna) {
+ public void removeReference(final PortAndAssignment pna) {
LOG.debug("Removing reference {}", pna);
if (references.remove(pna) == null) {
LOG.warn("{} had no reservation for {}", pna, blobDescription);
@@ -242,13 +243,26 @@ public abstract class LocallyCachedBlob {
}
/**
+ * Inform all of the callbacks that a change is going to happen and then wait for them to all get back that it is OK to make that
+ * change. Commit the new version once all callbacks are ready. Finally inform all callbacks that the commit is complete.
+ */
+ public synchronized void informReferencesAndCommitNewVersion(long newVersion) throws IOException {
+ CompletableFuture<Void> doneUpdating = informAllOfChangeAndWaitForConsensus();
+ commitNewVersion(newVersion);
+ doneUpdating.complete(null);
+ }
+
+ /**
* Inform all of the callbacks that a change is going to happen and then wait for
* them to all get back that it is OK to make that change.
+ *
+ * @return A future to complete when the change is committed
*/
- public synchronized void informAllOfChangeAndWaitForConsensus() {
- CountDownLatch cdl = new CountDownLatch(references.size());
- doneUpdating = new CompletableFuture<>();
- for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : references.entrySet()) {
+ private CompletableFuture<Void> informAllOfChangeAndWaitForConsensus() {
+ HashMap<PortAndAssignment, BlobChangingCallback> refsCopy = new HashMap<>(references);
+ CountDownLatch cdl = new CountDownLatch(refsCopy.size());
+ CompletableFuture<Void> doneUpdating = new CompletableFuture<>();
+ for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : refsCopy.entrySet()) {
GoodToGo gtg = new GoodToGo(cdl, doneUpdating);
try {
PortAndAssignment pna = entry.getKey();
@@ -264,13 +278,7 @@ public abstract class LocallyCachedBlob {
//Interrupted is thrown when we are shutting down.
// So just ignore it for now...
}
- }
-
- /**
- * Inform all of the callbacks that the change to the blob is complete.
- */
- public synchronized void informAllChangeComplete() {
- doneUpdating.complete(null);
+ return doneUpdating;
}
/**
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
index 48ca064..3b019f4 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
@@ -204,7 +204,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
}
@Override
- public void commitNewVersion(long newVersion) throws IOException {
+ protected void commitNewVersion(long newVersion) throws IOException {
//This is not atomic (so if something bad happens in the middle we need to be able to recover
Path tempLoc = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion));
Path dest = topologyBasicBlobsRootDir.resolve(type.getFileName());
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 21a9bc2..9dfe2d7 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -12,6 +12,9 @@
package org.apache.storm.daemon.supervisor;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -52,6 +55,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
+import java.util.concurrent.ExecutionException;
import org.apache.storm.metric.StormMetricsRegistry;
public class SlotTest {
@@ -231,7 +235,83 @@ public class SlotTest {
assertTrue(Time.currentTimeMillis() > 2000);
}
}
+
+ @Test
+ public void testErrorHandlingWhenLocalizationFails() throws Exception {
+ try (SimulatedTime t = new SimulatedTime(1010)) {
+ int port = 8080;
+ String topoId = "NEW";
+ List<ExecutorInfo> execList = mkExecutorInfoList(1, 2, 3, 4, 5);
+ LocalAssignment newAssignment =
+ mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0));
+
+ AsyncLocalizer localizer = mock(AsyncLocalizer.class);
+ BlobChangingCallback cb = mock(BlobChangingCallback.class);
+ Container container = mock(Container.class);
+ LocalState state = mock(LocalState.class);
+ ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
+ when(containerLauncher.launchContainer(port, newAssignment, state)).thenReturn(container);
+ LSWorkerHeartbeat hb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs());
+ when(container.readHeartbeat()).thenReturn(hb, hb);
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<Void> blobFuture = mock(CompletableFuture.class);
+ CompletableFuture<Void> secondBlobFuture = mock(CompletableFuture.class);
+ when(secondBlobFuture.get(anyLong(), any())).thenThrow(new ExecutionException(new RuntimeException("Localization failure")));
+ CompletableFuture<Void> thirdBlobFuture = mock(CompletableFuture.class);
+ when(localizer.requestDownloadTopologyBlobs(newAssignment, port, cb))
+ .thenReturn(blobFuture)
+ .thenReturn(secondBlobFuture)
+ .thenReturn(thirdBlobFuture);
+ ISupervisor iSuper = mock(ISupervisor.class);
+ SlotMetrics slotMetrics = new SlotMetrics(new StormMetricsRegistry());
+ StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
+ containerLauncher, "localhost", port, iSuper, state, cb, null, null, slotMetrics);
+ DynamicState dynamicState = new DynamicState(null, null, null, slotMetrics)
+ .withNewAssignment(newAssignment);
+
+ DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
+ verify(localizer).requestDownloadTopologyBlobs(newAssignment, port, cb);
+ assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state);
+ assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload);
+ assertEquals(newAssignment, nextState.pendingLocalization);
+ assertEquals(0, Time.currentTimeMillis());
+
+ //Assignment has changed
+ nextState = Slot.stateMachineStep(nextState.withNewAssignment(null), staticState);
+ assertThat(nextState.state, is(MachineState.EMPTY));
+ assertThat(nextState.pendingChangingBlobs, is(Collections.emptySet()));
+ assertThat(nextState.pendingChangingBlobsAssignment, nullValue());
+ assertThat(nextState.pendingLocalization, nullValue());
+ assertThat(nextState.pendingDownload, nullValue());
+
+ clearInvocations(localizer);
+ nextState = Slot.stateMachineStep(dynamicState.withNewAssignment(newAssignment), staticState);
+ verify(localizer).requestDownloadTopologyBlobs(newAssignment, port, cb);
+ assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state);
+ assertSame("pendingDownload not set properly", secondBlobFuture, nextState.pendingDownload);
+ assertEquals(newAssignment, nextState.pendingLocalization);
+
+ //Error occurs, but assignment has not changed
+ clearInvocations(localizer);
+ nextState = Slot.stateMachineStep(nextState, staticState);
+ verify(localizer).requestDownloadTopologyBlobs(newAssignment, port, cb);
+ assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state);
+ assertSame("pendingDownload not set properly", thirdBlobFuture, nextState.pendingDownload);
+ assertEquals(newAssignment, nextState.pendingLocalization);
+ assertThat(Time.currentTimeMillis(), greaterThan(3L));
+
+ nextState = Slot.stateMachineStep(nextState, staticState);
+ verify(thirdBlobFuture).get(1000, TimeUnit.MILLISECONDS);
+ verify(containerLauncher).launchContainer(port, newAssignment, state);
+ assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state);
+ assertSame("pendingDownload is not null", null, nextState.pendingDownload);
+ assertSame(null, nextState.pendingLocalization);
+ assertSame(newAssignment, nextState.currentAssignment);
+ assertSame(container, nextState.container);
+ }
+ }
@Test
public void testRelaunch() throws Exception {
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index eef3205..8228702 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -149,21 +149,15 @@ public class AsyncLocalizerTest {
f.get(20, TimeUnit.SECONDS);
verify(jarBlob).fetchUnzipToTemp(any());
- verify(jarBlob).informAllOfChangeAndWaitForConsensus();
- verify(jarBlob).commitNewVersion(100L);
- verify(jarBlob).informAllChangeComplete();
+ verify(jarBlob).informReferencesAndCommitNewVersion(100L);
verify(jarBlob).cleanupOrphanedData();
verify(codeBlob).fetchUnzipToTemp(any());
- verify(codeBlob).informAllOfChangeAndWaitForConsensus();
- verify(codeBlob).commitNewVersion(200L);
- verify(codeBlob).informAllChangeComplete();
+ verify(codeBlob).informReferencesAndCommitNewVersion(200L);
verify(codeBlob).cleanupOrphanedData();
verify(confBlob).fetchUnzipToTemp(any());
- verify(confBlob).informAllOfChangeAndWaitForConsensus();
- verify(confBlob).commitNewVersion(300L);
- verify(confBlob).informAllChangeComplete();
+ verify(confBlob).informReferencesAndCommitNewVersion(300L);
verify(confBlob).cleanupOrphanedData();
} finally {
bl.close();