You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/11/03 20:07:15 UTC
[apex-core] branch master updated: APEXCORE-791 Making gateway
security related settings available during construction of the DAG
This is an automated email from the ASF dual-hosted git repository.
vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
new 9ca51f2 APEXCORE-791 Making gateway security related settings available during construction of the DAG
9ca51f2 is described below
commit 9ca51f2683ffe61f8c550630ad8412b37da50393
Author: Pramod Immaneni <pr...@datatorrent.com>
AuthorDate: Tue Oct 24 15:34:47 2017 -0700
APEXCORE-791 Making gateway security related settings available during construction of the DAG
---
.../stram/StreamingContainerManager.java | 23 ++----
.../stram/debug/TupleRecorderCollection.java | 33 +++------
.../plan/logical/LogicalPlanConfiguration.java | 27 ++++++-
.../stram/util/PubSubWebSocketClient.java | 16 ++--
.../stram/util/SharedPubSubWebSocketClient.java | 16 +++-
.../engine/util/PubSubWebSocketClientBuilder.java | 85 ++++++++++++++++++++++
6 files changed, 149 insertions(+), 51 deletions(-)
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 07641d2..65be99a 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -71,6 +71,7 @@ import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
import org.apache.apex.engine.util.CascadeStorageAgent;
+import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -197,7 +198,6 @@ import net.engio.mbassy.bus.config.BusConfiguration;
public class StreamingContainerManager implements PlanContext
{
private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
- public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
public static final String BUILTIN_APPDATA_URL = "builtin";
public static final String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
public static final String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
@@ -556,23 +556,12 @@ public class StreamingContainerManager implements PlanContext
private void setupWsClient()
{
- String gatewayAddress = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
- boolean gatewayUseSsl = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USE_SSL);
- String gatewayUserName = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USER_NAME);
- String gatewayPassword = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_PASSWORD);
- int timeout = plan.getLogicalPlan().getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
-
- if (gatewayAddress != null) {
+ wsClient = new PubSubWebSocketClientBuilder().setContext(plan.getLogicalPlan()).build();
+ if (wsClient != null) {
try {
- wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", timeout);
- if (gatewayUserName != null && gatewayPassword != null) {
- wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
- wsClient.setUserName(gatewayUserName);
- wsClient.setPassword(gatewayPassword);
- }
- wsClient.setup();
- } catch (Exception ex) {
- LOG.warn("Cannot establish websocket connection to {}", gatewayAddress, ex);
+ wsClient.openConnection();
+ } catch (Exception e) {
+ LOG.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
}
}
}
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
index e41b348..49998cf 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
@@ -26,6 +26,8 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
+
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
@@ -39,7 +41,6 @@ import com.datatorrent.api.Stats.OperatorStats.PortStats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StatsListener.OperatorRequest;
import com.datatorrent.api.StringCodec;
-import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.ContainerEvent.ContainerStatsEvent;
import com.datatorrent.stram.api.ContainerEvent.NodeActivationEvent;
@@ -66,10 +67,6 @@ import net.engio.mbassy.listener.Handler;
public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, TupleRecorder> implements Component<Context>
{
private int tupleRecordingPartFileSize;
- private String gatewayAddress;
- private boolean gatewayUseSsl = false;
- private String gatewayUserName;
- private String gatewayPassword;
private long tupleRecordingPartFileTimeMillis;
private String appPath;
private String appId;
@@ -89,13 +86,11 @@ public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, Tup
tupleRecordingPartFileSize = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE);
tupleRecordingPartFileTimeMillis = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS);
appId = ctx.getValue(LogicalPlan.APPLICATION_ID);
- gatewayAddress = ctx.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
- gatewayUseSsl = ctx.getValue(LogicalPlan.GATEWAY_USE_SSL);
- gatewayUserName = ctx.getValue(LogicalPlan.GATEWAY_USER_NAME);
- gatewayPassword = ctx.getValue(LogicalPlan.GATEWAY_PASSWORD);
appPath = ctx.getValue(LogicalPlan.APPLICATION_PATH);
codecs = ctx.getAttributes().get(Context.DAGContext.STRING_CODECS);
+ wsClient = new PubSubWebSocketClientBuilder().setContext(ctx).build();
+
RequestDelegateImpl impl = new RequestDelegateImpl();
RequestFactory rf = ctx.getValue(ContainerContext.REQUEST_FACTORY);
if (rf == null) {
@@ -161,21 +156,11 @@ public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, Tup
if (!conflict) {
logger.debug("Executing start recording request for {}", operatorIdPortNamePair);
- if (gatewayAddress != null && wsClient == null) {
- synchronized (this) {
- if (wsClient == null) {
- try {
- wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", 500);
- if (gatewayUserName != null && gatewayPassword != null) {
- wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + StreamingContainerManager.GATEWAY_LOGIN_URL_PATH);
- wsClient.setUserName(gatewayUserName);
- wsClient.setPassword(gatewayPassword);
- }
- wsClient.setup();
- } catch (Exception ex) {
- logger.warn("Error initializing websocket", ex);
- }
- }
+ if (wsClient != null) {
+ try {
+ wsClient.openConnection();
+ } catch (Exception e) {
+ logger.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
}
}
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 01a4c7b..5fad04f 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -143,6 +143,9 @@ public class LogicalPlanConfiguration
public static final String KEY_APPLICATION_NAME = keyAndDeprecation(Context.DAGContext.APPLICATION_NAME);
public static final String KEY_GATEWAY_CONNECT_ADDRESS = keyAndDeprecation(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);
+ public static final String KEY_GATEWAY_USE_SSL = keyAndDeprecation(Context.DAGContext.GATEWAY_USE_SSL);
+ public static final String KEY_GATEWAY_USER_NAME = keyAndDeprecation(Context.DAGContext.GATEWAY_USER_NAME);
+ public static final String KEY_GATEWAY_PASSWORD = keyAndDeprecation(Context.DAGContext.GATEWAY_PASSWORD);
private static String keyAndDeprecation(Attribute<?> attr)
{
@@ -2248,9 +2251,8 @@ public class LogicalPlanConfiguration
*/
public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
{
- // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
- String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
- dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+ prepareDAGAttributes(dag);
+
pluginManager.setup(dag);
if (app != null) {
pluginManager.dispatch(PRE_POPULATE_DAG.event);
@@ -2277,6 +2279,25 @@ public class LogicalPlanConfiguration
pluginManager.teardown();
}
+ private void prepareDAGAttributes(LogicalPlan dag)
+ {
+ // Consider making all attributes available for DAG construction
+ // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
+ String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
+ dag.setAttribute(DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+ if (conf.getBoolean(KEY_GATEWAY_USE_SSL, DAGContext.GATEWAY_USE_SSL.defaultValue)) {
+ dag.setAttribute(DAGContext.GATEWAY_USE_SSL, true);
+ }
+ String username = conf.get(KEY_GATEWAY_USER_NAME);
+ if (username != null) {
+ dag.setAttribute(DAGContext.GATEWAY_USER_NAME, username);
+ }
+ String password = conf.get(KEY_GATEWAY_PASSWORD);
+ if (password != null) {
+ dag.setAttribute(DAGContext.GATEWAY_PASSWORD, password);
+ }
+ }
+
private void flattenDAG(LogicalPlan dag, Configuration conf)
{
for (ModuleMeta moduleMeta : dag.getAllModules()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 47986cb..b8f5bfe 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -138,6 +138,11 @@ public abstract class PubSubWebSocketClient implements Component<Context>
this.uri = uri;
}
+ public URI getUri()
+ {
+ return uri;
+ }
+
public void setIoThreadMultiplier(int ioThreadMultiplier)
{
this.ioThreadMultiplier = ioThreadMultiplier;
@@ -239,17 +244,18 @@ public abstract class PubSubWebSocketClient implements Component<Context>
}
}
+ protected boolean isConnectionSetup()
+ {
+ return (connection != null);
+ }
+
/**
*
* @return true if the connection is open; false otherwise.
*/
public boolean isConnectionOpen()
{
- if (connection == null) {
- return false;
- }
-
- return connection.isOpen();
+ return isConnectionSetup() ? connection.isOpen() : false;
}
/**
diff --git a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
index 73572f2..d981127 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
@@ -57,6 +57,11 @@ public class SharedPubSubWebSocketClient extends PubSubWebSocketClient
}
+ /**
+ * Construct a SharedPubSubWebSocketClient with the given parameters
+ * @param uri The web socket server uri
+ * @param timeoutMillis The connection timeout
+ */
public SharedPubSubWebSocketClient(URI uri, long timeoutMillis)
{
this.setUri(uri);
@@ -64,14 +69,21 @@ public class SharedPubSubWebSocketClient extends PubSubWebSocketClient
this.timeoutMillis = timeoutMillis;
}
+ /**
+ * Construct a SharedPubSubWebSocketClient with the given parameters
+ * @param uri The web socket server uri as string
+ * @param timeoutMillis The connection timeout
+ */
public SharedPubSubWebSocketClient(String uri, long timeoutMillis) throws URISyntaxException
{
this(new URI(uri), timeoutMillis);
}
- public void setup() throws IOException, ExecutionException, InterruptedException, TimeoutException
+ public synchronized void openConnection() throws IOException, ExecutionException, InterruptedException, TimeoutException
{
- openConnection(timeoutMillis);
+ if (!isConnectionSetup()) {
+ super.openConnection(timeoutMillis);
+ }
}
public synchronized void addHandler(String topic, boolean numSubscribers, Handler handler)
diff --git a/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java b/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java
new file mode 100644
index 0000000..dc0bd3a
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.util;
+
+import java.net.URISyntaxException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.util.PubSubWebSocketClient;
+import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
+
+public class PubSubWebSocketClientBuilder
+{
+ public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
+
+ private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClientBuilder.class);
+
+ private Context context;
+
+ public PubSubWebSocketClientBuilder setContext(Context context)
+ {
+ this.context = context;
+ return this;
+ }
+
+ private <T extends PubSubWebSocketClient> T build(Class<T> clazz)
+ {
+ Preconditions.checkState(context != null, "Context not specified");
+ String gatewayAddress = context.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
+ if (gatewayAddress != null) {
+ int timeout = context.getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
+ boolean gatewayUseSsl = context.getValue(LogicalPlan.GATEWAY_USE_SSL);
+
+ // The builder can be used to build different types of PubSub clients in future but for now only one is supported
+ SharedPubSubWebSocketClient wsClient = null;
+
+ try {
+ wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", timeout);
+
+ String gatewayUserName = context.getValue(LogicalPlan.GATEWAY_USER_NAME);
+ String gatewayPassword = context.getValue(LogicalPlan.GATEWAY_PASSWORD);
+ if (gatewayUserName != null && gatewayPassword != null) {
+ wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
+ wsClient.setUserName(gatewayUserName);
+ wsClient.setPassword(gatewayPassword);
+ }
+
+ return (T)wsClient;
+ } catch (URISyntaxException e) {
+ logger.warn("Unable to initialize websocket for gateway address {}", gatewayAddress, e);
+ }
+
+ return null;
+ }
+
+ return null;
+ }
+
+ public SharedPubSubWebSocketClient build()
+ {
+ return build(SharedPubSubWebSocketClient.class);
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].