You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2015/12/14 11:57:52 UTC

[3/6] incubator-brooklyn git commit: releasePortForwarding: exec all concurrently

releasePortForwarding: exec all concurrently

- Also releases the port-forwarding for the sshHostAndPort concurrently.
- Adds fallback behaviour (sequential execution) for when it is not
  executed inside an execution context.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/fa4da076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/fa4da076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/fa4da076

Branch: refs/heads/master
Commit: fa4da0763b9eba8cbe9927ff33fb3da5f855adb1
Parents: a16a729
Author: Aled Sage <al...@gmail.com>
Authored: Mon Dec 7 20:56:45 2015 +0000
Committer: Mark McKenna <m4...@gmail.com>
Committed: Fri Dec 11 15:50:42 2015 +0000

----------------------------------------------------------------------
 .../location/jclouds/JcloudsLocation.java       | 78 +++++++++++++-------
 1 file changed, 52 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/fa4da076/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
index 2f08af8..f368cdb 100644
--- a/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
+++ b/locations/jclouds/src/main/java/org/apache/brooklyn/location/jclouds/JcloudsLocation.java
@@ -58,7 +58,6 @@ import org.apache.brooklyn.api.location.NoMachinesAvailableException;
 import org.apache.brooklyn.api.location.PortRange;
 import org.apache.brooklyn.api.mgmt.AccessController;
 import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.config.ConfigKey.HasConfigKey;
 import org.apache.brooklyn.core.config.ConfigUtils;
@@ -185,6 +184,9 @@ import com.google.common.collect.Sets.SetView;
 import com.google.common.io.Files;
 import com.google.common.net.HostAndPort;
 
+import io.cloudsoft.winrm4j.pywinrm.Session;
+import io.cloudsoft.winrm4j.pywinrm.WinRMFactory;
+
 /**
  * For provisioning and managing VMs in a particular provider/region, using jclouds.
  * Configuration flags are defined in {@link JcloudsLocationConfig}.
@@ -2449,15 +2451,23 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
         final JcloudsPortForwarderExtension portForwarder = machine.getConfig(PORT_FORWARDER);
         PortForwardManager portForwardManager = machine.getConfig(PORT_FORWARDING_MANAGER);
         final NodeMetadata node = (machine instanceof JcloudsSshMachineLocation) ? ((JcloudsSshMachineLocation) machine).getNode() : null;
+        final Map<String, Runnable> subtasks = Maps.newLinkedHashMap();
 
         if (portForwarder == null) {
             LOG.debug("No port-forwarding to close (because portForwarder null) on release of " + machine);
         } else {
-            // Release the port-forwarding for the login-port, which was explicilty created by JcloudsLocation
+            // Release the port-forwarding for the login-port, which was explicitly created by JcloudsLocation
             if (usePortForwarding && node != null) {
-                HostAndPort sshHostAndPortOverride = machine.getSshHostAndPort();
-                LOG.debug("Closing port-forwarding at {} for machine {}: {}->{}", new Object[] {this, machine, sshHostAndPortOverride, node.getLoginPort()});
-                portForwarder.closePortForwarding(node, node.getLoginPort(), sshHostAndPortOverride, Protocol.TCP);
+                final HostAndPort sshHostAndPortOverride = machine.getSshHostAndPort();
+                final int loginPort = node.getLoginPort();
+                subtasks.put(
+                        "Close port-forward "+sshHostAndPortOverride+"->"+node.getLoginPort(),
+                        new Runnable() {
+                            public void run() {
+                                LOG.debug("Closing port-forwarding at {} for machine {}: {}->{}", new Object[] {this, machine, sshHostAndPortOverride, node.getLoginPort()});
+                                portForwarder.closePortForwarding(node, loginPort, sshHostAndPortOverride, Protocol.TCP);
+                            }
+                        });
             }
 
             // Get all the other port-forwarding mappings for this VM, and release all of those
@@ -2472,34 +2482,50 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im
                 mappings = ImmutableSet.of();
             }
 
-            final TaskBuilder<Void> builder = TaskBuilder.<Void>builder()
-                    .parallel(true)
-                    .displayName("close port-forwarding at "+machine);
-
             for (final PortMapping mapping : mappings) {
                 final HostAndPort publicEndpoint = mapping.getPublicEndpoint();
                 final int targetPort = mapping.getPrivatePort();
                 final Protocol protocol = Protocol.TCP;
                 if (publicEndpoint != null) {
-                    builder.add(TaskBuilder.builder().displayName("Close port-forward at " +machine).body(new Runnable() {
-                        @Override
-                        public void run() {
-                            LOG.debug("Closing port-forwarding at {} for machine {}: {}->{}", new Object[] {this, machine, publicEndpoint, targetPort});
-                            portForwarder.closePortForwarding(node, targetPort, publicEndpoint, protocol);
-                        }
-                    }).build());
+                    subtasks.put(
+                            "Close port-forward "+publicEndpoint+"->"+targetPort,
+                            new Runnable() {
+                                public void run() {
+                                    LOG.debug("Closing port-forwarding at {} for machine {}: {}->{}", new Object[] {this, machine, publicEndpoint, targetPort});
+                                    portForwarder.closePortForwarding(node, targetPort, publicEndpoint, protocol);
+                                }
+                            });
                 }
             }
-            final Task<Void> task = builder.build();
-            final DynamicTasks.TaskQueueingResult<Void> queueResult = DynamicTasks.queueIfPossible(task);
-            if(!queueResult.isQueuedOrSubmitted()){
-                getManagementContext().getExecutionManager().submit(queueResult);
-            }
-            final String origDetails = Tasks.setBlockingDetails("waiting for closing port-forwarding of "+machine);
-            try {
-                task.blockUntilEnded();
-            } finally {
-                Tasks.setBlockingDetails(origDetails);
+
+            if (subtasks.size() > 0) {
+                final TaskBuilder<Void> builder = TaskBuilder.<Void>builder()
+                        .parallel(true)
+                        .displayName("close port-forwarding at "+machine);
+                for (Map.Entry<String, Runnable> entry : subtasks.entrySet()) {
+                    builder.add(TaskBuilder.builder().displayName(entry.getKey()).body(entry.getValue()).build());
+                }
+                final Task<Void> task = builder.build();
+                final DynamicTasks.TaskQueueingResult<Void> queueResult = DynamicTasks.queueIfPossible(task);
+                if(queueResult.isQueuedOrSubmitted()){
+                    final String origDetails = Tasks.setBlockingDetails("waiting for closing port-forwarding of "+machine);
+                    try {
+                        task.blockUntilEnded();
+                    } finally {
+                        Tasks.setBlockingDetails(origDetails);
+                    }
+                } else {
+                    // Not executing inside an execution context; can't submit!
+                    // It's not enough to just do:
+                    //     getManagementContext().getExecutionManager().submit(queueResult);
+                    // (see CompoundTask.submitIfNecessary, which gets called in ParallelTask).
+                    // Instead, we'll fall back to executing sequentially.
+                    LOG.warn("Releasing port-forwarding of "+machine+" not executing in execution-context "
+                            + "(e.g. not invoked inside effector); falling back to executing sequentially");
+                    for (Runnable subtask : subtasks.values()) {
+                        subtask.run();
+                    }
+                }
             }
         }