You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/11/03 20:08:00 UTC

[jira] [Commented] (APEXCORE-791) Gateway security settings need to be available in the DAG

    [ https://issues.apache.org/jira/browse/APEXCORE-791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238283#comment-16238283 ] 

ASF GitHub Bot commented on APEXCORE-791:
-----------------------------------------

vrozov closed pull request #586: APEXCORE-791 Making gateway security related settings available during construction of the DAG
URL: https://github.com/apache/apex-core/pull/586
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 07641d23e2..65be99a6d0 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -71,6 +71,7 @@
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.apex.engine.util.CascadeStorageAgent;
+import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -197,7 +198,6 @@
 public class StreamingContainerManager implements PlanContext
 {
   private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
-  public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
   public static final String BUILTIN_APPDATA_URL = "builtin";
   public static final String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
   public static final String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
@@ -556,23 +556,12 @@ private void setupStringCodecs()
 
   private void setupWsClient()
   {
-    String gatewayAddress = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
-    boolean gatewayUseSsl = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USE_SSL);
-    String gatewayUserName = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USER_NAME);
-    String gatewayPassword = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_PASSWORD);
-    int timeout = plan.getLogicalPlan().getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
-
-    if (gatewayAddress != null) {
+    wsClient = new PubSubWebSocketClientBuilder().setContext(plan.getLogicalPlan()).build();
+    if (wsClient != null) {
       try {
-        wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", timeout);
-        if (gatewayUserName != null && gatewayPassword != null) {
-          wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
-          wsClient.setUserName(gatewayUserName);
-          wsClient.setPassword(gatewayPassword);
-        }
-        wsClient.setup();
-      } catch (Exception ex) {
-        LOG.warn("Cannot establish websocket connection to {}", gatewayAddress, ex);
+        wsClient.openConnection();
+      } catch (Exception e) {
+        LOG.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
       }
     }
   }
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
index e41b348a0c..49998cfa00 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
@@ -26,6 +26,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
+
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
@@ -39,7 +41,6 @@
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.StringCodec;
-import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.api.ContainerContext;
 import com.datatorrent.stram.api.ContainerEvent.ContainerStatsEvent;
 import com.datatorrent.stram.api.ContainerEvent.NodeActivationEvent;
@@ -66,10 +67,6 @@
 public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, TupleRecorder> implements Component<Context>
 {
   private int tupleRecordingPartFileSize;
-  private String gatewayAddress;
-  private boolean gatewayUseSsl = false;
-  private String gatewayUserName;
-  private String gatewayPassword;
   private long tupleRecordingPartFileTimeMillis;
   private String appPath;
   private String appId;
@@ -89,13 +86,11 @@ public void setup(Context ctx)
     tupleRecordingPartFileSize = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE);
     tupleRecordingPartFileTimeMillis = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS);
     appId = ctx.getValue(LogicalPlan.APPLICATION_ID);
-    gatewayAddress = ctx.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
-    gatewayUseSsl = ctx.getValue(LogicalPlan.GATEWAY_USE_SSL);
-    gatewayUserName = ctx.getValue(LogicalPlan.GATEWAY_USER_NAME);
-    gatewayPassword = ctx.getValue(LogicalPlan.GATEWAY_PASSWORD);
     appPath = ctx.getValue(LogicalPlan.APPLICATION_PATH);
     codecs = ctx.getAttributes().get(Context.DAGContext.STRING_CODECS);
 
+    wsClient = new PubSubWebSocketClientBuilder().setContext(ctx).build();
+
     RequestDelegateImpl impl = new RequestDelegateImpl();
     RequestFactory rf = ctx.getValue(ContainerContext.REQUEST_FACTORY);
     if (rf == null) {
@@ -161,21 +156,11 @@ private void startRecording(String id, final Node<?> node, int operatorId, final
     if (!conflict) {
       logger.debug("Executing start recording request for {}", operatorIdPortNamePair);
 
-      if (gatewayAddress != null && wsClient == null) {
-        synchronized (this) {
-          if (wsClient == null) {
-            try {
-              wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", 500);
-              if (gatewayUserName != null && gatewayPassword != null) {
-                wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + StreamingContainerManager.GATEWAY_LOGIN_URL_PATH);
-                wsClient.setUserName(gatewayUserName);
-                wsClient.setPassword(gatewayPassword);
-              }
-              wsClient.setup();
-            } catch (Exception ex) {
-              logger.warn("Error initializing websocket", ex);
-            }
-          }
+      if (wsClient != null) {
+        try {
+          wsClient.openConnection();
+        } catch (Exception e) {
+          logger.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
         }
       }
 
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 01a4c7bb75..5fad04f40a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -143,6 +143,9 @@
 
   public static final String KEY_APPLICATION_NAME = keyAndDeprecation(Context.DAGContext.APPLICATION_NAME);
   public static final String KEY_GATEWAY_CONNECT_ADDRESS = keyAndDeprecation(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);
+  public static final String KEY_GATEWAY_USE_SSL = keyAndDeprecation(Context.DAGContext.GATEWAY_USE_SSL);
+  public static final String KEY_GATEWAY_USER_NAME = keyAndDeprecation(Context.DAGContext.GATEWAY_USER_NAME);
+  public static final String KEY_GATEWAY_PASSWORD = keyAndDeprecation(Context.DAGContext.GATEWAY_PASSWORD);
 
   private static String keyAndDeprecation(Attribute<?> attr)
   {
@@ -2248,9 +2251,8 @@ private GenericOperator addOperator(LogicalPlan dag, String name, Class<?> clazz
    */
   public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
   {
-    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
-    String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
-    dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+    prepareDAGAttributes(dag);
+
     pluginManager.setup(dag);
     if (app != null) {
       pluginManager.dispatch(PRE_POPULATE_DAG.event);
@@ -2277,6 +2279,25 @@ public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
     pluginManager.teardown();
   }
 
+  private void prepareDAGAttributes(LogicalPlan dag)
+  {
+    // Consider making all attributes available for DAG construction
+    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
+    String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
+    dag.setAttribute(DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+    if (conf.getBoolean(KEY_GATEWAY_USE_SSL, DAGContext.GATEWAY_USE_SSL.defaultValue)) {
+      dag.setAttribute(DAGContext.GATEWAY_USE_SSL, true);
+    }
+    String username = conf.get(KEY_GATEWAY_USER_NAME);
+    if (username != null) {
+      dag.setAttribute(DAGContext.GATEWAY_USER_NAME, username);
+    }
+    String password = conf.get(KEY_GATEWAY_PASSWORD);
+    if (password != null) {
+      dag.setAttribute(DAGContext.GATEWAY_PASSWORD, password);
+    }
+  }
+
   private void flattenDAG(LogicalPlan dag, Configuration conf)
   {
     for (ModuleMeta moduleMeta : dag.getAllModules()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 47986cb892..b8f5bfea94 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -138,6 +138,11 @@ public void setUri(URI uri)
     this.uri = uri;
   }
 
+  public URI getUri()
+  {
+    return uri;
+  }
+
   public void setIoThreadMultiplier(int ioThreadMultiplier)
   {
     this.ioThreadMultiplier = ioThreadMultiplier;
@@ -239,17 +244,18 @@ public void onOpen(WebSocket ws)
     }
   }
 
+  protected boolean isConnectionSetup()
+  {
+    return (connection != null);
+  }
+
   /**
    *
    * @return true if the connection is open; false otherwise.
    */
   public boolean isConnectionOpen()
   {
-    if (connection == null) {
-      return false;
-    }
-
-    return connection.isOpen();
+    return isConnectionSetup() ? connection.isOpen() : false;
   }
 
   /**
diff --git a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
index 73572f2e8f..d981127a74 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
@@ -57,6 +57,11 @@
 
   }
 
+  /**
+   * Construct a SharedPubSubWebSocketClient with the given parameters
+   * @param uri The web socket server uri
+   * @param timeoutMillis The connection timeout
+   */
   public SharedPubSubWebSocketClient(URI uri, long timeoutMillis)
   {
     this.setUri(uri);
@@ -64,14 +69,21 @@ public SharedPubSubWebSocketClient(URI uri, long timeoutMillis)
     this.timeoutMillis = timeoutMillis;
   }
 
+  /**
+   * Construct a SharedPubSubWebSocketClient with the given parameters
+   * @param uri The web socket server uri as string
+   * @param timeoutMillis The connection timeout
+   */
   public SharedPubSubWebSocketClient(String uri, long timeoutMillis) throws URISyntaxException
   {
     this(new URI(uri), timeoutMillis);
   }
 
-  public void setup() throws IOException, ExecutionException, InterruptedException, TimeoutException
+  public synchronized void openConnection() throws IOException, ExecutionException, InterruptedException, TimeoutException
   {
-    openConnection(timeoutMillis);
+    if (!isConnectionSetup()) {
+      super.openConnection(timeoutMillis);
+    }
   }
 
   public synchronized void addHandler(String topic, boolean numSubscribers, Handler handler)
diff --git a/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java b/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java
new file mode 100644
index 0000000000..dc0bd3a914
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.util;
+
+import java.net.URISyntaxException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.util.PubSubWebSocketClient;
+import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
+
+public class PubSubWebSocketClientBuilder
+{
+  public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
+
+  private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClientBuilder.class);
+
+  private Context context;
+
+  public PubSubWebSocketClientBuilder setContext(Context context)
+  {
+    this.context = context;
+    return this;
+  }
+
+  private <T extends PubSubWebSocketClient> T build(Class<T> clazz)
+  {
+    Preconditions.checkState(context != null, "Context not specified");
+    String gatewayAddress = context.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
+    if (gatewayAddress != null) {
+      int timeout = context.getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
+      boolean gatewayUseSsl = context.getValue(LogicalPlan.GATEWAY_USE_SSL);
+
+      // The builder can be used to build different types of PubSub clients in future but for now only one is supported
+      SharedPubSubWebSocketClient wsClient = null;
+
+      try {
+        wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", timeout);
+
+        String gatewayUserName = context.getValue(LogicalPlan.GATEWAY_USER_NAME);
+        String gatewayPassword = context.getValue(LogicalPlan.GATEWAY_PASSWORD);
+        if (gatewayUserName != null && gatewayPassword != null) {
+          wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
+          wsClient.setUserName(gatewayUserName);
+          wsClient.setPassword(gatewayPassword);
+        }
+
+        return (T)wsClient;
+      } catch (URISyntaxException e) {
+        logger.warn("Unable to initialize websocket for gateway address {}", gatewayAddress, e);
+      }
+
+      return null;
+    }
+
+    return null;
+  }
+
+  public SharedPubSubWebSocketClient build()
+  {
+    return build(SharedPubSubWebSocketClient.class);
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Gateway security settings need to be available in the DAG
> ---------------------------------------------------------
>
>                 Key: APEXCORE-791
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-791
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>            Priority: Major
>
> Gateway connect address attribute is available while constructing the DAG but other gateway security-related attributes such as GATEWAY_USE_SSL are not. These need to be made available as well.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)