You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/11/03 20:07:15 UTC

[apex-core] branch master updated: APEXCORE-791 Making gateway security related settings available during construction of the DAG

This is an automated email from the ASF dual-hosted git repository.

vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ca51f2  APEXCORE-791 Making gateway security related settings available during construction of the DAG
9ca51f2 is described below

commit 9ca51f2683ffe61f8c550630ad8412b37da50393
Author: Pramod Immaneni <pr...@datatorrent.com>
AuthorDate: Tue Oct 24 15:34:47 2017 -0700

    APEXCORE-791 Making gateway security related settings available during construction of the DAG
---
 .../stram/StreamingContainerManager.java           | 23 ++----
 .../stram/debug/TupleRecorderCollection.java       | 33 +++------
 .../plan/logical/LogicalPlanConfiguration.java     | 27 ++++++-
 .../stram/util/PubSubWebSocketClient.java          | 16 ++--
 .../stram/util/SharedPubSubWebSocketClient.java    | 16 +++-
 .../engine/util/PubSubWebSocketClientBuilder.java  | 85 ++++++++++++++++++++++
 6 files changed, 149 insertions(+), 51 deletions(-)

diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 07641d2..65be99a 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -71,6 +71,7 @@ import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.apex.engine.util.CascadeStorageAgent;
+import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -197,7 +198,6 @@ import net.engio.mbassy.bus.config.BusConfiguration;
 public class StreamingContainerManager implements PlanContext
 {
   private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
-  public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
   public static final String BUILTIN_APPDATA_URL = "builtin";
   public static final String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
   public static final String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
@@ -556,23 +556,12 @@ public class StreamingContainerManager implements PlanContext
 
   private void setupWsClient()
   {
-    String gatewayAddress = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
-    boolean gatewayUseSsl = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USE_SSL);
-    String gatewayUserName = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USER_NAME);
-    String gatewayPassword = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_PASSWORD);
-    int timeout = plan.getLogicalPlan().getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
-
-    if (gatewayAddress != null) {
+    wsClient = new PubSubWebSocketClientBuilder().setContext(plan.getLogicalPlan()).build();
+    if (wsClient != null) {
       try {
-        wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", timeout);
-        if (gatewayUserName != null && gatewayPassword != null) {
-          wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
-          wsClient.setUserName(gatewayUserName);
-          wsClient.setPassword(gatewayPassword);
-        }
-        wsClient.setup();
-      } catch (Exception ex) {
-        LOG.warn("Cannot establish websocket connection to {}", gatewayAddress, ex);
+        wsClient.openConnection();
+      } catch (Exception e) {
+        LOG.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
       }
     }
   }
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
index e41b348..49998cf 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
+
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
@@ -39,7 +41,6 @@ import com.datatorrent.api.Stats.OperatorStats.PortStats;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.StringCodec;
-import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.api.ContainerContext;
 import com.datatorrent.stram.api.ContainerEvent.ContainerStatsEvent;
 import com.datatorrent.stram.api.ContainerEvent.NodeActivationEvent;
@@ -66,10 +67,6 @@ import net.engio.mbassy.listener.Handler;
 public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, TupleRecorder> implements Component<Context>
 {
   private int tupleRecordingPartFileSize;
-  private String gatewayAddress;
-  private boolean gatewayUseSsl = false;
-  private String gatewayUserName;
-  private String gatewayPassword;
   private long tupleRecordingPartFileTimeMillis;
   private String appPath;
   private String appId;
@@ -89,13 +86,11 @@ public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, Tup
     tupleRecordingPartFileSize = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE);
     tupleRecordingPartFileTimeMillis = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS);
     appId = ctx.getValue(LogicalPlan.APPLICATION_ID);
-    gatewayAddress = ctx.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
-    gatewayUseSsl = ctx.getValue(LogicalPlan.GATEWAY_USE_SSL);
-    gatewayUserName = ctx.getValue(LogicalPlan.GATEWAY_USER_NAME);
-    gatewayPassword = ctx.getValue(LogicalPlan.GATEWAY_PASSWORD);
     appPath = ctx.getValue(LogicalPlan.APPLICATION_PATH);
     codecs = ctx.getAttributes().get(Context.DAGContext.STRING_CODECS);
 
+    wsClient = new PubSubWebSocketClientBuilder().setContext(ctx).build();
+
     RequestDelegateImpl impl = new RequestDelegateImpl();
     RequestFactory rf = ctx.getValue(ContainerContext.REQUEST_FACTORY);
     if (rf == null) {
@@ -161,21 +156,11 @@ public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, Tup
     if (!conflict) {
       logger.debug("Executing start recording request for {}", operatorIdPortNamePair);
 
-      if (gatewayAddress != null && wsClient == null) {
-        synchronized (this) {
-          if (wsClient == null) {
-            try {
-              wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", 500);
-              if (gatewayUserName != null && gatewayPassword != null) {
-                wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + StreamingContainerManager.GATEWAY_LOGIN_URL_PATH);
-                wsClient.setUserName(gatewayUserName);
-                wsClient.setPassword(gatewayPassword);
-              }
-              wsClient.setup();
-            } catch (Exception ex) {
-              logger.warn("Error initializing websocket", ex);
-            }
-          }
+      if (wsClient != null) {
+        try {
+          wsClient.openConnection();
+        } catch (Exception e) {
+          logger.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
         }
       }
 
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 01a4c7b..5fad04f 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -143,6 +143,9 @@ public class LogicalPlanConfiguration
 
   public static final String KEY_APPLICATION_NAME = keyAndDeprecation(Context.DAGContext.APPLICATION_NAME);
   public static final String KEY_GATEWAY_CONNECT_ADDRESS = keyAndDeprecation(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);
+  public static final String KEY_GATEWAY_USE_SSL = keyAndDeprecation(Context.DAGContext.GATEWAY_USE_SSL);
+  public static final String KEY_GATEWAY_USER_NAME = keyAndDeprecation(Context.DAGContext.GATEWAY_USER_NAME);
+  public static final String KEY_GATEWAY_PASSWORD = keyAndDeprecation(Context.DAGContext.GATEWAY_PASSWORD);
 
   private static String keyAndDeprecation(Attribute<?> attr)
   {
@@ -2248,9 +2251,8 @@ public class LogicalPlanConfiguration
    */
   public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
   {
-    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
-    String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
-    dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+    prepareDAGAttributes(dag);
+
     pluginManager.setup(dag);
     if (app != null) {
       pluginManager.dispatch(PRE_POPULATE_DAG.event);
@@ -2277,6 +2279,25 @@ public class LogicalPlanConfiguration
     pluginManager.teardown();
   }
 
+  private void prepareDAGAttributes(LogicalPlan dag)
+  {
+    // Consider making all attributes available for DAG construction
+    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
+    String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
+    dag.setAttribute(DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+    if (conf.getBoolean(KEY_GATEWAY_USE_SSL, DAGContext.GATEWAY_USE_SSL.defaultValue)) {
+      dag.setAttribute(DAGContext.GATEWAY_USE_SSL, true);
+    }
+    String username = conf.get(KEY_GATEWAY_USER_NAME);
+    if (username != null) {
+      dag.setAttribute(DAGContext.GATEWAY_USER_NAME, username);
+    }
+    String password = conf.get(KEY_GATEWAY_PASSWORD);
+    if (password != null) {
+      dag.setAttribute(DAGContext.GATEWAY_PASSWORD, password);
+    }
+  }
+
   private void flattenDAG(LogicalPlan dag, Configuration conf)
   {
     for (ModuleMeta moduleMeta : dag.getAllModules()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 47986cb..b8f5bfe 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -138,6 +138,11 @@ public abstract class PubSubWebSocketClient implements Component<Context>
     this.uri = uri;
   }
 
+  public URI getUri()
+  {
+    return uri;
+  }
+
   public void setIoThreadMultiplier(int ioThreadMultiplier)
   {
     this.ioThreadMultiplier = ioThreadMultiplier;
@@ -239,17 +244,18 @@ public abstract class PubSubWebSocketClient implements Component<Context>
     }
   }
 
+  protected boolean isConnectionSetup()
+  {
+    return (connection != null);
+  }
+
   /**
    *
    * @return true if the connection is open; false otherwise.
    */
   public boolean isConnectionOpen()
   {
-    if (connection == null) {
-      return false;
-    }
-
-    return connection.isOpen();
+    return isConnectionSetup() ? connection.isOpen() : false;
   }
 
   /**
diff --git a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
index 73572f2..d981127 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
@@ -57,6 +57,11 @@ public class SharedPubSubWebSocketClient extends PubSubWebSocketClient
 
   }
 
+  /**
+   * Construct a SharedPubSubWebSocketClient with the given parameters
+   * @param uri The web socket server uri
+   * @param timeoutMillis The connection timeout
+   */
   public SharedPubSubWebSocketClient(URI uri, long timeoutMillis)
   {
     this.setUri(uri);
@@ -64,14 +69,21 @@ public class SharedPubSubWebSocketClient extends PubSubWebSocketClient
     this.timeoutMillis = timeoutMillis;
   }
 
+  /**
+   * Construct a SharedPubSubWebSocketClient with the given parameters
+   * @param uri The web socket server uri as string
+   * @param timeoutMillis The connection timeout
+   */
   public SharedPubSubWebSocketClient(String uri, long timeoutMillis) throws URISyntaxException
   {
     this(new URI(uri), timeoutMillis);
   }
 
-  public void setup() throws IOException, ExecutionException, InterruptedException, TimeoutException
+  public synchronized void openConnection() throws IOException, ExecutionException, InterruptedException, TimeoutException
   {
-    openConnection(timeoutMillis);
+    if (!isConnectionSetup()) {
+      super.openConnection(timeoutMillis);
+    }
   }
 
   public synchronized void addHandler(String topic, boolean numSubscribers, Handler handler)
diff --git a/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java b/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java
new file mode 100644
index 0000000..dc0bd3a
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.util;
+
+import java.net.URISyntaxException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.util.PubSubWebSocketClient;
+import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
+
+public class PubSubWebSocketClientBuilder
+{
+  public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
+
+  private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClientBuilder.class);
+
+  private Context context;
+
+  public PubSubWebSocketClientBuilder setContext(Context context)
+  {
+    this.context = context;
+    return this;
+  }
+
+  private <T extends PubSubWebSocketClient> T build(Class<T> clazz)
+  {
+    Preconditions.checkState(context != null, "Context not specified");
+    String gatewayAddress = context.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
+    if (gatewayAddress != null) {
+      int timeout = context.getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
+      boolean gatewayUseSsl = context.getValue(LogicalPlan.GATEWAY_USE_SSL);
+
+      // The builder can be used to build different types of PubSub clients in future but for now only one is supported
+      SharedPubSubWebSocketClient wsClient = null;
+
+      try {
+        wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", timeout);
+
+        String gatewayUserName = context.getValue(LogicalPlan.GATEWAY_USER_NAME);
+        String gatewayPassword = context.getValue(LogicalPlan.GATEWAY_PASSWORD);
+        if (gatewayUserName != null && gatewayPassword != null) {
+          wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
+          wsClient.setUserName(gatewayUserName);
+          wsClient.setPassword(gatewayPassword);
+        }
+
+        return (T)wsClient;
+      } catch (URISyntaxException e) {
+        logger.warn("Unable to initialize websocket for gateway address {}", gatewayAddress, e);
+      }
+
+      return null;
+    }
+
+    return null;
+  }
+
+  public SharedPubSubWebSocketClient build()
+  {
+    return build(SharedPubSubWebSocketClient.class);
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.