You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/12/14 11:57:52 UTC
[3/6] incubator-brooklyn git commit: releasePortForwarding: exec all
concurrently
releasePortForwarding: exec all concurrently
- Also execute the releasing of the sshHostAndPort concurrently.
- Adds fallback behaviour for if not executed inside an execution
context.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/fa4da076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/fa4da076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/fa4da076
Branch: refs/heads/master
Commit: fa4da0763b9eba8cbe9927ff33fb3da5f855adb1
Parents: a16a729
Author: Aled Sage <al...@gmail.com>
Authored: Mon Dec 7 20:56:45 2015 +0000
Committer: Mark McKenna <m4...@gmail.com>
Committed: Fri Dec 11 15:50:42 2015 +0000
----------------------------------------------------------------------
.../location/jclouds/JcloudsLocation.java | 78 +++++++++++++-------
1 file changed, 52 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/fa4da076/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
index 2f08af8..f368cdb 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
@@ -58,7 +58,6 @@ import org.apache.brooklyn.api.location.NoMachinesAvailableException;
import org.apache.brooklyn.api.location.PortRange;
import org.apache.brooklyn.api.mgmt.AccessController;
import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.config.ConfigKey.HasConfigKey;
import org.apache.brooklyn.core.config.ConfigUtils;
@@ -185,6 +184,9 @@ import com.google.common.collect.Sets.SetView;
import com.google.common.io.Files;
import com.google.common.net.HostAndPort;
+import io.cloudsoft.winrm4j.pywinrm.Session;
+import io.cloudsoft.winrm4j.pywinrm.WinRMFactory;
+
/**
* For provisioning and managing VMs in a particular provider/region, using jclouds.
* Configuration flags are defined in {@link JcloudsLocationConfig}.
@@ -2449,15 +2451,23 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
final JcloudsPortForwarderExtension portForwarder = machine.getConfig(PORT_FORWARDER);
PortForwardManager portForwardManager = machine.getConfig(PORT_FORWARDING_MANAGER);
final NodeMetadata node = (machine instanceof JcloudsSshMachineLocation) ? ((JcloudsSshMachineLocation) machine).getNode() : null;
+ final Map<String, Runnable> subtasks = Maps.newLinkedHashMap();
if (portForwarder == null) {
LOG.debug("No port-forwarding to close (because portForwarder null) on release of " + machine);
} else {
- // Release the port-forwarding for the login-port, which was explicilty created by JcloudsLocation
+ // Release the port-forwarding for the login-port, which was explicitly created by JcloudsLocation
if (usePortForwarding && node != null) {
- HostAndPort sshHostAndPortOverride = machine.getSshHostAndPort();
- LOG.debug("Closing port-forwarding at {} for machine {}: {}->{}", new Object[] {this, machine, sshHostAndPortOverride, node.getLoginPort()});
- portForwarder.closePortForwarding(node, node.getLoginPort(), sshHostAndPortOverride, Protocol.TCP);
+ final HostAndPort sshHostAndPortOverride = machine.getSshHostAndPort();
+ final int loginPort = node.getLoginPort();
+ subtasks.put(
+ "Close port-forward "+sshHostAndPortOverride+"->"+node.getLoginPort(),
+ new Runnable() {
+ public void run() {
+ LOG.debug("Closing port-forwarding at {} for machine {}: {}->{}", new Object[] {this, machine, sshHostAndPortOverride, node.getLoginPort()});
+ portForwarder.closePortForwarding(node, loginPort, sshHostAndPortOverride, Protocol.TCP);
+ }
+ });
}
// Get all the other port-forwarding mappings for this VM, and release all of those
@@ -2472,34 +2482,50 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
mappings = ImmutableSet.of();
}
- final TaskBuilder<Void> builder = TaskBuilder.<Void>builder()
- .parallel(true)
- .displayName("close port-forwarding at "+machine);
-
for (final PortMapping mapping : mappings) {
final HostAndPort publicEndpoint = mapping.getPublicEndpoint();
final int targetPort = mapping.getPrivatePort();
final Protocol protocol = Protocol.TCP;
if (publicEndpoint != null) {
- builder.add(TaskBuilder.builder().displayName("Close port-forward at " +machine).body(new Runnable() {
- @Override
- public void run() {
- LOG.debug("Closing port-forwarding at {} for machine {}: {}->{}", new Object[] {this, machine, publicEndpoint, targetPort});
- portForwarder.closePortForwarding(node, targetPort, publicEndpoint, protocol);
- }
- }).build());
+ subtasks.put(
+ "Close port-forward "+publicEndpoint+"->"+targetPort,
+ new Runnable() {
+ public void run() {
+ LOG.debug("Closing port-forwarding at {} for machine {}: {}->{}", new Object[] {this, machine, publicEndpoint, targetPort});
+ portForwarder.closePortForwarding(node, targetPort, publicEndpoint, protocol);
+ }
+ });
}
}
- final Task<Void> task = builder.build();
- final DynamicTasks.TaskQueueingResult<Void> queueResult = DynamicTasks.queueIfPossible(task);
- if(!queueResult.isQueuedOrSubmitted()){
- getManagementContext().getExecutionManager().submit(queueResult);
- }
- final String origDetails = Tasks.setBlockingDetails("waiting for closing port-forwarding of "+machine);
- try {
- task.blockUntilEnded();
- } finally {
- Tasks.setBlockingDetails(origDetails);
+
+ if (subtasks.size() > 0) {
+ final TaskBuilder<Void> builder = TaskBuilder.<Void>builder()
+ .parallel(true)
+ .displayName("close port-forwarding at "+machine);
+ for (Map.Entry<String, Runnable> entry : subtasks.entrySet()) {
+ builder.add(TaskBuilder.builder().displayName(entry.getKey()).body(entry.getValue()).build());
+ }
+ final Task<Void> task = builder.build();
+ final DynamicTasks.TaskQueueingResult<Void> queueResult = DynamicTasks.queueIfPossible(task);
+ if(queueResult.isQueuedOrSubmitted()){
+ final String origDetails = Tasks.setBlockingDetails("waiting for closing port-forwarding of "+machine);
+ try {
+ task.blockUntilEnded();
+ } finally {
+ Tasks.setBlockingDetails(origDetails);
+ }
+ } else {
+ // Not executing inside an execution context; can't submit!
+ // It's not enough to just do:
+ // getManagementContext().getExecutionManager().submit(queueResult);
+ // (see CompoundTask.submitIfNecessary, which gets called in ParallelTask).
+ // Instead, we'll fall back to executing sequentially.
+ LOG.warn("Releasing port-forwarding of "+machine+" not executing in execution-context "
+ + "(e.g. not invoked inside effector); falling back to executing sequentially");
+ for (Runnable subtask : subtasks.values()) {
+ subtask.run();
+ }
+ }
}
}