You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:32:13 UTC
[25/50] [abbrv] incubator-apex-malhar git commit: Set the uri of app
data pub sub operators to be the GATEWAY_CONNECT_ADDRESS by default.
Set the uri of app data pub sub operators to be the GATEWAY_CONNECT_ADDRESS by default.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0be7372b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0be7372b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0be7372b
Branch: refs/heads/master
Commit: 0be7372b1129b616d33ccaddc4c05441b52a6968
Parents: 93ce29c
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Fri Jul 24 20:07:41 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Tue Aug 4 09:42:07 2015 -0700
----------------------------------------------------------------------
.../lib/io/PubSubWebSocketAppDataQuery.java | 48 ++++++++++++-
.../lib/io/PubSubWebSocketAppDataResult.java | 27 ++++++-
.../lib/io/WebSocketInputOperator.java | 4 +-
.../lib/io/WebSocketOutputOperator.java | 4 +-
.../io/PubSubWebSocketAppDataOperatorTest.java | 74 ++++++++++++++++++++
.../lib/io/PubSubWebSocketAppDataQueryTest.java | 66 ++++++++++++++---
.../io/PubSubWebSocketAppDataResultTest.java | 20 ++----
7 files changed, 212 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index f2b1fdb..14a2d2b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -16,6 +16,9 @@
package com.datatorrent.lib.io;
+import java.net.URI;
+import java.net.URISyntaxException;
+
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -23,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.common.util.PubSubMessage;
@@ -50,9 +54,51 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St
@Override
public void setup(OperatorContext context)
{
+ this.uri = uriHelper(context, uri);
+ logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
super.setup(context);
+ }
- logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
+ public static URI uriHelper(OperatorContext context, URI uri)
+ {
+ if (uri == null) {
+ if (context.getValue(DAG.GATEWAY_CONNECT_ADDRESS) == null) {
+ throw new IllegalArgumentException("The uri property is not set and the dt.attr.GATEWAY_CONNECT_ADDRESS is not defined");
+ }
+
+ try {
+ uri = new URI("ws://"
+ + context.getValue(DAG.GATEWAY_CONNECT_ADDRESS)
+ + "/pubsub");
+ } catch (URISyntaxException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ return uri;
+ }
+
+ /**
+ * Gets the URI for WebSocket connection.
+ *
+ * @return the URI
+ */
+ @Override
+ public URI getUri()
+ {
+ return uri;
+ }
+
+ /**
+ * The URI for WebSocket connection. If this is not set, the value of the dt.attr.GATEWAY_CONNECT_ADDRESS DAG attribute is used. If neither this
+ * property nor the dt.attr.GATEWAY_CONNECT_ADDRESS attribute is set, then this operator will fail with an {@link IllegalArgumentException}.
+ *
+ * @param uri
+ */
+ @Override
+ public void setUri(URI uri)
+ {
+ this.uri = uri;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
index 3401233..5f0b947 100644
--- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
+++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
@@ -26,6 +26,7 @@ import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import java.net.URI;
/**
* This is an app data pub sub result operator. This operator is used to send results to
* App Data dashboards produced by App Data store operators.
@@ -47,8 +48,9 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
@Override
public void setup(OperatorContext context)
{
+ this.uri = PubSubWebSocketAppDataQuery.uriHelper(context, uri);
+ logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic());
super.setup(context);
- logger.debug("Setting up: ");
}
@Override
@@ -57,6 +59,29 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator<
return "pubsub";
}
+ /**
+ * Gets the URI for WebSocket connection.
+ *
+ * @return the URI
+ */
+ @Override
+ public URI getUri()
+ {
+ return uri;
+ }
+
+ /**
+ * The URI for WebSocket connection. If this is not set, the value of the dt.attr.GATEWAY_CONNECT_ADDRESS DAG attribute is used. If neither this
+ * property nor the dt.attr.GATEWAY_CONNECT_ADDRESS attribute is set, then this operator will fail with an {@link IllegalArgumentException}.
+ *
+ * @param uri
+ */
+ @Override
+ public void setUri(URI uri)
+ {
+ this.uri = uri;
+ }
+
@Override
public String convertMapToMessage(String t) throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index dabcacb..a8cfa6e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -51,8 +51,8 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
* Timeout interval for reading from server. 0 or negative indicates no timeout.
*/
public int readTimeoutMillis = 0;
- @NotNull
- private URI uri;
+ //Do not make this @NotNull since null is a valid value for some child classes
+ protected URI uri;
private transient AsyncHttpClient client;
private transient final JsonFactory jsonFactory = new JsonFactory();
protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
index a92f8b3..a7ab3bd 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java
@@ -49,8 +49,8 @@ import com.datatorrent.api.DefaultInputPort;
public class WebSocketOutputOperator<T> extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(WebSocketOutputOperator.class);
- @NotNull
- private URI uri;
+ //Do not make this @NotNull since null is a valid value for some child classes
+ protected URI uri;
private transient AsyncHttpClient client;
private transient final JsonFactory jsonFactory = new JsonFactory();
protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
new file mode 100644
index 0000000..bc379ce
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2015 DataTorrent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.lib.io;
+
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.common.experimental.AppData;
+
+public abstract class PubSubWebSocketAppDataOperatorTest
+{
+ public static final String GATEWAY_CONNECT_ADDRESS_STRING = "my.gateway.com";
+ public static final String URI_ADDRESS_STRING = "ws://localhost:6666/pubsub";
+ public static final URI GATEWAY_CONNECT_ADDRESS;
+ public static final URI URI_ADDRESS;
+
+ static
+ {
+ try {
+ GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub");
+ URI_ADDRESS = new URI(URI_ADDRESS_STRING);
+ } catch (URISyntaxException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public abstract AppData.ConnectionInfoProvider getOperator();
+
+ @Test
+ public void testGetAppDataURL() throws Exception
+ {
+ String topic = "test";
+ String correct = "pubsub";
+
+ AppData.ConnectionInfoProvider pubsub = getOperator();
+
+ setUri(pubsub, URI_ADDRESS);
+ setTopic(pubsub, topic);
+
+ Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL());
+ }
+
+ public void setUri(Object o, URI uri) throws Exception
+ {
+ Class<?> clazz = o.getClass();
+ Method m = clazz.getMethod("setUri", URI.class);
+ m.invoke(o, uri);
+ }
+
+ public void setTopic(Object o, String topic) throws Exception
+ {
+ Class<?> clazz = o.getClass();
+ Method m = clazz.getMethod("setTopic", String.class);
+ m.invoke(o, topic);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
index 894ed72..c2aa0da 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
@@ -15,23 +15,69 @@
*/
package com.datatorrent.lib.io;
-import java.net.URI;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
-public class PubSubWebSocketAppDataQueryTest
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+
+import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider;
+
+public class PubSubWebSocketAppDataQueryTest extends PubSubWebSocketAppDataOperatorTest
{
+ private static OperatorContext context;
+ private static OperatorContext emptyContext;
+
+ @BeforeClass
+ public static void setupContext() throws Exception
+ {
+ Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, GATEWAY_CONNECT_ADDRESS_STRING);
+ context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+ attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ emptyContext = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+ }
+
+ @Override
+ public ConnectionInfoProvider getOperator()
+ {
+ return new PubSubWebSocketAppDataQuery();
+ }
+
@Test
- public void testGetAppDataURL() throws Exception
+ public void testURISet() throws Exception
{
- URI uri = URI.create("ws://localhost:6666/pubsub");
- String topic = "test";
- String correct = "pubsub";
+ Assert.assertEquals(URI_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(emptyContext, URI_ADDRESS));
+ }
- PubSubWebSocketAppDataQuery pubsub = new PubSubWebSocketAppDataQuery();
- pubsub.setUri(uri);
- pubsub.setTopic(topic);
+ @Test
+ public void testNoURISet() throws Exception
+ {
+ boolean threwException = false;
+
+ try {
+ PubSubWebSocketAppDataQuery.uriHelper(emptyContext, null);
+ } catch (Exception e) {
+ threwException = e instanceof IllegalArgumentException;
+ }
- Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL());
+ Assert.assertTrue(threwException);
+ }
+
+ @Test
+ public void testAttrSet() throws Exception
+ {
+ Assert.assertEquals(GATEWAY_CONNECT_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(context, null));
+ }
+
+ @Test
+ public void testAttrAndURISet() throws Exception
+ {
+ Assert.assertEquals(URI_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(context, URI_ADDRESS));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java
index e38e439..7afa211 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java
@@ -15,23 +15,13 @@
*/
package com.datatorrent.lib.io;
-import java.net.URI;
-import org.junit.Assert;
-import org.junit.Test;
+import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider;
-public class PubSubWebSocketAppDataResultTest
+public class PubSubWebSocketAppDataResultTest extends PubSubWebSocketAppDataOperatorTest
{
- @Test
- public void testGetAppDataURL() throws Exception
+ @Override
+ public ConnectionInfoProvider getOperator()
{
- URI uri = URI.create("ws://localhost:6666/pubsub");
- String topic = "test";
- String correct = "pubsub";
-
- PubSubWebSocketAppDataResult pubsub = new PubSubWebSocketAppDataResult();
- pubsub.setUri(uri);
- pubsub.setTopic(topic);
-
- Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL());
+ return new PubSubWebSocketAppDataResult();
}
}