You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/12/07 13:35:08 UTC
[apex-malhar] branch master updated: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new 095a2f7 APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
095a2f7 is described below
commit 095a2f789fa0221d14e92cbdf9bcdd6ba1e933c2
Author: Pramod Immaneni <pr...@datatorrent.com>
AuthorDate: Tue Oct 24 17:10:56 2017 -0700
APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
---
.../datatorrent/apps/logstream/Application.java | 26 ++++-----
.../apex/examples/frauddetect/Application.java | 10 ++--
.../apache/apex/examples/mobile/Application.java | 4 +-
.../apex/examples/mobile/ApplicationTest.java | 6 +-
.../mrmonitor/MRMonitoringApplication.java | 5 +-
.../twitter/KinesisHashtagsApplication.java | 10 ++--
.../twitter/TwitterTopCounterApplication.java | 9 +--
.../wordcount/ApplicationWithQuerySupport.java | 10 +---
.../lib/io/PubSubWebSocketAppDataQuery.java | 9 +--
.../datatorrent/lib/io/WidgetOutputOperator.java | 9 +--
.../apache/apex/malhar/lib/utils/PubSubHelper.java | 67 ++++++++++++++++++++++
.../lib/io/PubSubWebSocketAppDataOperatorTest.java | 10 ++--
.../lib/io/PubSubWebSocketOperatorTest.java | 4 +-
13 files changed, 112 insertions(+), 67 deletions(-)
diff --git a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
index 98dfebd..82c9214 100644
--- a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
+++ b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
@@ -18,19 +18,24 @@
*/
package com.datatorrent.apps.logstream;
-import java.net.URI;
-import java.util.*;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
+import com.datatorrent.contrib.redis.RedisMapOutputOperator;
+import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator;
import com.datatorrent.lib.algo.TopN;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
@@ -43,13 +48,6 @@ import com.datatorrent.lib.streamquery.index.ColumnIndex;
import com.datatorrent.lib.util.AbstractDimensionTimeBucketOperator;
import com.datatorrent.lib.util.DimensionTimeBucketSumOperator;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
-import com.datatorrent.contrib.redis.RedisMapOutputOperator;
-import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator;
-
/**
* Log stream processing application based on Apex platform.<br>
* This application consumes log data generated by running systems and services
@@ -156,14 +154,12 @@ public class Application implements StreamingApplication
private InputPort<Object> wsOutput(DAG dag, String operatorName)
{
- String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
- if (!StringUtils.isEmpty(daemonAddress)) {
- URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
+ if (PubSubHelper.isGatewayConfigured(dag)) {
String appId = "appid";
//appId = dag.attrValue(DAG.APPLICATION_ID, null); // will be used once UI is able to pick applications from list and listen to corresponding application
String topic = "apps.logstream." + appId + "." + operatorName;
PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
- wsOut.setUri(uri);
+ wsOut.setUri(PubSubHelper.getURI(dag));
wsOut.setTopic(topic);
return wsOut.input;
}
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
index 73c38ef..eed9a68 100644
--- a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
@@ -20,11 +20,13 @@ package org.apache.apex.examples.frauddetect;
import java.io.Serializable;
import java.net.URI;
+
import org.apache.apex.examples.frauddetect.operator.HdfsStringOutputOperator;
import org.apache.apex.examples.frauddetect.operator.MongoDBOutputOperator;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
@@ -88,11 +90,7 @@ public class Application implements StreamingApplication
{
try {
- String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS);
- if (gatewayAddress == null) {
- gatewayAddress = "localhost:9090";
- }
- URI duri = URI.create("ws://" + gatewayAddress + "/pubsub");
+ URI duri = PubSubHelper.getURIWithDefault(dag, "localhost:9090");
PubSubWebSocketInputOperator userTxWsInput = getPubSubWebSocketInputOperator("userTxInput", dag, duri, "examples.app.frauddetect.submitTransaction");
PubSubWebSocketOutputOperator ccUserAlertWsOutput = getPubSubWebSocketOutputOperator("ccUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
index f719643..dd1e136 100644
--- a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
@@ -26,6 +26,7 @@ import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.commons.lang3.Range;
import org.apache.hadoop.conf.Configuration;
@@ -157,8 +158,7 @@ public class Application implements StreamingApplication
// done generating data
LOG.info("Finished generating seed data.");
- String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
- URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+ URI uri = PubSubHelper.getURI(dag);
PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>());
wsOut.setUri(uri);
PubSubWebSocketInputOperator<Map<String, String>> wsIn = dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, String>>());
diff --git a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
index ce6ca41..b88ed57 100644
--- a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
+++ b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
@@ -21,7 +21,6 @@ package org.apache.apex.examples.mobile;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-
import javax.servlet.Servlet;
import org.eclipse.jetty.server.Connector;
@@ -32,11 +31,10 @@ import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.LocalMode;
-
import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
@@ -67,7 +65,7 @@ public class ApplicationTest
server.start();
Connector[] connector = server.getConnectors();
conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
- URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
+ URI uri = PubSubHelper.getURI("localhost:" + connector[0].getLocalPort());
PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
outputOperator.setUri(uri);
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
index 288da84..55d98aa 100644
--- a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
@@ -23,6 +23,7 @@ import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
@@ -48,10 +49,8 @@ public class MRMonitoringApplication implements StreamingApplication
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
MRJobStatusOperator mrJobOperator = dag.addOperator("JobMonitor", new MRJobStatusOperator());
- URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
- logger.info("WebSocket with daemon at {}", daemonAddress);
+ URI uri = PubSubHelper.getURI(dag);
PubSubWebSocketInputOperator wsIn = dag.addOperator("Query", new PubSubWebSocketInputOperator());
wsIn.setUri(uri);
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
index be7edfb..225ea25 100644
--- a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
@@ -18,9 +18,9 @@
*/
package org.apache.apex.examples.twitter;
-import java.net.URI;
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.hadoop.conf.Configuration;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.Operator.InputPort;
@@ -174,13 +174,11 @@ public class KinesisHashtagsApplication implements StreamingApplication
private InputPort<Object> consoleOutput(DAG dag, String operatorName)
{
- String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
- if (!StringUtils.isEmpty(gatewayAddress)) {
- URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+ if (PubSubHelper.isGatewayConfigured(dag)) {
String topic = "examples.twitter." + operatorName;
//LOG.info("WebSocket with gateway at: {}", gatewayAddress);
PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
- wsOut.setUri(uri);
+ wsOut.setUri(PubSubHelper.getURI(dag));
wsOut.setTopic(topic);
return wsOut.input;
}
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
index ee43383..77384a8 100644
--- a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
@@ -22,11 +22,10 @@ import java.net.URI;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.Maps;
-
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
@@ -34,7 +33,6 @@ import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-
import com.datatorrent.contrib.twitter.TwitterSampleInput;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.appdata.schemas.SchemaUtils;
@@ -189,9 +187,8 @@ public class TwitterTopCounterApplication implements StreamingApplication
public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias)
{
- String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
- if (!StringUtils.isEmpty(gatewayAddress)) {
- URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+ if (PubSubHelper.isGatewayConfigured(dag)) {
+ URI uri = PubSubHelper.getURI(dag);
AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
index 699469b..440b30a 100644
--- a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
+++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
@@ -22,15 +22,13 @@ import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-
import com.datatorrent.lib.appdata.schemas.SchemaUtils;
import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -84,10 +82,8 @@ public class ApplicationWithQuerySupport implements StreamingApplication
dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
- String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-
- if (!StringUtils.isEmpty(gatewayAddress)) { // add query support
- URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+ if (PubSubHelper.isGatewayConfigured(dag)) { // add query support
+ URI uri = PubSubHelper.getURI(dag);
AppDataSnapshotServerMap snapshotServerFile
= dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap());
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 3f2029e..9b1e0cf 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -19,14 +19,13 @@
package com.datatorrent.lib.io;
import java.net.URI;
-import java.net.URISyntaxException;
-
import javax.validation.constraints.Min;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
@@ -116,10 +115,8 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
}
try {
- uri = new URI("ws://"
- + context.getValue(DAG.GATEWAY_CONNECT_ADDRESS)
- + "/pubsub");
- } catch (URISyntaxException ex) {
+ uri = PubSubHelper.getURI(context);
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
diff --git a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
index b027b58..a9030d4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
@@ -20,17 +20,15 @@ package com.datatorrent.lib.io;
import java.io.IOException;
import java.lang.reflect.Array;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import com.google.common.collect.Maps;
-
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
@@ -129,9 +127,8 @@ public class WidgetOutputOperator extends BaseOperator
@Override
public void setup(OperatorContext context)
{
- String gatewayAddress = context.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
- if (!StringUtils.isEmpty(gatewayAddress)) {
- wsoo.setUri(URI.create("ws://" + gatewayAddress + "/pubsub"));
+ if (PubSubHelper.isGatewayConfigured(context)) {
+ wsoo.setUri(PubSubHelper.getURI(context));
wsoo.setup(context);
} else {
isWebSocketConnected = false;
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
new file mode 100644
index 0000000..51eaeee
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils;
+
+import java.net.URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+@InterfaceStability.Evolving
+public class PubSubHelper
+{
+ private static final Logger logger = LoggerFactory.getLogger(PubSubHelper.class);
+
+ public static boolean isGatewayConfigured(Context context)
+ {
+ return context.getValue(Context.DAGContext.GATEWAY_CONNECT_ADDRESS) != null;
+ }
+
+ public static URI getURI(Context context)
+ {
+ return getURIWithDefault(context, null);
+ }
+
+ public static URI getURIWithDefault(Context context, String defaultAddress)
+ {
+ String address = context.getValue(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);
+ if (address == null) {
+ address = defaultAddress;
+ }
+ return getURI(address, context.getValue(Context.DAGContext.GATEWAY_USE_SSL));
+ }
+
+ public static URI getURI(String address)
+ {
+ return getURI(address, false);
+ }
+
+ public static URI getURI(String address, boolean useSSL)
+ {
+ if (address == null) {
+ throw new NullPointerException("No address specified");
+ }
+ String uri = (useSSL ? "wss://" : "ws://") + address + "/pubsub";
+ logger.debug("PubSub uri {}", uri);
+ return URI.create(uri);
+ }
+}
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
index 7801619..fc49bea 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
@@ -20,25 +20,25 @@ package com.datatorrent.lib.io;
import java.lang.reflect.Method;
import java.net.URI;
-import java.net.URISyntaxException;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
import com.datatorrent.common.experimental.AppData;
public abstract class PubSubWebSocketAppDataOperatorTest
{
public static final String GATEWAY_CONNECT_ADDRESS_STRING = "my.gateway.com";
- public static final String URI_ADDRESS_STRING = "ws://localhost:6666/pubsub";
+ public static final String URI_ADDRESS_STRING = "localhost:6666";
public static final URI GATEWAY_CONNECT_ADDRESS;
public static final URI URI_ADDRESS;
static {
try {
- GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub");
- URI_ADDRESS = new URI(URI_ADDRESS_STRING);
- } catch (URISyntaxException ex) {
+ GATEWAY_CONNECT_ADDRESS = PubSubHelper.getURI(GATEWAY_CONNECT_ADDRESS_STRING);
+ URI_ADDRESS = PubSubHelper.getURI(URI_ADDRESS_STRING);
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
index e165649..7f1e4dd 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
@@ -30,6 +30,8 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
+
import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -51,7 +53,7 @@ public class PubSubWebSocketOperatorTest
contextHandler.addServlet(sh, "/*");
server.start();
Connector[] connector = server.getConnectors();
- URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
+ URI uri = PubSubHelper.getURI("localhost:" + connector[0].getLocalPort());
PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
outputOperator.setUri(uri);
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].