You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/09 21:12:36 UTC
[1/4] storm git commit: STORM-3125: Refactored and commented code for
Slot and ReadClusterState
Repository: storm
Updated Branches:
refs/heads/master afe35f318 -> daec24841
STORM-3125: Refactored and commented code for Slot and ReadClusterState
(cherry picked from commit 5338491)
STORM-3125: Refactored methods for blob localization
(cherry picked from commit d40397c)
STORM-3125: Refactored methods of state machine for killing workers and
improved tests and metrics.
(cherry picked from commit 5338491)
STORM-3125: Refactored code for DRPC
(cherry picked from commit 3152e6f)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81307a50
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81307a50
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81307a50
Branch: refs/heads/master
Commit: 81307a50528f71c39786cab577edafe7151d18a0
Parents: d880767
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Mon Jun 18 10:44:19 2018 -0500
Committer: Zhengdai Hu <hu...@gmail.com>
Committed: Fri Jul 6 16:52:17 2018 -0500
----------------------------------------------------------------------
.../daemon/drpc/BlockingOutstandingRequest.java | 3 +-
.../daemon/supervisor/ReadClusterState.java | 9 +-
.../apache/storm/daemon/supervisor/Slot.java | 306 +++++++++++--------
.../apache/storm/localizer/AsyncLocalizer.java | 2 +-
.../org/apache/storm/localizer/IOFunction.java | 22 ++
.../storm/localizer/LocalizedResource.java | 77 ++---
.../storm/localizer/LocallyCachedBlob.java | 115 ++++---
.../localizer/LocallyCachedTopologyBlob.java | 16 +-
.../java/org/apache/storm/utils/EnumUtil.java | 40 +++
.../storm/daemon/supervisor/SlotTest.java | 8 +-
.../storm/localizer/AsyncLocalizerTest.java | 40 ++-
11 files changed, 392 insertions(+), 246 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
index 1426802..e11e363 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
@@ -25,8 +25,7 @@ import org.apache.storm.generated.DRPCRequest;
import org.apache.storm.utils.WrappedDRPCExecutionException;
public class BlockingOutstandingRequest extends OutstandingRequest {
- public static final RequestFactory<BlockingOutstandingRequest> FACTORY =
- (function, request) -> new BlockingOutstandingRequest(function, request);
+ public static final RequestFactory<BlockingOutstandingRequest> FACTORY = BlockingOutstandingRequest::new;
private Semaphore _sem;
private volatile String _result = null;
private volatile DRPCExecutionException _e = null;
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index b617345..c948e31 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -99,15 +99,16 @@ public class ReadClusterState implements Runnable, AutoCloseable {
}
try {
- Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf);
+ Collection<String> detachedRunningWorkers = SupervisorUtils.supervisorWorkerIds(superConf);
for (Slot slot : slots.values()) {
String workerId = slot.getWorkerId();
+ // We ignore workers that are still bound to a slot, which is monitored by a supervisor
if (workerId != null) {
- workers.remove(workerId);
+ detachedRunningWorkers.remove(workerId);
}
}
- if (!workers.isEmpty()) {
- supervisor.killWorkers(workers, launcher);
+ if (!detachedRunningWorkers.isEmpty()) {
+ supervisor.killWorkers(detachedRunningWorkers, launcher);
}
} catch (Exception e) {
LOG.warn("Error trying to clean up old workers", e);
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 59fea1e..0f52f11 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -45,6 +45,7 @@ import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.metricstore.WorkerMetricsProcessor;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.utils.EnumUtil;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
@@ -56,21 +57,25 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
private static final Meter numWorkersLaunched =
StormMetricsRegistry.registerMeter("supervisor:num-workers-launched");
- private static final Meter numWorkersKilledProcessExit =
- StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-process-exit");
- private static final Meter numWorkersKilledMemoryViolation =
- StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-memory-violation");
- private static final Meter numWorkersKilledHBTimeout =
- StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-hb-timeout");
- private static final Meter numWorkersKilledHBNull =
- StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-hb-null");
+
+ private enum KillReason {
+ ASSIGNMENT_CHANGED, BLOB_CHANGED, PROCESS_EXIT, MEMORY_VIOLATION, HB_TIMEOUT, HB_NULL;
+
+ @Override
+ public String toString() {
+ return EnumUtil.toMetricName(this);
+ }
+
+ }
+
+ private static final Map<KillReason, Meter> numWorkersKilledFor = EnumUtil.toEnumMap(KillReason.class,
+ killReason -> StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-" + killReason.toString()));
private static final Meter numForceKill =
StormMetricsRegistry.registerMeter("supervisor:num-workers-force-kill");
private static final long ONE_SEC_IN_NANO = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
private final AtomicReference<LocalAssignment> newAssignment = new AtomicReference<>();
private final AtomicReference<Set<TopoProfileAction>> profiling = new AtomicReference<>(new HashSet<>());
- ;
private final BlockingQueue<BlobChanging> changingBlobs = new LinkedBlockingQueue<>();
private final StaticState staticState;
private final IStormClusterState clusterState;
@@ -89,54 +94,56 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
WorkerMetricsProcessor metricsProcessor) throws Exception {
super("SLOT_" + port);
this.metricsExec = metricsExec;
-
this.cachedCurrentAssignments = cachedCurrentAssignments;
this.clusterState = clusterState;
- Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap();
+ this.staticState = new StaticState(localizer,
+ ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
+ ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000,
+ ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
+ ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000,
+ containerLauncher,
+ host,
+ port,
+ iSupervisor,
+ localState,
+ this,
+ metricsExec, metricsProcessor);
+
LocalAssignment currentAssignment = null;
+ Container container = null;
+ LocalAssignment newAssignment = null;
+
+ Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap();
if (assignments != null) {
currentAssignment = assignments.get(port);
- }
- Container container = null;
- if (currentAssignment != null) {
- try {
- // For now we do not make a transaction when removing a topology assignment from local, an overdue
- // assignment may be left on local disk.
- // So we should check if the local disk assignment is valid when initializing:
- // if topology files does not exist, the worker[possibly alive] will be reassigned if it is timed-out;
- // if topology files exist but the topology id is invalid, just let Supervisor make a sync;
- // if topology files exist and topology files is valid, recover the container.
- if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) {
- container = containerLauncher.recoverContainer(port, currentAssignment, localState);
- } else {
- // Make the assignment null to let slot clean up the disk assignment.
+ if (currentAssignment != null) {
+ try {
+ // For now we do not make a transaction when removing a topology assignment from local, an overdue
+ // assignment may be left on local disk.
+ // So we should check if the local disk assignment is valid when initializing:
+ // if topology files does not exist, the worker[possibly alive] will be reassigned if it is timed-out;
+ // if topology files exist but the topology id is invalid, just let Supervisor make a sync;
+ // if topology files exist and topology files is valid, recover the container.
+ if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) {
+ container = containerLauncher.recoverContainer(port, currentAssignment, localState);
+ } else {
+ // Make the assignment null to let slot clean up the disk assignment.
+ currentAssignment = null;
+ }
+ } catch (ContainerRecoveryException e) {
+ //We could not recover the container, so container will be null.
+ }
+
+ newAssignment = currentAssignment;
+ if (container == null) {
currentAssignment = null;
+ //Assigned something but it is not running
}
- } catch (ContainerRecoveryException e) {
- //We could not recover container will be null.
}
}
- LocalAssignment newAssignment = currentAssignment;
- if (currentAssignment != null && container == null) {
- currentAssignment = null;
- //Assigned something but it is not running
- }
-
- dynamicState = new DynamicState(currentAssignment, container, newAssignment);
- staticState = new StaticState(localizer,
- ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
- ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000,
- ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
- ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000,
- containerLauncher,
- host,
- port,
- iSupervisor,
- localState,
- this,
- metricsExec, metricsProcessor);
- this.newAssignment.set(dynamicState.newAssignment);
+ setNewAssignment(newAssignment);
+ this.dynamicState = new DynamicState(currentAssignment, container, newAssignment);
if (MachineState.RUNNING == dynamicState.state) {
//We are running so we should recover the blobs.
staticState.localizer.recoverRunningTopology(currentAssignment, port, this);
@@ -171,6 +178,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
return false;
}
+ /**
+ * Decide the equivalence of two local assignments, ignoring the order of executors.
+ * This is different from the #equals method.
+ * @param a Local assignment A
+ * @param b Local assignment B
+ * @return True if A and B are equivalent, ignoring the order of the executors
+ */
@VisibleForTesting
static boolean equivalent(LocalAssignment a, LocalAssignment b) {
if (a == null && b == null) {
@@ -229,7 +243,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
* @return the next state
* @throws IOException on any error
*/
- static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException {
+ private static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException {
assert (dynamicState.container == null);
if (dynamicState.newAssignment == null) {
@@ -241,64 +255,55 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
.withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION);
}
- /**
- * Kill the current container and start downloading what the new assignment needs, if there is a new assignment.
- * PRECONDITION: container != null
- * @param dynamicState current state
- * @param staticState static data
- * @return the next state
- * @throws Exception
- */
- static DynamicState killContainerForChangedAssignment(DynamicState dynamicState, StaticState staticState) throws Exception {
- assert (dynamicState.container != null);
+ private static DynamicState killContainerFor(KillReason reason, DynamicState dynamicState, StaticState staticState)
+ throws Exception {
+ assert dynamicState.container != null;
- staticState.iSupervisor.killedWorker(staticState.port);
- dynamicState.container.kill();
- Future<Void> pendingDownload = null;
- if (dynamicState.newAssignment != null) {
- pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(dynamicState.newAssignment, staticState.port,
- staticState.changingCallback);
+ //Skip special case if `storm kill_workers` is already invoked
+ Boolean isDead = dynamicState.container.areAllProcessesDead();
+ if (!isDead) {
+ if (reason == KillReason.ASSIGNMENT_CHANGED || reason == KillReason.BLOB_CHANGED) {
+ staticState.iSupervisor.killedWorker(staticState.port);
+ }
+ dynamicState.container.kill();
}
- dynamicState = drainAllChangingBlobs(dynamicState);
- Time.sleep(staticState.killSleepMs);
- return dynamicState.withPendingLocalization(dynamicState.newAssignment, pendingDownload).withState(MachineState.KILL);
- }
-
- /**
- * Kill the current container, and wait go to the state to inform the localizer that we are ready to go.
- * PRECONDITION: container != null
- * @param dynamicState current state
- * @param staticState static data
- * @return the next state
- */
- private static DynamicState killContainerForChangedBlobs(DynamicState dynamicState, StaticState staticState) throws Exception {
- assert (dynamicState.container != null);
-
- staticState.iSupervisor.killedWorker(staticState.port);
- dynamicState.container.kill();
+ numWorkersKilledFor.get(reason).mark();
- Time.sleep(staticState.killSleepMs);
- return dynamicState.withState(MachineState.KILL_BLOB_UPDATE);
- }
-
- /**
- * Kill the current container and relaunch it. (Something odd happened)
- * PRECONDITION: container != null
- * @param dynamicState current state
- * @param staticState static data
- * @return the next state
- * @throws Exception
- */
- static DynamicState killAndRelaunchContainer(DynamicState dynamicState, StaticState staticState) throws Exception {
- assert (dynamicState.container != null);
+ DynamicState next;
+ switch (reason) {
+ case ASSIGNMENT_CHANGED:
+ Future<Void> pendingDownload = null;
+ if (dynamicState.newAssignment != null) {
+ pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(
+ dynamicState.newAssignment, staticState.port, staticState.changingCallback);
+ }
+ dynamicState = drainAllChangingBlobs(dynamicState);
+ next = dynamicState.withState(MachineState.KILL)
+ .withPendingLocalization(dynamicState.newAssignment, pendingDownload);
+ break;
+
+ case BLOB_CHANGED:
+ next = dynamicState.withState(MachineState.KILL_BLOB_UPDATE);
+ break;
+
+ case PROCESS_EXIT:
+ case MEMORY_VIOLATION:
+ case HB_TIMEOUT:
+ case HB_NULL:
+ //any stop profile actions that hadn't timed out yet, we should restart after the worker is running again.
+ HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
+ mod.addAll(dynamicState.pendingStopProfileActions);
+ next = dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, Collections.emptySet());
+ break;
- dynamicState.container.kill();
- Time.sleep(staticState.killSleepMs);
+ default:
+ throw new IllegalArgumentException("Unknown reason for killing a container");
+ }
- //any stop profile actions that hadn't timed out yet, we should restart after the worker is running again.
- HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
- mod.addAll(dynamicState.pendingStopProfileActions);
- return dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, Collections.emptySet());
+ if (!isDead) {
+ Time.sleep(staticState.killSleepMs);
+ }
+ return next;
}
/**
@@ -309,7 +314,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
* @param nextState the next MachineState to go to.
* @return the next state.
*/
- static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws
+ private static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws
Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
@@ -345,7 +350,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
}
/**
- * Informs the async localizer for all of blobs that the worker is dead.
+ * Informs the async localizer, for all of the changing blobs, that the worker has acknowledged the blob changes.
+ * The worker has stopped by this point.
*
* PRECONDITION: container is null
* PRECONDITION: changingBlobs should only be for the given assignment.
@@ -357,12 +363,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
assert dynamicState.changingBlobs.stream().allMatch((cr) -> forSameTopology(cr.assignment, assignment));
Set<Future<Void>> futures = new HashSet<>(dynamicState.changingBlobs.size());
+
+ // We need to add the new futures to the existing ones
if (forSameTopology(dynamicState.pendingChangingBlobsAssignment, assignment)) {
- //We need to add the new futures to the existing ones
futures.addAll(dynamicState.pendingChangingBlobs);
}
- //Otherwise they will just be replaced
+ // Acknowledge all changing blobs as futures
for (BlobChanging rc : dynamicState.changingBlobs) {
futures.add(rc.latch.countDown());
}
@@ -397,7 +404,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
}
/**
- * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state.
+ * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state, when the slot is waiting for
+ * blobs of the pending assignment to be completely downloaded, before the container is launched/relaunched.
* PRECONDITION: neither pendingLocalization nor pendingDownload is null.
* PRECONDITION: The slot should be empty
* @param dynamicState current state
@@ -405,14 +413,16 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
* @return the next state
* @throws Exception on any error
*/
- static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
+ private static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.pendingLocalization != null);
assert (dynamicState.pendingDownload != null);
assert (dynamicState.container == null);
//Ignore changes to scheduling while downloading the topology blobs
// We don't support canceling the download through the future yet,
- // so to keep everything in sync, just wait
+ // because pending blobs may be shared by multiple workers and canceling them
+ // may lead to a race condition.
+ // To keep everything in sync, just wait for all workers
try {
//Release things that don't need to wait for us to finish downloading.
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.pendingLocalization);
@@ -421,11 +431,14 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
dynamicState = informChangedBlobs(dynamicState, dynamicState.pendingLocalization);
}
+ // Wait until time out
dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
- //Downloading of all blobs finished.
+ //Downloading of all blobs finished. This is a precondition for all of the code below.
if (!equivalent(dynamicState.newAssignment, dynamicState.pendingLocalization)) {
//Scheduling changed
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
+ // Switch to the new assignment even if localization hasn't completed, or go to empty state
+ // if no new assignment.
return prepareForNewAssignmentNoWorkersRunning(dynamicState.withPendingChangingBlobs(Collections.emptySet(), null),
staticState);
}
@@ -531,7 +544,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
* @return the next state
* @throws Exception on any error
*/
- static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception {
+ private static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
@@ -558,7 +571,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
* @return the next state
* @throws Exception on any error
*/
- static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception {
+ private static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
@@ -625,7 +638,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
* @return the next state
* @throws Exception on any error
*/
- static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception {
+ private static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
@@ -641,7 +654,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
//We were rescheduled while waiting for the worker to come up
LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
dynamicState.newAssignment);
- return killContainerForChangedAssignment(dynamicState, staticState);
+ return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState);
}
dynamicState = updateAssignmentIfNeeded(dynamicState);
@@ -649,13 +662,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
if (timeDiffms > staticState.firstHbTimeoutMs) {
LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container,
staticState.firstHbTimeoutMs);
- return killAndRelaunchContainer(dynamicState, staticState);
+ return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
}
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
if (!dynamicState.changingBlobs.isEmpty()) {
//Kill the container and restart it
- return killContainerForChangedBlobs(dynamicState, staticState);
+ return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState);
}
Time.sleep(1000);
return dynamicState;
@@ -669,7 +682,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
* @return the next state
* @throws Exception on any error
*/
- static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
+ private static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
assert (dynamicState.container != null);
assert (dynamicState.currentAssignment != null);
@@ -677,42 +690,38 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
dynamicState.newAssignment);
//Scheduling changed while running...
- return killContainerForChangedAssignment(dynamicState, staticState);
+ return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState);
}
dynamicState = updateAssignmentIfNeeded(dynamicState);
dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
if (!dynamicState.changingBlobs.isEmpty()) {
//Kill the container and restart it
- return killContainerForChangedBlobs(dynamicState, staticState);
+ return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState);
}
if (dynamicState.container.didMainProcessExit()) {
- numWorkersKilledProcessExit.mark();
LOG.warn("SLOT {}: main process has exited", staticState.port);
- return killAndRelaunchContainer(dynamicState, staticState);
+ return killContainerFor(KillReason.PROCESS_EXIT, dynamicState, staticState);
}
if (dynamicState.container.isMemoryLimitViolated(dynamicState.currentAssignment)) {
- numWorkersKilledMemoryViolation.mark();
LOG.warn("SLOT {}: violated memory limits", staticState.port);
- return killAndRelaunchContainer(dynamicState, staticState);
+ return killContainerFor(KillReason.MEMORY_VIOLATION, dynamicState, staticState);
}
LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
if (hb == null) {
- numWorkersKilledHBNull.mark();
LOG.warn("SLOT {}: HB returned as null", staticState.port);
//This can happen if the supervisor crashed after launching a
// worker that never came up.
- return killAndRelaunchContainer(dynamicState, staticState);
+ return killContainerFor(KillReason.HB_NULL, dynamicState, staticState);
}
long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
if (timeDiffMs > staticState.hbTimeoutMs) {
- numWorkersKilledHBTimeout.mark();
LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, staticState.hbTimeoutMs);
- return killAndRelaunchContainer(dynamicState, staticState);
+ return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
}
//The worker is up and running check for profiling requests
@@ -826,6 +835,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
}
}
+ /**
+ * Get the workerID (nullable) from the CURRENT container if it exists, otherwise return null.
+ * @return workerID
+ */
public String getWorkerId() {
String workerId = null;
Container c = dynamicState.container;
@@ -945,7 +958,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
this.join();
}
- static enum MachineState {
+ enum MachineState {
EMPTY,
RUNNING,
WAITING_FOR_WORKER_START,
@@ -996,19 +1009,46 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
static class DynamicState {
public final MachineState state;
+
+ /**
+ * Latest assignment assigned to this worker, updated asynchronously.
+ */
public final LocalAssignment newAssignment;
+
+ /**
+ * Assignment the worker is running on, when the state machine enters this state.
+ */
public final LocalAssignment currentAssignment;
- public final Container container;
+
+ /**
+ * Assignment assigned to the worker and waiting for blob download.
+ * This is the same as newAssignment if no newer assignment has arrived before
+ * localization completes.
+ */
public final LocalAssignment pendingLocalization;
+
+ public final Container container;
+
+ /**
+ * Signals that blobs have been downloaded for the first time.
+ */
public final Future<Void> pendingDownload;
public final Set<TopoProfileAction> profileActions;
public final Set<TopoProfileAction> pendingStopProfileActions;
+
+ /**
+ * Blobs that are changed and need to be synced.
+ */
public final Set<BlobChanging> changingBlobs;
public final LocalAssignment pendingChangingBlobsAssignment;
+
+ /**
+ * Signals that acknowledged changing blobs have been updated.
+ */
public final Set<Future<Void>> pendingChangingBlobs;
/**
- * The last time that WAITING_FOR_WORKER_START, KILL, or KILL_AND_RELAUNCH were entered into.
+ * The time at which the machine entered the current state.
*/
public final long startTime;
@@ -1044,7 +1084,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
final Set<BlobChanging> changingBlobs,
final Set<Future<Void>> pendingChangingBlobs, final LocalAssignment pendingChaningBlobsAssignment) {
assert pendingChangingBlobs != null;
- assert !(pendingChangingBlobs.isEmpty() ^ (pendingChaningBlobsAssignment == null));
+ assert pendingChangingBlobs.isEmpty() == (pendingChaningBlobsAssignment == null);
this.state = state;
this.newAssignment = newAssignment;
this.currentAssignment = currentAssignment;
@@ -1099,6 +1139,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
return withPendingLocalization(this.pendingLocalization, pendingDownload);
}
+ /**
+ * Transition to the given state. Notice that it's possible to transition to
+ * the same state.
+ * @param state The state to transition into
+ * @return New dynamicState
+ */
public DynamicState withState(final MachineState state) {
long newStartTime = Time.currentTimeMillis();
return new DynamicState(state, this.newAssignment,
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index a2b9079..9d91aa0 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -252,7 +252,7 @@ public class AsyncLocalizer implements AutoCloseable {
long remoteVersion = blob.getRemoteVersion(blobStore);
if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
try {
- long newVersion = blob.downloadToTempLocation(blobStore);
+ long newVersion = blob.fetchUnzipToTemp(blobStore);
blob.informAllOfChangeAndWaitForConsensus();
blob.commitNewVersion(newVersion);
blob.informAllChangeComplete();
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/IOFunction.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/IOFunction.java b/storm-server/src/main/java/org/apache/storm/localizer/IOFunction.java
new file mode 100644
index 0000000..e8d8171
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/localizer/IOFunction.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.localizer;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IOFunction<T, R> {
+ R apply(T t) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index caeca77..6f1c8d7 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -18,6 +18,8 @@
package org.apache.storm.localizer;
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
@@ -43,12 +45,12 @@ import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.blobstore.ClientBlobStore;
-import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.ShellUtils;
@@ -56,10 +58,6 @@ import org.apache.storm.utils.WrappedAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-
-import org.apache.storm.utils.ConfigUtils;
-
/**
* Represents a resource that is localized on the supervisor. A localized resource has a .current symlink to the current version file which
* is named filename.{current version}. There is also a filename.version which contains the latest version.
@@ -85,7 +83,7 @@ public class LocalizedResource extends LocallyCachedBlob {
private final Path baseDir;
private final Path versionFilePath;
private final Path symlinkPath;
- private final boolean uncompressed;
+ private final boolean shouldUncompress;
private final IAdvancedFSOps fsOps;
private final String user;
private final Map<String, Object> conf;
@@ -93,18 +91,18 @@ public class LocalizedResource extends LocallyCachedBlob {
// size of the resource
private long size = -1;
- LocalizedResource(String key, Path localBaseDir, boolean uncompressed, IAdvancedFSOps fsOps, Map<String, Object> conf,
+ LocalizedResource(String key, Path localBaseDir, boolean shouldUncompress, IAdvancedFSOps fsOps, Map<String, Object> conf,
String user) {
- super(key + (uncompressed ? " archive" : " file"), key);
+ super(key + (shouldUncompress ? " archive" : " file"), key);
Path base = getLocalUserFileCacheDir(localBaseDir, user);
- this.baseDir = uncompressed ? getCacheDirForArchives(base) : getCacheDirForFiles(base);
+ this.baseDir = shouldUncompress ? getCacheDirForArchives(base) : getCacheDirForFiles(base);
this.conf = conf;
this.symLinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
this.user = user;
this.fsOps = fsOps;
versionFilePath = constructVersionFileName(baseDir, key);
symlinkPath = constructBlobCurrentSymlinkName(baseDir, key);
- this.uncompressed = uncompressed;
+ this.shouldUncompress = shouldUncompress;
//Set the size in case we are recovering an already downloaded object
setSize();
}
@@ -241,48 +239,39 @@ public class LocalizedResource extends LocallyCachedBlob {
}
@Override
- public long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException {
+ public long fetchUnzipToTemp(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException {
String key = getKey();
ReadableBlobMeta meta = store.getBlobMeta(key);
if (!ServerUtils.canUserReadBlob(meta, user, conf)) {
throw new WrappedAuthorizationException(user + " does not have READ access to " + key);
}
- long version;
- Path downloadFile;
- Path finalLocation;
- try (InputStreamWithMeta in = store.getBlob(key)) {
- version = in.getVersion();
- finalLocation = constructBlobWithVersionFileName(baseDir, getKey(), version);
- if (uncompressed) {
+
+ DownloadMeta downloadMeta = fetch(store, key, v -> {
+ Path path = shouldUncompress ? tmpOutputLocation() : constructBlobWithVersionFileName(baseDir, getKey(), v);
// we need to download to temp file and then unpack into the one requested
- downloadFile = tmpOutputLocation();
- } else {
- downloadFile = finalLocation;
- }
- byte[] buffer = new byte[1024];
- int len;
- LOG.debug("Downloading {} to {}", key, downloadFile);
- Path parent = downloadFile.getParent();
- if (!Files.exists(parent)) {
- //There is a race here that we can still lose
- try {
- Files.createDirectory(parent);
- } catch (FileAlreadyExistsException e) {
- //Ignored
- }
- }
- try (FileOutputStream out = new FileOutputStream(downloadFile.toFile())) {
- while ((len = in.read(buffer)) >= 0) {
- out.write(buffer, 0, len);
+ Path parent = path.getParent();
+ if (!Files.exists(parent)) {
+ //There is a race here that we can still lose
+ try {
+ Files.createDirectory(parent);
+ } catch (FileAlreadyExistsException e) {
+ //Ignored
+ }
}
- }
- }
- if (uncompressed) {
+ return path;
+ },
+ FileOutputStream::new
+ );
+
+ Path finalLocation = downloadMeta.getDownloadPath();
+ if (shouldUncompress) {
+ Path downloadFile = finalLocation;
+ finalLocation = constructBlobWithVersionFileName(baseDir, getKey(), downloadMeta.getVersion());
ServerUtils.unpack(downloadFile.toFile(), finalLocation.toFile(), symLinksDisabled);
LOG.debug("Uncompressed {} to: {}", downloadFile, finalLocation);
}
setBlobPermissions(conf, user, finalLocation);
- return version;
+ return downloadMeta.getVersion();
}
@Override
@@ -416,7 +405,7 @@ public class LocalizedResource extends LocallyCachedBlob {
Path fileWithVersion = getFilePathWithVersion();
Path currentSymLink = getCurrentSymlinkPath();
- if (uncompressed) {
+ if (shouldUncompress) {
if (Files.exists(fileWithVersion)) {
// this doesn't follow symlinks, which is what we want
FileUtils.deleteDirectory(fileWithVersion.toFile());
@@ -444,14 +433,14 @@ public class LocalizedResource extends LocallyCachedBlob {
public boolean equals(Object other) {
if (other instanceof LocalizedResource) {
LocalizedResource l = (LocalizedResource) other;
- return getKey().equals(l.getKey()) && uncompressed == l.uncompressed && baseDir.equals(l.baseDir);
+ return getKey().equals(l.getKey()) && shouldUncompress == l.shouldUncompress && baseDir.equals(l.baseDir);
}
return false;
}
@Override
public int hashCode() {
- return getKey().hashCode() + Boolean.hashCode(uncompressed) + baseDir.hashCode();
+ return getKey().hashCode() + Boolean.hashCode(shouldUncompress) + baseDir.hashCode();
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index d112ee5..abffd07 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -12,6 +12,7 @@
package org.apache.storm.localizer;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
@@ -23,10 +24,9 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
-import org.apache.storm.daemon.supervisor.IAdvancedFSOps;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.utils.Time;
@@ -59,23 +59,40 @@ public abstract class LocallyCachedBlob {
this.blobKey = blobKey;
}
- protected static long downloadToTempLocation(ClientBlobStore store, String key, long currentVersion, IAdvancedFSOps fsOps,
- Function<Long, Path> getTempPath)
- throws KeyNotFoundException, AuthorizationException, IOException {
+ /**
+ * Helper function to download blob from blob store.
+ * @param store Blob store to fetch blobs from
+ * @param key Key to retrieve blobs
+ * @param pathSupplier A function that maps the blob's version to its download destination. It must make sure the
+ * returned path is ready to be written to (e.g. the parent directory exists) or throw {@link IOException}
+ * @param outStreamSupplier A function that supplies the {@link OutputStream} object
+ * @return The metadata of the download session, including blob's version and download destination
+ * @throws KeyNotFoundException Thrown if key to retrieve blob is invalid
+ * @throws AuthorizationException Thrown if the caller is not authorized to retrieve the blob
+ * @throws IOException Thrown if any IO error occurs
+ */
+ protected DownloadMeta fetch(ClientBlobStore store, String key,
+ IOFunction<Long, Path> pathSupplier,
+ IOFunction<File, OutputStream> outStreamSupplier)
+ throws KeyNotFoundException, AuthorizationException, IOException {
+
try (InputStreamWithMeta in = store.getBlob(key)) {
long newVersion = in.getVersion();
+ long currentVersion = getLocalVersion();
if (newVersion == currentVersion) {
LOG.warn("The version did not change, but going to download again {} {}", currentVersion, key);
}
- Path tmpLocation = getTempPath.apply(newVersion);
- long totalRead = 0;
+
//Make sure the parent directory is there and ready to go
- fsOps.forceMkdir(tmpLocation.getParent());
- try (OutputStream outStream = fsOps.getOutputStream(tmpLocation.toFile())) {
+ Path downloadPath = pathSupplier.apply(newVersion);
+ LOG.debug("Downloading {} to {}", key, downloadPath);
+
+ long totalRead = 0;
+ try (OutputStream out = outStreamSupplier.apply(downloadPath.toFile())) {
byte[] buffer = new byte[4096];
- int read = 0;
- while ((read = in.read(buffer)) > 0) {
- outStream.write(buffer, 0, read);
+ int read;
+ while ((read = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, read);
totalRead += read;
}
}
@@ -83,33 +100,7 @@ public abstract class LocallyCachedBlob {
if (totalRead != expectedSize) {
throw new IOException("We expected to download " + expectedSize + " bytes but found we got " + totalRead);
}
-
- return newVersion;
- }
- }
-
- /**
- * Get the size of p in bytes.
- * @param p the path to read.
- * @return the size of p in bytes.
- */
- protected static long getSizeOnDisk(Path p) throws IOException {
- if (!Files.exists(p)) {
- return 0;
- } else if (Files.isRegularFile(p)) {
- return Files.size(p);
- } else {
- //We will not follow sym links
- return Files.walk(p)
- .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS))
- .mapToLong((subp) -> {
- try {
- return Files.size(subp);
- } catch (IOException e) {
- LOG.warn("Could not get the size of ");
- }
- return 0;
- }).sum();
+ return new DownloadMeta(downloadPath, newVersion);
}
}
@@ -131,7 +122,7 @@ public abstract class LocallyCachedBlob {
* @param store the store to us to download the data.
* @return the version that was downloaded.
*/
- public abstract long downloadToTempLocation(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException;
+ public abstract long fetchUnzipToTemp(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException;
/**
* Commit the new version and make it available for the end user.
@@ -163,6 +154,31 @@ public abstract class LocallyCachedBlob {
public abstract long getSizeOnDisk();
/**
+ * Get the size of p in bytes.
+ * @param p the path to read.
+ * @return the size of p in bytes.
+ */
+ protected static long getSizeOnDisk(Path p) throws IOException {
+ if (!Files.exists(p)) {
+ return 0;
+ } else if (Files.isRegularFile(p)) {
+ return Files.size(p);
+ } else {
+ //We will not follow sym links
+ return Files.walk(p)
+ .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS))
+ .mapToLong((subp) -> {
+ try {
+ return Files.size(subp);
+ } catch (IOException e) {
+ LOG.warn("Could not get the size of {}", subp);
+ }
+ return 0;
+ }).sum();
+ }
+ }
+
+ /**
* Updates the last updated time. This should be called when references are added or removed.
*/
protected synchronized void touch() {
@@ -254,4 +270,23 @@ public abstract class LocallyCachedBlob {
}
public abstract boolean isFullyDownloaded();
+
+ static class DownloadMeta {
+ private final Path downloadPath;
+ private final long version;
+
+ public DownloadMeta(Path downloadPath, long version) {
+ this.downloadPath = downloadPath;
+ this.version = version;
+ }
+
+ public Path getDownloadPath() {
+ return downloadPath;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
index b430f0b..9058c30 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
@@ -123,7 +123,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
}
@Override
- public long downloadToTempLocation(ClientBlobStore store)
+ public long fetchUnzipToTemp(ClientBlobStore store)
throws IOException, KeyNotFoundException, AuthorizationException {
if (isLocalMode && type == TopologyBlobType.TOPO_JAR) {
LOG.debug("DOWNLOADING LOCAL JAR to TEMP LOCATION... {}", topologyId);
@@ -148,17 +148,21 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
}
- long newVersion = downloadToTempLocation(store, type.getKey(topologyId), version, fsOps,
- (version) -> topologyBasicBlobsRootDir.resolve(type.getTempFileName(version)));
+ DownloadMeta downloadMeta = fetch(store, type.getKey(topologyId),
+ v -> {
+ Path path = topologyBasicBlobsRootDir.resolve(type.getTempFileName(v));
+ fsOps.forceMkdir(path.getParent());
+ return path;
+ }, fsOps::getOutputStream);
- Path tmpLocation = topologyBasicBlobsRootDir.resolve(type.getTempFileName(newVersion));
+ Path tmpLocation = downloadMeta.getDownloadPath();
if (type.needsExtraction()) {
- Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(newVersion));
+ Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(downloadMeta.getVersion()));
extractDirFromJar(tmpLocation.toAbsolutePath().toString(), ServerConfigUtils.RESOURCES_SUBDIR,
extractionDest);
}
- return newVersion;
+ return downloadMeta.getVersion();
}
protected void extractDirFromJar(String jarpath, String dir, Path dest) throws IOException {
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/main/java/org/apache/storm/utils/EnumUtil.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/EnumUtil.java b/storm-server/src/main/java/org/apache/storm/utils/EnumUtil.java
new file mode 100644
index 0000000..0b1d723
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/utils/EnumUtil.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+ * See the NOTICE file distributed with this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.util.EnumMap;
+import java.util.function.Function;
+
+public class EnumUtil {
+ public static String toMetricName(Enum type) {
+ return type.name().toLowerCase().replace('_', '-');
+ }
+
+ /**
+ * Create an Enum map with given lambda mapper.
+ * @param klass the Enum class
+ * @param mapper The mapper producing value with key (enum constant)
+ * @param <T> An Enum class
+ * @param <U> Mapped class
+ * @return An Enum map
+ */
+ public static <T extends Enum<T>, U> EnumMap<T, U> toEnumMap(Class<T> klass, Function<? super T, ? extends U> mapper) {
+ EnumMap<T, U> map = new EnumMap<>(klass);
+ for (T elem : klass.getEnumConstants()) {
+ map.put(elem, mapper.apply(elem));
+ }
+ return map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 282c139..6f09f3f 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -244,7 +244,7 @@ public class SlotTest {
LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs() - 10);
LSWorkerHeartbeat goodhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs());
when(container.readHeartbeat()).thenReturn(oldhb, oldhb, goodhb, goodhb);
- when(container.areAllProcessesDead()).thenReturn(false, true);
+ when(container.areAllProcessesDead()).thenReturn(false, false, true);
ISupervisor iSuper = mock(ISupervisor.class);
LocalState state = mock(LocalState.class);
@@ -289,7 +289,7 @@ public class SlotTest {
Container cContainer = mock(Container.class);
LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs());
when(cContainer.readHeartbeat()).thenReturn(chb);
- when(cContainer.areAllProcessesDead()).thenReturn(false, true);
+ when(cContainer.areAllProcessesDead()).thenReturn(false, false, true);
String nTopoId = "NEW";
List<ExecutorInfo> nExecList = mkExecutorInfoList(1, 2, 3, 4, 5);
@@ -382,7 +382,7 @@ public class SlotTest {
Container cContainer = mock(Container.class);
LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs());
when(cContainer.readHeartbeat()).thenReturn(chb);
- when(cContainer.areAllProcessesDead()).thenReturn(false, true);
+ when(cContainer.areAllProcessesDead()).thenReturn(false, false, true);
AsyncLocalizer localizer = mock(AsyncLocalizer.class);
BlobChangingCallback cb = mock(BlobChangingCallback.class);
@@ -523,7 +523,7 @@ public class SlotTest {
Container cContainer = mock(Container.class);
LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs());
when(cContainer.readHeartbeat()).thenReturn(chb);
- when(cContainer.areAllProcessesDead()).thenReturn(false, true);
+ when(cContainer.areAllProcessesDead()).thenReturn(false, false, true);
AsyncLocalizer localizer = mock(AsyncLocalizer.class);
Container nContainer = mock(Container.class);
LocalState state = mock(LocalState.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/81307a50/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 938301f..a919dd3 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -122,19 +122,19 @@ public class AsyncLocalizerTest {
doReturn(jarBlob).when(bl).getTopoJar(topoId);
when(jarBlob.getLocalVersion()).thenReturn(-1L);
when(jarBlob.getRemoteVersion(any())).thenReturn(100L);
- when(jarBlob.downloadToTempLocation(any())).thenReturn(100L);
+ when(jarBlob.fetchUnzipToTemp(any())).thenReturn(100L);
LocallyCachedTopologyBlob codeBlob = mock(LocallyCachedTopologyBlob.class);
doReturn(codeBlob).when(bl).getTopoCode(topoId);
when(codeBlob.getLocalVersion()).thenReturn(-1L);
when(codeBlob.getRemoteVersion(any())).thenReturn(200L);
- when(codeBlob.downloadToTempLocation(any())).thenReturn(200L);
+ when(codeBlob.fetchUnzipToTemp(any())).thenReturn(200L);
LocallyCachedTopologyBlob confBlob = mock(LocallyCachedTopologyBlob.class);
doReturn(confBlob).when(bl).getTopoConf(topoId);
when(confBlob.getLocalVersion()).thenReturn(-1L);
when(confBlob.getRemoteVersion(any())).thenReturn(300L);
- when(confBlob.downloadToTempLocation(any())).thenReturn(300L);
+ when(confBlob.fetchUnzipToTemp(any())).thenReturn(300L);
ReflectionUtils origRU = ReflectionUtils.setInstance(mockedRU);
ServerUtils origUtils = ServerUtils.setInstance(mockedU);
@@ -145,19 +145,19 @@ public class AsyncLocalizerTest {
Future<Void> f = bl.requestDownloadBaseTopologyBlobs(pna, null);
f.get(20, TimeUnit.SECONDS);
- verify(jarBlob).downloadToTempLocation(any());
+ verify(jarBlob).fetchUnzipToTemp(any());
verify(jarBlob).informAllOfChangeAndWaitForConsensus();
verify(jarBlob).commitNewVersion(100L);
verify(jarBlob).informAllChangeComplete();
verify(jarBlob).cleanupOrphanedData();
- verify(codeBlob).downloadToTempLocation(any());
+ verify(codeBlob).fetchUnzipToTemp(any());
verify(codeBlob).informAllOfChangeAndWaitForConsensus();
verify(codeBlob).commitNewVersion(200L);
verify(codeBlob).informAllChangeComplete();
verify(codeBlob).cleanupOrphanedData();
- verify(confBlob).downloadToTempLocation(any());
+ verify(confBlob).fetchUnzipToTemp(any());
verify(confBlob).informAllOfChangeAndWaitForConsensus();
verify(confBlob).commitNewVersion(300L);
verify(confBlob).informAllChangeComplete();
@@ -436,7 +436,7 @@ public class AsyncLocalizerTest {
when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
FileInputStream(archiveFile.getAbsolutePath()),
- 0));
+ 0, archiveFile.length()));
long timeBefore = Time.currentTimeMillis();
Time.advanceTime(10);
@@ -602,10 +602,8 @@ public class AsyncLocalizerTest {
when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(0));
when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(0));
- List<LocalResource> keys = Arrays.asList(new LocalResource[]{
- new LocalResource(key1, false, false),
- new LocalResource(key2, false, false), new LocalResource(key3, false, false)
- });
+ List<LocalResource> keys = Arrays.asList(new LocalResource(key1, false, false),
+ new LocalResource(key2, false, false), new LocalResource(key3, false, false));
File user1Dir = localizer.getLocalUserFileCacheDir(user1);
assertTrue("failed to create user dir", user1Dir.mkdirs());
@@ -746,7 +744,8 @@ public class AsyncLocalizerTest {
ReadableBlobMeta rbm = new ReadableBlobMeta();
rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(1));
+ //thenReturn always returns the same object, which is already consumed by the time User3 tries to getBlob!
+ when(mockblobstore.getBlob(key1)).thenAnswer((i) -> new TestInputStreamWithMeta(1));
when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(1));
when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(1));
@@ -795,6 +794,13 @@ public class AsyncLocalizerTest {
assertTrue("blob not created", keyFile3.exists());
assertTrue("blob not created", keyFile1user3.exists());
+ //Should assert file size
+ assertEquals("size doesn't match", 34, lrsrc.getSizeOnDisk());
+ assertEquals("size doesn't match", 34, lrsrc2.getSizeOnDisk());
+ assertEquals("size doesn't match", 34, lrsrc3.getSizeOnDisk());
+ //This used to be 0 bytes, because the single mocked stream had already been consumed by an earlier getBlob call
+ assertEquals("size doesn't match", 34, lrsrc1_user3.getSizeOnDisk());
+
ConcurrentMap<String, LocalizedResource> lrsrcSet = localizer.getUserFiles().get(user1);
assertEquals("local resource set size wrong", 1, lrsrcSet.size());
ConcurrentMap<String, LocalizedResource> lrsrcSet2 = localizer.getUserFiles().get(user2);
@@ -941,16 +947,20 @@ public class AsyncLocalizerTest {
class TestInputStreamWithMeta extends InputStreamWithMeta {
private final long version;
+ private final long fileLength;
private InputStream iostream;
public TestInputStreamWithMeta(long version) {
- iostream = IOUtils.toInputStream("some test data for my input stream");
+ final String DEFAULT_DATA = "some test data for my input stream";
+ iostream = IOUtils.toInputStream(DEFAULT_DATA);
this.version = version;
+ this.fileLength = DEFAULT_DATA.length();
}
- public TestInputStreamWithMeta(InputStream istream, long version) {
+ public TestInputStreamWithMeta(InputStream istream, long version, long fileLength) {
iostream = istream;
this.version = version;
+ this.fileLength = fileLength;
}
@Override
@@ -975,7 +985,7 @@ public class AsyncLocalizerTest {
@Override
public long getFileLength() {
- return 0;
+ return fileLength;
}
}
}
[4/4] storm git commit: Merge branch 'STORM-3125' of
https://github.com/zd-project/storm into STORM-3125-3126-3127-merge
Posted by ka...@apache.org.
Merge branch 'STORM-3125' of https://github.com/zd-project/storm into STORM-3125-3126-3127-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/daec2484
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/daec2484
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/daec2484
Branch: refs/heads/master
Commit: daec24841501c911b06fc10a5c98956e40fc1cd4
Parents: afe35f3 11383c2
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jul 10 06:12:23 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 10 06:12:23 2018 +0900
----------------------------------------------------------------------
.../daemon/drpc/BlockingOutstandingRequest.java | 3 +-
.../daemon/supervisor/ReadClusterState.java | 9 +-
.../apache/storm/daemon/supervisor/Slot.java | 306 +++++++++++--------
.../storm/daemon/supervisor/Supervisor.java | 3 +-
.../apache/storm/localizer/AsyncLocalizer.java | 4 +-
.../org/apache/storm/localizer/IOFunction.java | 22 ++
.../storm/localizer/LocalizedResource.java | 77 ++---
.../storm/localizer/LocallyCachedBlob.java | 115 ++++---
.../localizer/LocallyCachedTopologyBlob.java | 16 +-
.../java/org/apache/storm/utils/EnumUtil.java | 40 +++
.../storm/daemon/supervisor/SlotTest.java | 8 +-
.../storm/localizer/AsyncLocalizerTest.java | 40 ++-
12 files changed, 394 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/daec2484/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
[2/4] storm git commit: STORM-3126: Refactored methods for
kill_workers to avoid unnecessary force kill.
Posted by ka...@apache.org.
STORM-3126: Refactored methods for kill_workers to avoid
unnecessary force kill.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/216ed738
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/216ed738
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/216ed738
Branch: refs/heads/master
Commit: 216ed738e5243d552ca20d2c9b13fb99c56e33e6
Parents: 81307a5
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Fri Jul 6 16:51:18 2018 -0500
Committer: Zhengdai Hu <hu...@gmail.com>
Committed: Fri Jul 6 16:52:20 2018 -0500
----------------------------------------------------------------------
.../main/java/org/apache/storm/daemon/supervisor/Supervisor.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/216ed738/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index f275835..b8bb4aa 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -498,15 +498,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
}
for (Killable k : containers) {
try {
- k.forceKill();
long start = Time.currentTimeMillis();
while (!k.areAllProcessesDead()) {
if ((Time.currentTimeMillis() - start) > 10_000) {
throw new RuntimeException("Giving up on killing " + k
+ " after " + (Time.currentTimeMillis() - start) + " ms");
}
- Time.sleep(100);
k.forceKill();
+ Time.sleep(100);
}
k.cleanUp();
} catch (Exception e) {
[3/4] storm git commit: STORM-3127: Refactor AsyncLocalizer to avoid
potential race condition
Posted by ka...@apache.org.
STORM-3127: Refactor AsyncLocalizer to avoid potential race condition
(cherry picked from commit 706029a)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11383c2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11383c2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11383c2a
Branch: refs/heads/master
Commit: 11383c2ae08bbadb10857bf0a9dd1eb69b7d2ccc
Parents: 216ed73
Author: Zhengdai Hu <hu...@gmail.com>
Authored: Tue Jun 26 09:58:57 2018 -0500
Committer: Zhengdai Hu <hu...@gmail.com>
Committed: Fri Jul 6 16:52:20 2018 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/storm/localizer/AsyncLocalizer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/11383c2a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 9d91aa0..19a8cd3 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -531,9 +531,9 @@ public class AsyncLocalizer implements AutoCloseable {
// go off to blobstore and get it
// assume dir passed in exists and has correct permission
LOG.debug("fetching blob: {}", key);
+ lrsrc.addReference(pna, localResource.needsCallback() ? cb : null);
futures.add(downloadOrUpdate(lrsrc));
results.add(lrsrc);
- lrsrc.addReference(pna, localResource.needsCallback() ? cb : null);
}
for (CompletableFuture<?> futureRsrc : futures) {