You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/12/07 13:35:08 UTC

[apex-malhar] branch master updated: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new 095a2f7  APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
095a2f7 is described below

commit 095a2f789fa0221d14e92cbdf9bcdd6ba1e933c2
Author: Pramod Immaneni <pr...@datatorrent.com>
AuthorDate: Tue Oct 24 17:10:56 2017 -0700

    APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
---
 .../datatorrent/apps/logstream/Application.java    | 26 ++++-----
 .../apex/examples/frauddetect/Application.java     | 10 ++--
 .../apache/apex/examples/mobile/Application.java   |  4 +-
 .../apex/examples/mobile/ApplicationTest.java      |  6 +-
 .../mrmonitor/MRMonitoringApplication.java         |  5 +-
 .../twitter/KinesisHashtagsApplication.java        | 10 ++--
 .../twitter/TwitterTopCounterApplication.java      |  9 +--
 .../wordcount/ApplicationWithQuerySupport.java     | 10 +---
 .../lib/io/PubSubWebSocketAppDataQuery.java        |  9 +--
 .../datatorrent/lib/io/WidgetOutputOperator.java   |  9 +--
 .../apache/apex/malhar/lib/utils/PubSubHelper.java | 67 ++++++++++++++++++++++
 .../lib/io/PubSubWebSocketAppDataOperatorTest.java | 10 ++--
 .../lib/io/PubSubWebSocketOperatorTest.java        |  4 +-
 13 files changed, 112 insertions(+), 67 deletions(-)

diff --git a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
index 98dfebd..82c9214 100644
--- a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
+++ b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
@@ -18,19 +18,24 @@
  */
 package com.datatorrent.apps.logstream;
 
-import java.net.URI;
-import java.util.*;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
 import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
+import com.datatorrent.contrib.redis.RedisMapOutputOperator;
+import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator;
 import com.datatorrent.lib.algo.TopN;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
@@ -43,13 +48,6 @@ import com.datatorrent.lib.streamquery.index.ColumnIndex;
 import com.datatorrent.lib.util.AbstractDimensionTimeBucketOperator;
 import com.datatorrent.lib.util.DimensionTimeBucketSumOperator;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
-import com.datatorrent.contrib.redis.RedisMapOutputOperator;
-import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator;
-
 /**
  * Log stream processing application based on Apex platform.<br>
  * This application consumes log data generated by running systems and services
@@ -156,14 +154,12 @@ public class Application implements StreamingApplication
 
   private InputPort<Object> wsOutput(DAG dag, String operatorName)
   {
-    String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(daemonAddress)) {
-      URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {
       String appId = "appid";
       //appId = dag.attrValue(DAG.APPLICATION_ID, null); // will be used once UI is able to pick applications from list and listen to corresponding application
       String topic = "apps.logstream." + appId + "." + operatorName;
       PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
-      wsOut.setUri(uri);
+      wsOut.setUri(PubSubHelper.getURI(dag));
       wsOut.setTopic(topic);
       return wsOut.input;
     }
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
index 73c38ef..eed9a68 100644
--- a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
@@ -20,11 +20,13 @@ package org.apache.apex.examples.frauddetect;
 
 import java.io.Serializable;
 import java.net.URI;
+
 import org.apache.apex.examples.frauddetect.operator.HdfsStringOutputOperator;
 import org.apache.apex.examples.frauddetect.operator.MongoDBOutputOperator;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
@@ -88,11 +90,7 @@ public class Application implements StreamingApplication
   {
 
     try {
-      String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS);
-      if (gatewayAddress == null) {
-        gatewayAddress = "localhost:9090";
-      }
-      URI duri = URI.create("ws://" + gatewayAddress + "/pubsub");
+      URI duri = PubSubHelper.getURIWithDefault(dag, "localhost:9090");
 
       PubSubWebSocketInputOperator userTxWsInput = getPubSubWebSocketInputOperator("userTxInput", dag, duri, "examples.app.frauddetect.submitTransaction");
       PubSubWebSocketOutputOperator ccUserAlertWsOutput = getPubSubWebSocketOutputOperator("ccUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
index f719643..dd1e136 100644
--- a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
+++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java
@@ -26,6 +26,7 @@ import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.commons.lang3.Range;
 import org.apache.hadoop.conf.Configuration;
@@ -157,8 +158,7 @@ public class Application implements StreamingApplication
     // done generating data
     LOG.info("Finished generating seed data.");
 
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    URI uri = PubSubHelper.getURI(dag);
     PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>());
     wsOut.setUri(uri);
     PubSubWebSocketInputOperator<Map<String, String>> wsIn = dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, String>>());
diff --git a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
index ce6ca41..b88ed57 100644
--- a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
+++ b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java
@@ -21,7 +21,6 @@ package org.apache.apex.examples.mobile;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-
 import javax.servlet.Servlet;
 
 import org.eclipse.jetty.server.Connector;
@@ -32,11 +31,10 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.LocalMode;
-
 import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
 import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
@@ -67,7 +65,7 @@ public class ApplicationTest
     server.start();
     Connector[] connector = server.getConnectors();
     conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
-    URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
+    URI uri = PubSubHelper.getURI("localhost:" + connector[0].getLocalPort());
 
     PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
     outputOperator.setUri(uri);
diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
index 288da84..55d98aa 100644
--- a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
+++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
@@ -48,10 +49,8 @@ public class MRMonitoringApplication implements StreamingApplication
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
     MRJobStatusOperator mrJobOperator = dag.addOperator("JobMonitor", new MRJobStatusOperator());
-    URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
-    logger.info("WebSocket with daemon at {}", daemonAddress);
+    URI uri = PubSubHelper.getURI(dag);
 
     PubSubWebSocketInputOperator wsIn = dag.addOperator("Query", new PubSubWebSocketInputOperator());
     wsIn.setUri(uri);
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
index be7edfb..225ea25 100644
--- a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java
@@ -18,9 +18,9 @@
  */
 package org.apache.apex.examples.twitter;
 
-import java.net.URI;
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.Operator.InputPort;
@@ -174,13 +174,11 @@ public class KinesisHashtagsApplication implements StreamingApplication
 
   private InputPort<Object> consoleOutput(DAG dag, String operatorName)
   {
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(gatewayAddress)) {
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {
       String topic = "examples.twitter." + operatorName;
       //LOG.info("WebSocket with gateway at: {}", gatewayAddress);
       PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
-      wsOut.setUri(uri);
+      wsOut.setUri(PubSubHelper.getURI(dag));
       wsOut.setTopic(topic);
       return wsOut.input;
     }
diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
index ee43383..77384a8 100644
--- a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
+++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java
@@ -22,11 +22,10 @@ import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.Maps;
-
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -34,7 +33,6 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 import com.datatorrent.contrib.twitter.TwitterSampleInput;
 import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.appdata.schemas.SchemaUtils;
@@ -189,9 +187,8 @@ public class TwitterTopCounterApplication implements StreamingApplication
 
   public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias)
   {
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(gatewayAddress)) {
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {
+      URI uri = PubSubHelper.getURI(dag);
 
       AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
 
diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
index 699469b..440b30a 100644
--- a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
+++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
@@ -22,15 +22,13 @@ import java.net.URI;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-
 import com.datatorrent.lib.appdata.schemas.SchemaUtils;
 import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -84,10 +82,8 @@ public class ApplicationWithQuerySupport implements StreamingApplication
     dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input);
     dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
 
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-
-    if (!StringUtils.isEmpty(gatewayAddress)) {        // add query support
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {        // add query support
+      URI uri = PubSubHelper.getURI(dag);
 
       AppDataSnapshotServerMap snapshotServerFile
           = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap());
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 3f2029e..9b1e0cf 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -19,14 +19,13 @@
 package com.datatorrent.lib.io;
 
 import java.net.URI;
-import java.net.URISyntaxException;
-
 import javax.validation.constraints.Min;
 
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
@@ -116,10 +115,8 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
       }
 
       try {
-        uri = new URI("ws://"
-                      + context.getValue(DAG.GATEWAY_CONNECT_ADDRESS)
-                      + "/pubsub");
-      } catch (URISyntaxException ex) {
+        uri = PubSubHelper.getURI(context);
+      } catch (Exception ex) {
         throw new RuntimeException(ex);
       }
     }
diff --git a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
index b027b58..a9030d4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java
@@ -20,17 +20,15 @@ package com.datatorrent.lib.io;
 
 import java.io.IOException;
 import java.lang.reflect.Array;
-import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.collect.Maps;
-
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
@@ -129,9 +127,8 @@ public class WidgetOutputOperator extends BaseOperator
   @Override
   public void setup(OperatorContext context)
   {
-    String gatewayAddress = context.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(gatewayAddress)) {
-      wsoo.setUri(URI.create("ws://" + gatewayAddress + "/pubsub"));
+    if (PubSubHelper.isGatewayConfigured(context)) {
+      wsoo.setUri(PubSubHelper.getURI(context));
       wsoo.setup(context);
     } else {
       isWebSocketConnected = false;
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
new file mode 100644
index 0000000..51eaeee
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils;
+
+import java.net.URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+@InterfaceStability.Evolving
+public class PubSubHelper
+{
+  private static final Logger logger = LoggerFactory.getLogger(PubSubHelper.class);
+
+  public static boolean isGatewayConfigured(Context context)
+  {
+    return context.getValue(Context.DAGContext.GATEWAY_CONNECT_ADDRESS) != null;
+  }
+
+  public static URI getURI(Context context)
+  {
+    return getURIWithDefault(context, null);
+  }
+
+  public static URI getURIWithDefault(Context context, String defaultAddress)
+  {
+    String address = context.getValue(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);
+    if (address == null) {
+      address = defaultAddress;
+    }
+    return getURI(address, context.getValue(Context.DAGContext.GATEWAY_USE_SSL));
+  }
+
+  public static URI getURI(String address)
+  {
+    return getURI(address, false);
+  }
+
+  public static URI getURI(String address, boolean useSSL)
+  {
+    if (address == null) {
+      throw new NullPointerException("No address specified");
+    }
+    String uri = (useSSL ? "wss://" : "ws://") + address + "/pubsub";
+    logger.debug("PubSub uri {}", uri);
+    return URI.create(uri);
+  }
+}
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
index 7801619..fc49bea 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
@@ -20,25 +20,25 @@ package com.datatorrent.lib.io;
 
 import java.lang.reflect.Method;
 import java.net.URI;
-import java.net.URISyntaxException;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 
 import com.datatorrent.common.experimental.AppData;
 
 public abstract class PubSubWebSocketAppDataOperatorTest
 {
   public static final String GATEWAY_CONNECT_ADDRESS_STRING = "my.gateway.com";
-  public static final String URI_ADDRESS_STRING = "ws://localhost:6666/pubsub";
+  public static final String URI_ADDRESS_STRING = "localhost:6666";
   public static final URI GATEWAY_CONNECT_ADDRESS;
   public static final URI URI_ADDRESS;
 
   static {
     try {
-      GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub");
-      URI_ADDRESS = new URI(URI_ADDRESS_STRING);
-    } catch (URISyntaxException ex) {
+      GATEWAY_CONNECT_ADDRESS = PubSubHelper.getURI(GATEWAY_CONNECT_ADDRESS_STRING);
+      URI_ADDRESS = PubSubHelper.getURI(URI_ADDRESS_STRING);
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
index e165649..7f1e4dd 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
@@ -30,6 +30,8 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
+
 import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -51,7 +53,7 @@ public class PubSubWebSocketOperatorTest
     contextHandler.addServlet(sh, "/*");
     server.start();
     Connector[] connector = server.getConnectors();
-    URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
+    URI uri = PubSubHelper.getURI("localhost:" + connector[0].getLocalPort());
 
     PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
     outputOperator.setUri(uri);

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.