You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:32:13 UTC

[25/50] [abbrv] incubator-apex-malhar git commit: Set the uri of app data pub sub operators to be the GATEWAY_CONNECT_ADDRESS by default.

Set the uri of app data pub sub operators to be the GATEWAY_CONNECT_ADDRESS by default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0be7372b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0be7372b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0be7372b

Branch: refs/heads/master
Commit: 0be7372b1129b616d33ccaddc4c05441b52a6968
Parents: 93ce29c
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Fri Jul 24 20:07:41 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Tue Aug 4 09:42:07 2015 -0700

----------------------------------------------------------------------
 .../lib/io/PubSubWebSocketAppDataQuery.java     | 48 ++++++++++++-
 .../lib/io/PubSubWebSocketAppDataResult.java    | 27 ++++++-
 .../lib/io/WebSocketInputOperator.java          |  4 +-
 .../lib/io/WebSocketOutputOperator.java         |  4 +-
 .../io/PubSubWebSocketAppDataOperatorTest.java  | 74 ++++++++++++++++++++
 .../lib/io/PubSubWebSocketAppDataQueryTest.java | 66 ++++++++++++++---
 .../io/PubSubWebSocketAppDataResultTest.java    | 20 ++----
 7 files changed, 212 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index f2b1fdb..14a2d2b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -16,6 +16,9 @@
 package com.datatorrent.lib.io;
 
 
+import java.net.URI;
+import java.net.URISyntaxException;
+
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -23,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
 
 import com.datatorrent.common.experimental.AppData;
 import com.datatorrent.common.util.PubSubMessage;
@@ -50,9 +54,51 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
   @Override
   public void setup(OperatorContext context)
   {
+    this.uri = uriHelper(context, uri);
+    logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
     super.setup(context);
+  }
 
-    logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
+  public static URI uriHelper(OperatorContext context, URI uri)
+  {
+    if (uri == null) {
+      if (context.getValue(DAG.GATEWAY_CONNECT_ADDRESS) == null) {
+        throw new IllegalArgumentException("The uri property is not set and the dt.attr.GATEWAY_CONNECT_ADDRESS is not defined");
+      }
+
+      try {
+        uri = new URI("ws://"
+                      + context.getValue(DAG.GATEWAY_CONNECT_ADDRESS)
+                      + "/pubsub");
+      } catch (URISyntaxException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    return uri;
+  }
+
+  /**
+   * Gets the URI for WebSocket connection.
+   *
+   * @return the URI
+   */
+  @Override
+  public URI getUri()
+  {
+    return uri;
+  }
+
+  /**
+   * The URI for WebSocket connection. If this is not set, the value of the dt.attr.GATEWAY_CONNECT_ADDRESS DAG attribute is used. If neither this
+   * property nor the dt.attr.GATEWAY_CONNECT_ADDRESS attribute is set, then this operator will fail with an {@link IllegalArgumentException}.
+   *
+   * @param uri
+   */
+  @Override
+  public void setUri(URI uri)
+  {
+    this.uri = uri;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
index 3401233..5f0b947 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
@@ -26,6 +26,7 @@ import com.datatorrent.api.Context.OperatorContext;
 
 import com.datatorrent.common.experimental.AppData;
 import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import java.net.URI;
 /**
  * This is an app data pub sub result operator. This operator is used to send results to
  * App Data dashboards produced by App Data store operators.
@@ -47,8 +48,9 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
   @Override
   public void setup(OperatorContext context)
   {
+    this.uri = PubSubWebSocketAppDataQuery.uriHelper(context, uri);
+    logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
     super.setup(context);
-    logger.debug("Setting up: ");
   }
 
   @Override
@@ -57,6 +59,29 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
     return "pubsub";
   }
 
+  /**
+   * Gets the URI for WebSocket connection.
+   *
+   * @return the URI
+   */
+  @Override
+  public URI getUri()
+  {
+    return uri;
+  }
+
+  /**
+   * The URI for WebSocket connection. If this is not set, the value of the dt.attr.GATEWAY_CONNECT_ADDRESS DAG attribute is used. If neither this
+   * property nor the dt.attr.GATEWAY_CONNECT_ADDRESS attribute is set, then this operator will fail with an {@link IllegalArgumentException}.
+   *
+   * @param uri
+   */
+  @Override
+  public void setUri(URI uri)
+  {
+    this.uri = uri;
+  }
+
   @Override
   public String convertMapToMessage(String t) throws IOException
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index dabcacb..a8cfa6e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -51,8 +51,8 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
    * Timeout interval for reading from server. 0 or negative indicates no timeout.
    */
   public int readTimeoutMillis = 0;
-  @NotNull
-  private URI uri;
+  //Do not make this @NotNull since null is a valid value for some child classes
+  protected URI uri;
   private transient AsyncHttpClient client;
   private transient final JsonFactory jsonFactory = new JsonFactory();
   protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
index a92f8b3..a7ab3bd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
@@ -49,8 +49,8 @@ import com.datatorrent.api.DefaultInputPort;
 public class WebSocketOutputOperator<T> extends BaseOperator
 {
   private static final Logger LOG = LoggerFactory.getLogger(WebSocketOutputOperator.class);
-  @NotNull
-  private URI uri;
+  //Do not make this @NotNull since null is a valid value for some child classes
+  protected URI uri;
   private transient AsyncHttpClient client;
   private transient final JsonFactory jsonFactory = new JsonFactory();
   protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
new file mode 100644
index 0000000..bc379ce
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.io;
+
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.common.experimental.AppData;
+
+public abstract class PubSubWebSocketAppDataOperatorTest
+{
+  public static final String GATEWAY_CONNECT_ADDRESS_STRING = "my.gateway.com";
+  public static final String URI_ADDRESS_STRING = "ws://localhost:6666/pubsub";
+  public static final URI GATEWAY_CONNECT_ADDRESS;
+  public static final URI URI_ADDRESS;
+
+  static
+  {
+    try {
+      GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub");
+      URI_ADDRESS = new URI(URI_ADDRESS_STRING);
+    } catch (URISyntaxException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  public abstract AppData.ConnectionInfoProvider getOperator();
+
+  @Test
+  public void testGetAppDataURL() throws Exception
+  {
+    String topic = "test";
+    String correct = "pubsub";
+
+    AppData.ConnectionInfoProvider pubsub = getOperator();
+
+    setUri(pubsub, URI_ADDRESS);
+    setTopic(pubsub, topic);
+
+    Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL());
+  }
+
+  public void setUri(Object o, URI uri) throws Exception
+  {
+    Class<?> clazz = o.getClass();
+    Method m = clazz.getMethod("setUri", URI.class);
+    m.invoke(o, uri);
+  }
+
+  public void setTopic(Object o, String topic) throws Exception
+  {
+    Class<?> clazz = o.getClass();
+    Method m = clazz.getMethod("setTopic", String.class);
+    m.invoke(o, topic);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
index 894ed72..c2aa0da 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
@@ -15,23 +15,69 @@
  */
 package com.datatorrent.lib.io;
 
-import java.net.URI;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class PubSubWebSocketAppDataQueryTest
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider;
+
+public class PubSubWebSocketAppDataQueryTest extends PubSubWebSocketAppDataOperatorTest
 {
+  private static OperatorContext context;
+  private static OperatorContext emptyContext;
+
+  @BeforeClass
+  public static void setupContext() throws Exception
+  {
+    Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, GATEWAY_CONNECT_ADDRESS_STRING);
+    context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+    attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    emptyContext = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+  }
+
+  @Override
+  public ConnectionInfoProvider getOperator()
+  {
+    return new PubSubWebSocketAppDataQuery();
+  }
+
   @Test
-  public void testGetAppDataURL() throws Exception
+  public void testURISet() throws Exception
   {
-    URI uri = URI.create("ws://localhost:6666/pubsub");
-    String topic = "test";
-    String correct = "pubsub";
+    Assert.assertEquals(URI_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(emptyContext, URI_ADDRESS));
+  }
 
-    PubSubWebSocketAppDataQuery pubsub = new PubSubWebSocketAppDataQuery();
-    pubsub.setUri(uri);
-    pubsub.setTopic(topic);
+  @Test
+  public void testNoURISet() throws Exception
+  {
+    boolean threwException = false;
+
+    try {
+      PubSubWebSocketAppDataQuery.uriHelper(emptyContext, null);
+    } catch (Exception e) {
+      threwException = e instanceof IllegalArgumentException;
+    }
 
-    Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL());
+    Assert.assertTrue(threwException);
+  }
+
+  @Test
+  public void testAttrSet() throws Exception
+  {
+    Assert.assertEquals(GATEWAY_CONNECT_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(context, null));
+  }
+
+  @Test
+  public void testAttrAndURISet() throws Exception
+  {
+    Assert.assertEquals(URI_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(context, URI_ADDRESS));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java
index e38e439..7afa211 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java
@@ -15,23 +15,13 @@
  */
 package com.datatorrent.lib.io;
 
-import java.net.URI;
-import org.junit.Assert;
-import org.junit.Test;
+import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider;
 
-public class PubSubWebSocketAppDataResultTest
+public class PubSubWebSocketAppDataResultTest extends PubSubWebSocketAppDataOperatorTest
 {
-  @Test
-  public void testGetAppDataURL() throws Exception
+  @Override
+  public ConnectionInfoProvider getOperator()
   {
-    URI uri = URI.create("ws://localhost:6666/pubsub");
-    String topic = "test";
-    String correct = "pubsub";
-
-    PubSubWebSocketAppDataResult pubsub = new PubSubWebSocketAppDataResult();
-    pubsub.setUri(uri);
-    pubsub.setTopic(topic);
-
-    Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL());
+    return new PubSubWebSocketAppDataResult();
   }
 }