You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:28 UTC
[27/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/main/java/org/apache/edgent/connectors/http/HttpStreams.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/main/java/org/apache/edgent/connectors/http/HttpStreams.java b/connectors/http/src/main/java/org/apache/edgent/connectors/http/HttpStreams.java
new file mode 100644
index 0000000..58cb766
--- /dev/null
+++ b/connectors/http/src/main/java/org/apache/edgent/connectors/http/HttpStreams.java
@@ -0,0 +1,309 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.http;
+
+import org.apache.edgent.connectors.http.runtime.HttpRequester;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.topology.TStream;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import com.google.gson.JsonObject;
+
+
+/**
+ * HTTP streams.
+ *
+ */
+public class HttpStreams {
+
+ /**
+ * Make an HTTP GET request with JsonObject. <br>
+ *
+ * Method specifically works with JsonObjects. For each JsonObject in the stream,
+ * HTTP GET request is executed on provided uri. As a result, Response is added to
+ * the response TStream.
+ * <br>
+ *
+ * Sample usage:<br>
+ *
+ * <pre>
+ * {@code
+ * DirectProvider ep = new DirectProvider();
+ * Topology topology = ep.newTopology();
+ * final String url = "http://httpbin.org/get?";
+ *
+ * JsonObject request1 = new JsonObject();
+ * request1.addProperty("a", "abc");
+ * request1.addProperty("b", "42");
+ *
+ * TStream<JsonObject> stream = topology.collection(Arrays.asList(request1));
+ * TStream<JsonObject> rc = HttpStreams.getJson(stream,
+ * HttpClients::noAuthentication,
+ * t -> url + "a=" + t.get("a").getAsString() + "&b="
+ * + t.get("b").getAsString());
+ * }
+ * </pre>
+ *
+ * <br>
+ * See <i>HttpTest</i> for example. <br>
+ *
+ * @param stream - JsonObject TStream.
+ * @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
+ * @param uri - URI function which returns URI string
+ * @return TStream of JsonObject which contains responses of GET requests
+ *
+ * @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
+ */
+ public static TStream<JsonObject> getJson(TStream<JsonObject> stream,
+ Supplier<CloseableHttpClient> clientCreator,
+ Function<JsonObject,String> uri) {
+
+ return HttpStreams.<JsonObject,JsonObject>requests(stream, clientCreator,
+ t -> HttpGet.METHOD_NAME, uri, HttpResponders.json());
+ }
+
+ /**
+ * Make an HTTP DELETE request with JsonObject. <br>
+ *
+ * Method specifically works with JsonObjects. For each JsonObject in the
+ * stream, HTTP DELETE request is executed on provided uri. As a result,
+ * Response is added to the response TStream. <br>
+ *
+ * Sample usage:<br>
+ *
+ * <pre>
+ * {@code
+ * DirectProvider ep = new DirectProvider();
+ * Topology topology = ep.newTopology();
+ * final String url = "http://httpbin.org/delete?";
+ *
+ * JsonObject request = new JsonObject();
+ * request.addProperty("a", "abc");
+ * request.addProperty("b", "42");
+ *
+ * TStream<JsonObject> stream = topology.collection(Arrays.asList(request));
+ * TStream<JsonObject> rc = HttpStreams.deleteJson(stream,
+ * HttpClients::noAuthentication,
+ * t -> url + "a=" + t.get("a").getAsString() + "&b="
+ * + t.get("b").getAsString());
+ * }
+ * </pre>
+ *
+ * <br>
+ * See <i>HttpTest</i> for example. <br>
+ *
+ * @param stream - JsonObject TStream.
+ * @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
+ * @param uri - URI function which returns URI string
+ * @return TStream of JsonObject which contains responses of DELETE requests
+ *
+ * @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
+ */
+ public static TStream<JsonObject> deleteJson(TStream<JsonObject> stream,
+ Supplier<CloseableHttpClient> clientCreator,
+ Function<JsonObject,String> uri) {
+
+ return HttpStreams.<JsonObject,JsonObject>requests(stream, clientCreator,
+ t -> HttpDelete.METHOD_NAME, uri, HttpResponders.json());
+ }
+
+ /**
+ * Make an HTTP POST request with JsonObject. <br>
+ *
+ * Method specifically works with JsonObjects. For each JsonObject in the stream,
+ * HTTP POST request is executed on provided uri. Request body is filled using
+ * HttpEntity provided by body function. As a result, Response is added to
+ * the response TStream.<br>
+ *
+ * Sample usage:<br>
+ *
+ * <pre>
+ * {@code
+ * DirectProvider ep = new DirectProvider();
+ * Topology topology = ep.newTopology();
+ * final String url = "http://httpbin.org/post";
+ *
+ * JsonObject body = new JsonObject();
+ * body.addProperty("foo", "abc");
+ * body.addProperty("bar", 42);
+ *
+ * TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
+ * TStream<JsonObject> rc = HttpStreams.postJson(stream,
+ * HttpClients::noAuthentication, t -> url, t -> t);
+ * }
+ * </pre>
+ *
+ * <br>
+ * See HttpTest for example. <br>
+ *
+ * @param stream - JsonObject TStream.
+ * @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
+ * @param uri - URI function which returns URI string
+ * @param body - Function that returns JsonObject which will be set as a body for the request.
+ * @return TStream of JsonObject which contains responses of POST requests
+ *
+ * @see HttpStreams#requestsWithBody(TStream, Supplier, Function, Function, Function, BiFunction)
+ */
+ public static TStream<JsonObject> postJson(TStream<JsonObject> stream,
+ Supplier<CloseableHttpClient> clientCreator,
+ Function<JsonObject, String> uri,
+ UnaryOperator<JsonObject> body) {
+
+ return HttpStreams.<JsonObject, JsonObject> requestsWithBody(stream,
+ clientCreator, t -> HttpPost.METHOD_NAME, uri,
+ t -> new ByteArrayEntity(body.apply(t).toString().getBytes()),
+ HttpResponders.json());
+ }
+
+ /**
+ * Make an HTTP PUT request with JsonObject. <br>
+ *
+ * Method specifically works with JsonObjects. For each JsonObject in the
+ * stream, HTTP PUT request is executed on provided uri. Request body is
+ * filled using HttpEntity provided by body function. As a result, Response
+ * is added to the response TStream.<br>
+ *
+ * Sample usage:<br>
+ *
+ * <pre>
+ * {@code
+ * DirectProvider ep = new DirectProvider();
+ * Topology topology = ep.newTopology();
+ * final String url = "http://httpbin.org/put";
+ *
+ * JsonObject body = new JsonObject();
+ * body.addProperty("foo", "abc");
+ * body.addProperty("bar", 42);
+ *
+ * TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
+ * TStream<JsonObject> rc = HttpStreams.putJson(stream,
+ * HttpClients::noAuthentication, t -> url, t -> t);
+ * }
+ * </pre>
+ *
+ * <br>
+ * See HttpTest for example. <br>
+ *
+ * @param stream - JsonObject TStream.
+ * @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
+ * @param uri - URI function which returns URI string
+ * @param body - Function that returns JsonObject which will be set as a body for the request.
+ * @return TStream of JsonObject which contains responses of PUT requests
+ *
+ * @see HttpStreams#requestsWithBody(TStream, Supplier, Function, Function, Function, BiFunction)
+ */
+ public static TStream<JsonObject> putJson(TStream<JsonObject> stream,
+ Supplier<CloseableHttpClient> clientCreator,
+ Function<JsonObject, String> uri,
+ UnaryOperator<JsonObject> body) {
+ return HttpStreams.<JsonObject, JsonObject> requestsWithBody(stream,
+ clientCreator, t -> HttpPut.METHOD_NAME, uri,
+ t -> new ByteArrayEntity(body.apply(t).toString().getBytes()),
+ HttpResponders.json());
+ }
+
+ /**
+ * Make an HTTP request for each tuple on a stream.
+ * <UL>
+ * <LI>{@code clientCreator} is invoked once to create a new HTTP client
+ * to make the requests.
+ * </LI>
+ * <LI>
+ * {@code method} is invoked for each tuple to define the method
+ * to be used for the HTTP request driven by the tuple. A fixed method
+ * can be declared using a function such as:
+ * <UL style="list-style-type:none"><LI>{@code t -> HttpGet.METHOD_NAME}</LI></UL>
+ * </LI>
+ * <LI>
+ * {@code uri} is invoked for each tuple to define the URI
+ * to be used for the HTTP request driven by the tuple. A fixed method
+ * can be declared using a function such as:
+ * <UL style="list-style-type:none"><LI>{@code t -> "http://www.example.com"}</LI></UL>
+ * </LI>
+ * <LI>
+ * {@code response} is invoked after each request that did not throw an exception.
+ * It is passed the input tuple and the HTTP response. The function must completely
+ * consume the entity stream for the response. The return value is present on
+ * the stream returned by this method if it is non-null. A null return results
+ * in no tuple on the returned stream.
+ *
+ * </LI>
+ * </UL>
+ *
+ * @param <T> Tuple type for input stream
+ * @param <R> Tuple type for output stream
+ * @param stream Stream to invoke HTTP requests.
+ * @param clientCreator Function to create a HTTP client.
+ * @param method Function to define the HTTP method.
+ * @param uri Function to define the URI.
+ * @param response Function to process the response.
+ * @return Stream containing HTTP responses processed by the {@code response} function.
+ *
+ * @see HttpClients
+ * @see HttpResponders
+ */
+ public static <T,R> TStream<R> requests(TStream<T> stream,
+ Supplier<CloseableHttpClient> clientCreator,
+ Function<T,String> method,
+ Function<T,String> uri,
+ BiFunction<T,CloseableHttpResponse,R> response) {
+
+ return stream.map(new HttpRequester<T,R>(clientCreator, method, uri, response));
+ }
+
+ /**
+ * Make an HTTP request with body for each tuple.<br>
+ *
+ * @param <T> Tuple type for input stream
+ * @param <R> Tuple type for output stream
+ * @param stream Stream to invoke HTTP requests.
+ * @param clientCreator Function to create a HTTP client.
+ * @param method Function to define the HTTP method.
+ * @param uri Function to define the URI.
+ * @param body Function to define the HTTP request body
+ * @param response Function to process the response.
+ * @return Stream containing HTTP responses processed by the {@code response} function.
+ *
+ * @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
+ * @see HttpClients
+ * @see HttpResponders
+ *
+ */
+ public static <T, R> TStream<R> requestsWithBody(TStream<T> stream,
+ Supplier<CloseableHttpClient> clientCreator,
+ Function<T, String> method,
+ Function<T, String> uri,
+ Function<T, HttpEntity> body,
+ BiFunction<T, CloseableHttpResponse, R> response) {
+
+ return stream.map(new HttpRequester<T, R>(clientCreator, method, uri, body, response));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/main/java/org/apache/edgent/connectors/http/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/main/java/org/apache/edgent/connectors/http/package-info.java b/connectors/http/src/main/java/org/apache/edgent/connectors/http/package-info.java
new file mode 100644
index 0000000..a2015c7
--- /dev/null
+++ b/connectors/http/src/main/java/org/apache/edgent/connectors/http/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * HTTP stream connector.
+ */
+package org.apache.edgent.connectors.http;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/main/java/org/apache/edgent/connectors/http/runtime/HttpRequester.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/main/java/org/apache/edgent/connectors/http/runtime/HttpRequester.java b/connectors/http/src/main/java/org/apache/edgent/connectors/http/runtime/HttpRequester.java
new file mode 100644
index 0000000..c26a527
--- /dev/null
+++ b/connectors/http/src/main/java/org/apache/edgent/connectors/http/runtime/HttpRequester.java
@@ -0,0 +1,130 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.http.runtime;
+
+import java.io.IOException;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+/**
+ * Function that processes HTTP requests at runtime.
+ *
+ * @param <T> Tuple type of request stream
+ * @param <R> Tuple type of result stream
+ */
+public class HttpRequester<T,R> implements Function<T,R>{
+
+ private static final long serialVersionUID = 1L;
+
+ private final Supplier<CloseableHttpClient> clientCreator;
+ private final Function<T,String> method;
+ private final Function<T,String> url;
+ private final BiFunction<T,CloseableHttpResponse,R> responseProcessor;
+ private final Function<T, HttpEntity> entity;
+
+ private CloseableHttpClient client;
+
+ public HttpRequester(
+ Supplier<CloseableHttpClient> clientCreator,
+ Function<T,String> method,
+ Function<T,String> url,
+ BiFunction<T,CloseableHttpResponse,R> responseProcessor) {
+ this.clientCreator = clientCreator;
+ this.method = method;
+ this.url = url;
+ this.responseProcessor = responseProcessor;
+ this.entity = null;
+ }
+
+ public HttpRequester(
+ Supplier<CloseableHttpClient> clientCreator,
+ Function<T,String> method,
+ Function<T,String> url,
+ Function<T, HttpEntity> entity,
+ BiFunction<T,CloseableHttpResponse,R> responseProcessor) {
+ this.clientCreator = clientCreator;
+ this.method = method;
+ this.url = url;
+ this.entity = entity;
+ this.responseProcessor = responseProcessor;
+ }
+
+ @Override
+ public R apply(T t) {
+
+ if (client == null)
+ client = clientCreator.get();
+
+ String m = method.apply(t);
+ String uri = url.apply(t);
+ HttpUriRequest request;
+
+ switch (m) {
+
+ case HttpGet.METHOD_NAME:
+ request = new HttpGet(uri);
+ break;
+ case HttpDelete.METHOD_NAME:
+ request = new HttpDelete(uri);
+ break;
+ case HttpPost.METHOD_NAME:
+ request = new HttpPost(uri);
+ break;
+ case HttpPut.METHOD_NAME:
+ request = new HttpPut(uri);
+ break;
+
+ default:
+ throw new IllegalArgumentException();
+ }
+
+ // If entity is not null means http request should have a body
+ if (entity != null) {
+
+ HttpEntity body = entity.apply(t);
+
+ if (request instanceof HttpEntityEnclosingRequest == false) {
+ throw new IllegalArgumentException("Http request does not support body");
+ }
+
+ ((HttpEntityEnclosingRequest) request).setEntity(body);
+ }
+
+ try {
+ try (CloseableHttpResponse response = client.execute(request)) {
+ return responseProcessor.apply(t, response);
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/test/java/edgent/test/connectors/http/HttpGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/edgent/test/connectors/http/HttpGlobalTest.java b/connectors/http/src/test/java/edgent/test/connectors/http/HttpGlobalTest.java
deleted file mode 100644
index 29d9734..0000000
--- a/connectors/http/src/test/java/edgent/test/connectors/http/HttpGlobalTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.http;
-
-/**
- * This globalization test goes against http://httpbin.org
- * a freely available web-server for testing requests.
- *
- */
-public class HttpGlobalTest extends HttpTest {
-
- private static final String globalProp1 = "\u5b57\u6bcd";
- private static final String globalProp2 = "\u56db\u5341\u4e8c";
-
- public String getProp1() {
- return globalProp1;
- }
-
- public String getProp2() {
- return globalProp2;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/test/java/edgent/test/connectors/http/HttpTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/edgent/test/connectors/http/HttpTest.java b/connectors/http/src/test/java/edgent/test/connectors/http/HttpTest.java
deleted file mode 100644
index 3ca9fc8..0000000
--- a/connectors/http/src/test/java/edgent/test/connectors/http/HttpTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.http;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.http.HttpClients;
-import edgent.connectors.http.HttpResponders;
-import edgent.connectors.http.HttpStreams;
-import edgent.providers.direct.DirectProvider;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-import edgent.topology.tester.Tester;
-
-/**
- * These tests go against http://httpbin.org
- * a freely available web-server for testing requests.
- *
- */
-public class HttpTest {
-
- private static final String prop1 = "abc";
- private static final String prop2 = "42";
-
- public String getProp1() {
- return prop1;
- }
-
- public String getProp2() {
- return prop2;
- }
-
- @Test
- public void testGet() throws Exception {
-
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- String url = "http://httpbin.org/get";
-
- TStream<String> rc = HttpStreams.<String,String>requests(
- topology.strings(url),
- HttpClients::noAuthentication,
- t-> HttpGet.METHOD_NAME,
- t->t,
- HttpResponders.inputOn200());
-
- Tester tester = topology.getTester();
-
- Condition<List<String>> endCondition = tester.streamContents(rc, url);
-
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-
- assertTrue(endCondition.valid());
- }
-
- @Test
- public void testPost() throws Exception {
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- String url = "http://httpbin.org/post";
-
- TStream<String> stream = topology.strings(url);
- TStream<String> rc = HttpStreams.<String, String>requestsWithBody(
- stream, HttpClients::noAuthentication,
- t -> HttpPost.METHOD_NAME,
- t-> t, t-> new ByteArrayEntity(t.getBytes()),
- HttpResponders.inputOn200());
-
- Tester tester = topology.getTester();
-
- Condition<List<String>> endCondition = tester.streamContents(rc, url);
-
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-
- assertTrue(endCondition.valid());
- }
-
- @Test
- public void testPut() throws Exception {
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- String url = "http://httpbin.org/put";
-
- TStream<String> stream = topology.strings(url);
- TStream<String> rc = HttpStreams.<String, String>requestsWithBody(
- stream, HttpClients::noAuthentication,
- t -> HttpPut.METHOD_NAME,
- t-> t, t-> new ByteArrayEntity(t.getBytes()),
- HttpResponders.inputOn200());
-
- Tester tester = topology.getTester();
-
- Condition<List<String>> endCondition = tester.streamContents(rc, url);
-
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-
- assertTrue(endCondition.valid());
- }
-
- @Test
- public void testDelete() throws Exception {
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- String url = "http://httpbin.org/delete";
-
- TStream<String> stream = topology.strings(url);
- TStream<String> rc = HttpStreams.<String, String>requests(
- stream, HttpClients::noAuthentication,
- t -> HttpDelete.METHOD_NAME,
- t-> t, HttpResponders.inputOn200());
-
- Tester tester = topology.getTester();
-
- Condition<List<String>> endCondition = tester.streamContents(rc, url);
-
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-
- assertTrue(endCondition.valid());
- }
-
- @Test
- public void testStatusCode() throws Exception {
-
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- String url = "http://httpbin.org/status/";
-
- TStream<Integer> rc = HttpStreams.<Integer,Integer>requests(
- topology.collection(Arrays.asList(200, 404, 202)),
- HttpClients::noAuthentication,
- t-> HttpGet.METHOD_NAME,
- t-> url + Integer.toString(t),
- (t,resp) -> resp.getStatusLine().getStatusCode());
-
- Tester tester = topology.getTester();
-
- Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 404, 202);
-
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-
- assertTrue(endCondition.valid());
- }
-
- /**
- * Test basic authentication, first with valid user/password
- * and then with invalid (results in 401).
- * @throws Exception
- */
- @Test
- public void testBasicAuthentication() throws Exception {
-
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- String url = "http://httpbin.org/basic-auth/";
-
- TStream<Integer> rc = HttpStreams.<String,Integer>requests(
- topology.strings("A", "B"),
- () -> HttpClients.basic("usA", "pwdA4"),
- t-> HttpGet.METHOD_NAME,
- t-> url + "us" + t + "/pwd" + t + "4",
- (t,resp) -> resp.getStatusLine().getStatusCode());
-
- Tester tester = topology.getTester();
-
- Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 401);
-
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-
- assertTrue(endCondition.getResult().toString(), endCondition.valid());
- }
-
- @Test
- public void testJsonGet() throws Exception {
- JsonObject request = new JsonObject();
- request.addProperty("a", getProp1());
- request.addProperty("b", getProp2());
-
- testJsonGet(request);
- }
-
- public void testJsonGet(JsonObject request) throws Exception {
-
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- final String url = "http://httpbin.org/get?";
-
- TStream<JsonObject> rc = HttpStreams.getJson(
- topology.collection(Arrays.asList(request)),
- HttpClients::noAuthentication,
- t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
- );
-
- TStream<Boolean> resStream = rc.map(j -> {
- assertTrue(j.has("request"));
- assertTrue(j.has("response"));
- JsonObject req = j.getAsJsonObject("request");
- JsonObject res = j.getAsJsonObject("response");
-
- assertTrue(res.has("status"));
- assertTrue(res.has("entity"));
-
- assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("args"));
- return true;
- }
- );
-
- rc.print();
-
- Tester tester = topology.getTester();
-
- Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
-
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-
- assertTrue(endCondition.getResult().toString(), endCondition.valid());
- }
-
- @Test
- public void testJsonDelete() throws Exception {
-
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- final String url = "http://httpbin.org/delete?";
-
- JsonObject request = new JsonObject();
- request.addProperty("a", getProp1());
- request.addProperty("b", getProp2());
-
- TStream<JsonObject> stream = topology.collection(Arrays.asList(request));
- TStream<JsonObject> rc = HttpStreams.deleteJson(
- stream, HttpClients::noAuthentication,
- t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
- );
-
- TStream<Boolean> resStream = rc.map(j -> {
- assertTrue(j.has("request"));
- assertTrue(j.has("response"));
- JsonObject req = j.getAsJsonObject("request");
- JsonObject res = j.getAsJsonObject("response");
-
- assertTrue(res.has("status"));
- assertTrue(res.has("entity"));
-
- assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("args"));
- return true;
- }
- );
-
- rc.print();
-
- Tester tester = topology.getTester();
-
- Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
-
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-
- assertTrue(endCondition.getResult().toString(), endCondition.valid());
- }
-
- @Test
- public void testJsonPost() throws Exception {
-
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- final String url = "http://httpbin.org/post";
-
- JsonObject body = new JsonObject();
- body.addProperty("foo", getProp1());
- body.addProperty("bar", getProp2());
-
- TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
- TStream<JsonObject> rc = HttpStreams.postJson(
- stream, HttpClients::noAuthentication, t -> url,
- t -> t);
- TStream<Boolean> resStream = rc.map(j -> {
- assertTrue(j.has("request"));
- assertTrue(j.has("response"));
- JsonObject req = j.getAsJsonObject("request");
- JsonObject res = j.getAsJsonObject("response");
-
- assertTrue(res.has("status"));
- assertTrue(res.has("entity"));
-
- assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("json"));
- return true;
- });
-
- rc.print();
- Tester tester = topology.getTester();
- Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
- assertTrue(endCondition.getResult().toString(), endCondition.valid());
- }
-
- @Test
- public void testJsonPut() throws Exception {
-
- DirectProvider ep = new DirectProvider();
-
- Topology topology = ep.newTopology();
-
- final String url = "http://httpbin.org/put";
-
- JsonObject body = new JsonObject();
- body.addProperty("foo", getProp1());
- body.addProperty("bar", getProp2());
-
- TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
- TStream<JsonObject> rc = HttpStreams.putJson(
- stream, HttpClients::noAuthentication, t -> url,
- t -> t);
- TStream<Boolean> resStream = rc.map(j -> {
- assertTrue(j.has("request"));
- assertTrue(j.has("response"));
- JsonObject req = j.getAsJsonObject("request");
- JsonObject res = j.getAsJsonObject("response");
-
- assertTrue(res.has("status"));
- assertTrue(res.has("entity"));
-
- assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("json"));
- return true;
- });
-
- rc.print();
- Tester tester = topology.getTester();
- Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
- tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
- assertTrue(endCondition.getResult().toString(), endCondition.valid());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpGlobalTest.java b/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpGlobalTest.java
new file mode 100644
index 0000000..6561240
--- /dev/null
+++ b/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpGlobalTest.java
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.http;
+
+/**
+ * This globalization test goes against http://httpbin.org
+ * a freely available web-server for testing requests.
+ *
+ */
+public class HttpGlobalTest extends HttpTest {
+
+ private static final String globalProp1 = "\u5b57\u6bcd";
+ private static final String globalProp2 = "\u56db\u5341\u4e8c";
+
+ public String getProp1() {
+ return globalProp1;
+ }
+
+ public String getProp2() {
+ return globalProp2;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpTest.java b/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpTest.java
new file mode 100644
index 0000000..0511150
--- /dev/null
+++ b/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpTest.java
@@ -0,0 +1,379 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.http;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.http.HttpClients;
+import org.apache.edgent.connectors.http.HttpResponders;
+import org.apache.edgent.connectors.http.HttpStreams;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.apache.edgent.topology.tester.Tester;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+/**
+ * These tests go against http://httpbin.org
+ * a freely available web-server for testing requests.
+ *
+ */
+public class HttpTest {
+
+ private static final String prop1 = "abc";
+ private static final String prop2 = "42";
+
+ public String getProp1() {
+ return prop1;
+ }
+
+ public String getProp2() {
+ return prop2;
+ }
+
+ @Test
+ public void testGet() throws Exception {
+
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ String url = "http://httpbin.org/get";
+
+ TStream<String> rc = HttpStreams.<String,String>requests(
+ topology.strings(url),
+ HttpClients::noAuthentication,
+ t-> HttpGet.METHOD_NAME,
+ t->t,
+ HttpResponders.inputOn200());
+
+ Tester tester = topology.getTester();
+
+ Condition<List<String>> endCondition = tester.streamContents(rc, url);
+
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+
+ assertTrue(endCondition.valid());
+ }
+
+ @Test
+ public void testPost() throws Exception {
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ String url = "http://httpbin.org/post";
+
+ TStream<String> stream = topology.strings(url);
+ TStream<String> rc = HttpStreams.<String, String>requestsWithBody(
+ stream, HttpClients::noAuthentication,
+ t -> HttpPost.METHOD_NAME,
+ t-> t, t-> new ByteArrayEntity(t.getBytes()),
+ HttpResponders.inputOn200());
+
+ Tester tester = topology.getTester();
+
+ Condition<List<String>> endCondition = tester.streamContents(rc, url);
+
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+
+ assertTrue(endCondition.valid());
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ String url = "http://httpbin.org/put";
+
+ TStream<String> stream = topology.strings(url);
+ TStream<String> rc = HttpStreams.<String, String>requestsWithBody(
+ stream, HttpClients::noAuthentication,
+ t -> HttpPut.METHOD_NAME,
+ t-> t, t-> new ByteArrayEntity(t.getBytes()),
+ HttpResponders.inputOn200());
+
+ Tester tester = topology.getTester();
+
+ Condition<List<String>> endCondition = tester.streamContents(rc, url);
+
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+
+ assertTrue(endCondition.valid());
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ String url = "http://httpbin.org/delete";
+
+ TStream<String> stream = topology.strings(url);
+ TStream<String> rc = HttpStreams.<String, String>requests(
+ stream, HttpClients::noAuthentication,
+ t -> HttpDelete.METHOD_NAME,
+ t-> t, HttpResponders.inputOn200());
+
+ Tester tester = topology.getTester();
+
+ Condition<List<String>> endCondition = tester.streamContents(rc, url);
+
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+
+ assertTrue(endCondition.valid());
+ }
+
+ @Test
+ public void testStatusCode() throws Exception {
+
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ String url = "http://httpbin.org/status/";
+
+ TStream<Integer> rc = HttpStreams.<Integer,Integer>requests(
+ topology.collection(Arrays.asList(200, 404, 202)),
+ HttpClients::noAuthentication,
+ t-> HttpGet.METHOD_NAME,
+ t-> url + Integer.toString(t),
+ (t,resp) -> resp.getStatusLine().getStatusCode());
+
+ Tester tester = topology.getTester();
+
+ Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 404, 202);
+
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+
+ assertTrue(endCondition.valid());
+ }
+
+ /**
+ * Test basic authentication, first with valid user/password
+ * and then with invalid (results in 401).
+ * @throws Exception
+ */
+ @Test
+ public void testBasicAuthentication() throws Exception {
+
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ String url = "http://httpbin.org/basic-auth/";
+
+ TStream<Integer> rc = HttpStreams.<String,Integer>requests(
+ topology.strings("A", "B"),
+ () -> HttpClients.basic("usA", "pwdA4"),
+ t-> HttpGet.METHOD_NAME,
+ t-> url + "us" + t + "/pwd" + t + "4",
+ (t,resp) -> resp.getStatusLine().getStatusCode());
+
+ Tester tester = topology.getTester();
+
+ Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 401);
+
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+
+ assertTrue(endCondition.getResult().toString(), endCondition.valid());
+ }
+
+ @Test
+ public void testJsonGet() throws Exception {
+ JsonObject request = new JsonObject();
+ request.addProperty("a", getProp1());
+ request.addProperty("b", getProp2());
+
+ testJsonGet(request);
+ }
+
+ public void testJsonGet(JsonObject request) throws Exception {
+
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ final String url = "http://httpbin.org/get?";
+
+ TStream<JsonObject> rc = HttpStreams.getJson(
+ topology.collection(Arrays.asList(request)),
+ HttpClients::noAuthentication,
+ t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
+ );
+
+ TStream<Boolean> resStream = rc.map(j -> {
+ assertTrue(j.has("request"));
+ assertTrue(j.has("response"));
+ JsonObject req = j.getAsJsonObject("request");
+ JsonObject res = j.getAsJsonObject("response");
+
+ assertTrue(res.has("status"));
+ assertTrue(res.has("entity"));
+
+ assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("args"));
+ return true;
+ }
+ );
+
+ rc.print();
+
+ Tester tester = topology.getTester();
+
+ Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
+
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+
+ assertTrue(endCondition.getResult().toString(), endCondition.valid());
+ }
+
+ @Test
+ public void testJsonDelete() throws Exception {
+
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ final String url = "http://httpbin.org/delete?";
+
+ JsonObject request = new JsonObject();
+ request.addProperty("a", getProp1());
+ request.addProperty("b", getProp2());
+
+ TStream<JsonObject> stream = topology.collection(Arrays.asList(request));
+ TStream<JsonObject> rc = HttpStreams.deleteJson(
+ stream, HttpClients::noAuthentication,
+ t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
+ );
+
+ TStream<Boolean> resStream = rc.map(j -> {
+ assertTrue(j.has("request"));
+ assertTrue(j.has("response"));
+ JsonObject req = j.getAsJsonObject("request");
+ JsonObject res = j.getAsJsonObject("response");
+
+ assertTrue(res.has("status"));
+ assertTrue(res.has("entity"));
+
+ assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("args"));
+ return true;
+ }
+ );
+
+ rc.print();
+
+ Tester tester = topology.getTester();
+
+ Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
+
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+
+ assertTrue(endCondition.getResult().toString(), endCondition.valid());
+ }
+
+ @Test
+ public void testJsonPost() throws Exception {
+
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ final String url = "http://httpbin.org/post";
+
+ JsonObject body = new JsonObject();
+ body.addProperty("foo", getProp1());
+ body.addProperty("bar", getProp2());
+
+ TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
+ TStream<JsonObject> rc = HttpStreams.postJson(
+ stream, HttpClients::noAuthentication, t -> url,
+ t -> t);
+ TStream<Boolean> resStream = rc.map(j -> {
+ assertTrue(j.has("request"));
+ assertTrue(j.has("response"));
+ JsonObject req = j.getAsJsonObject("request");
+ JsonObject res = j.getAsJsonObject("response");
+
+ assertTrue(res.has("status"));
+ assertTrue(res.has("entity"));
+
+ assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("json"));
+ return true;
+ });
+
+ rc.print();
+ Tester tester = topology.getTester();
+ Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+ assertTrue(endCondition.getResult().toString(), endCondition.valid());
+ }
+
+ @Test
+ public void testJsonPut() throws Exception {
+
+ DirectProvider ep = new DirectProvider();
+
+ Topology topology = ep.newTopology();
+
+ final String url = "http://httpbin.org/put";
+
+ JsonObject body = new JsonObject();
+ body.addProperty("foo", getProp1());
+ body.addProperty("bar", getProp2());
+
+ TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
+ TStream<JsonObject> rc = HttpStreams.putJson(
+ stream, HttpClients::noAuthentication, t -> url,
+ t -> t);
+ TStream<Boolean> resStream = rc.map(j -> {
+ assertTrue(j.has("request"));
+ assertTrue(j.has("response"));
+ JsonObject req = j.getAsJsonObject("request");
+ JsonObject res = j.getAsJsonObject("response");
+
+ assertTrue(res.has("status"));
+ assertTrue(res.has("entity"));
+
+ assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("json"));
+ return true;
+ });
+
+ rc.print();
+ Tester tester = topology.getTester();
+ Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
+ tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+ assertTrue(endCondition.getResult().toString(), endCondition.valid());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/Commands.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/Commands.java b/connectors/iot/src/main/java/edgent/connectors/iot/Commands.java
deleted file mode 100644
index 2005156..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/Commands.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.iot;
-
-/**
- * Devic command identifiers used by Edgent.
- *
- * @see IotDevice#RESERVED_ID_PREFIX
- */
-public interface Commands {
-
- /**
- * Command identifier used for the control service.
- * <BR>
- * The command payload is used to invoke operations
- * against control MBeans using an instance of
- * {@link edgent.runtime.jsoncontrol.JsonControlService}.
- * <BR>
- * Value is {@value}.
- *
- * @see edgent.execution.services.ControlService
- * @see edgent.providers.iot.IotProvider
- */
- String CONTROL_SERVICE = IotDevice.RESERVED_ID_PREFIX + "Control";
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/Events.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/Events.java b/connectors/iot/src/main/java/edgent/connectors/iot/Events.java
deleted file mode 100644
index 2e401ab..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/Events.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.iot;
-
-/**
- * Device event identifiers used by Edgent.
- *
- * @see IotDevice#RESERVED_ID_PREFIX
- */
-public interface Events {
-
- /**
- * An IotProvider has started.
- * Event data is an empty JSON object
- */
- String IOT_START = IotDevice.RESERVED_ID_PREFIX + "IotStart";
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/HeartBeat.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/HeartBeat.java b/connectors/iot/src/main/java/edgent/connectors/iot/HeartBeat.java
deleted file mode 100644
index 2f45060..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/HeartBeat.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.iot;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.Functions;
-import edgent.topology.TStream;
-import edgent.topology.plumbing.PlumbingStreams;
-
-public class HeartBeat {
- private HeartBeat() { };
-
- /**
- * Add IoT device heart beat processing to a topology.
- * <P>
- * An IoTDevice event containing heart beat information
- * is periodically published to the specified {@code eventId}.
- * </P>
- * <P>
- * The heart beat provides clients of the IoT hub with liveness information
- * about the device and its connection to the hub.
- * </P>
- * <P>
- * The heart beat also ensures there is some immediate output so
- * the connection to the IoT hub happens as soon as possible.
- * In the case where there may not otherwise be
- * IoT events to publish, a heart beat ensures a connection
- * to the IoT hub is maintained.
- * </P>
- * <P>
- * The heart beat's event payload is the JSON for a JsonObject with the
- * heart beat's properties:
- * <ul>
- * <li>"when" : (string) ISO8601 UTC date/time. e.g. "2016-07-12T17:57:08Z"</li>
- * <li>"time" : (number) {@link System#currentTimeMillis()}</li>
- * </ul>
- *
- * @param iotDevice IoT hub device
- * @param period the heart beat period
- * @param unit TimeUnit for the period
- * @param eventId the IotDevice eventId to use for the event
- * @return the {@code TStream<JsonObject>} heartbeat stream
- */
- public static TStream<JsonObject> addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId) {
- DateFormat df = newIso8601Formatter();
- TStream<Date> hb = iotDevice.topology().poll(
- () -> new Date(),
- period, unit).tag("heartbeat");
- // Convert to JSON
- TStream<JsonObject> hbj = hb.map(date -> {
- JsonObject j = new JsonObject();
- j.addProperty("when", df.format(date));
- j.addProperty("time", date.getTime());
- return j;
- }).tag("heartbeat");
-
- TStream<JsonObject> hbs = hbj;
-
- // Tolerate connection outages. Don't block upstream processing
- // and retain the most recent heartbeat if unable to publish.
- hbj = PlumbingStreams.pressureReliever(hbj,
- Functions.unpartitioned(), 1).tag("pressureRelieved");
-
- iotDevice.events(hbj, eventId, QoS.FIRE_AND_FORGET);
-
- return hbs;
- }
-
- private static DateFormat newIso8601Formatter() {
- // Quoted "Z" to indicate UTC, no timezone offset
- DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
- df.setTimeZone(TimeZone.getTimeZone("UTC"));
- return df;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/IotDevice.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/IotDevice.java b/connectors/iot/src/main/java/edgent/connectors/iot/IotDevice.java
deleted file mode 100644
index 5798166..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/IotDevice.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.iot;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.Function;
-import edgent.function.UnaryOperator;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.TopologyElement;
-
-/**
- * Generic Internet of Things device connector.
- */
-public interface IotDevice extends TopologyElement {
-
- /**
- * Device event and command identifiers starting with {@value} are reserved for use by Edgent.
- */
- String RESERVED_ID_PREFIX = "edgent";
-
- /**
- * Publish a stream's tuples as device events.
- * <p>
- * Each tuple is published as a device event with the supplied functions
- * providing the event identifier, payload and QoS. The event identifier and
- * QoS can be generated based upon the tuple.
- *
- * @param stream
- * Stream to be published.
- * @param eventId
- * function to supply the event identifier.
- * @param payload
- * function to supply the event's payload.
- * @param qos
- * function to supply the event's delivery Quality of Service.
- * @return TSink sink element representing termination of this stream.
- */
- TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
- UnaryOperator<JsonObject> payload,
- Function<JsonObject, Integer> qos) ;
-
- /**
- * Publish a stream's tuples as device events.
- * <p>
- * Each tuple is published as a device event with fixed event identifier and
- * QoS.
- *
- * @param stream
- * Stream to be published.
- * @param eventId
- * Event identifier.
- * @param qos
- * Event's delivery Quality of Service.
- * @return TSink sink element representing termination of this stream.
- */
- TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) ;
-
- /**
- * Command identifier key.
- * Key is {@value}.
- *
- * @see #commands(String...)
- */
- String CMD_ID = "command";
-
- /**
- * Command timestamp (in milliseconds) key.
- * Key is {@value}.
- *
- * @see #commands(String...)
- */
- String CMD_TS = "tsms";
- /**
- * Command format key.
- * Key is {@value}.
- *
- * @see #commands(String...)
- */
- String CMD_FORMAT = "format";
- /**
- * Command payload key.
- * If the command format is {@code json} then
- * the key's value will be a {@code JsonObject},
- * otherwise a {@code String}.
- * Key is {@value}.
- *
- * @see #commands(String...)
- */
- String CMD_PAYLOAD = "payload";
-
- /**
- * Create a stream of device commands as JSON objects.
- * Each command sent to the device matching {@code commands} will result in a tuple
- * on the stream. The JSON object has these keys:
- * <UL>
- * <LI>{@link #CMD_ID command} - Command identifier as a String</LI>
- * <LI>{@link #CMD_TS tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
- * <LI>{@link #CMD_FORMAT format} - Format of the command as a String</LI>
- * <LI>{@link #CMD_PAYLOAD payload} - Payload of the command
- * <UL>
- * <LI>If {@code format} is {@code json} then {@code payload} is JSON</LI>
- * <LI>Otherwise {@code payload} is String</LI>
- * </UL>
- * </LI>
- * </UL>
- *
- *
- * @param commands Command identifiers to include. If no command identifiers are provided then the
- * stream will contain all device commands.
- * @return Stream containing device commands.
- */
- TStream<JsonObject> commands(String... commands);
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/QoS.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/QoS.java b/connectors/iot/src/main/java/edgent/connectors/iot/QoS.java
deleted file mode 100644
index 3dd8ea2..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/QoS.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.iot;
-
-/**
- * Device event quality of service levels.
- * The QoS levels match the MQTT specification.
- * <BR>
- * An implementation of {@link IotDevice} may not
- * support all QoS levels.
- *
- * @see <a href="http://mqtt.org/">mqtt.org</a>
- * @see IotDevice#events(edgent.topology.TStream, String, int)
- * @see IotDevice#events(edgent.topology.TStream, edgent.function.Function, edgent.function.UnaryOperator, edgent.function.Function)
-
- */
-public interface QoS {
-
- /**
- * The message containing the event arrives at the message hub either once or not at all.
- * <BR>
- * Value is {@code 0}.
- */
- Integer AT_MOST_ONCE = 0;
-
- /**
- * Fire and forget the event. Synonym for {@link #AT_MOST_ONCE}.
- * <BR>
- * Value is {@code 0}.
- */
- Integer FIRE_AND_FORGET = 0;
-
- /**
- * The message containing the event arrives at the message hub at least once.
- * The message may be seen at the hub multiple times.
- * <BR>
- * Value is {@code 1}.
- */
- Integer AT_LEAST_ONCE = 1;
-
- /**
- * The message containing the event arrives at the message hub exactly once.
- * <BR>
- * Value is {@code 2}.
- */
- Integer EXACTLY_ONCE = 2;
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/package-info.java b/connectors/iot/src/main/java/edgent/connectors/iot/package-info.java
deleted file mode 100644
index 4ea72c8..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/package-info.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Edgent device connector API to a message hub.
- * <P>
- * Generic device model that supports a device model consisting of:
- * <UL>
- * <LI>
- * <B>Device events</B> - A device {@link edgent.connectors.iot.IotDevice#events(edgent.topology.TStream, String, int) publishes} <em>events</em> as messages to a message hub to allow
- * analysis or processing by back-end systems, etc.. A device event consists of:
- * <UL>
- * <LI> <B>event identifier</B> - Application specified event type. E.g. {@code engineAlert}</LI>
- * <LI> <B>event payload</B> - Application specified event payload. E.g. the engine alert code and sensor reading.</LI>
- * <LI> <B>QoS</B> - {@link edgent.connectors.iot.QoS Quality of service} for message delivery. Using MQTT QoS definitions.</LI>
- * </UL>
- * Device events can be used to send any data including abnormal events
- * (e.g. a fault condition on an engine), periodic or aggregate sensor readings,
- * device user input etc.
- * <BR>
- * The format for the payload is JSON, support for other payload formats may be added
- * in the future.
- * </LI>
- * <LI>
- * <B>Device Commands</B> - A device {@link edgent.connectors.iot.IotDevice#commands(String...) subscribes} to <em>commands</em> from back-end systems
- * through the message hub. A device command consists of:
- * <UL>
- * <LI> <B>command identifier</B> - Application specified command type. E.g. {@code statusMessage}</LI>
- * <LI> <B>command payload</B> - Application specified command payload. E.g. the severity and
- * text of the message to display.</LI>
- * </UL>
- * Device commands can be used to perform any action on the device including displaying information,
- * controlling the device (e.g. reduce maximum engine revolutions), controlling the Edgent application, etc.
- * <BR>
- * The format for the payload is typically JSON, though other formats may be used.
- * </LI>
- * </UL>
- * <P>
- * Device event and command identifiers starting with "{@link edgent.connectors.iot.IotDevice#RESERVED_ID_PREFIX edgent}"
- * are reserved for use by Edgent.
- * </P>
- */
-package edgent.connectors.iot;
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Commands.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Commands.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Commands.java
new file mode 100644
index 0000000..4f4c284
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Commands.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.iot;
+
+/**
+ * Devic command identifiers used by Edgent.
+ *
+ * @see IotDevice#RESERVED_ID_PREFIX
+ */
+public interface Commands {
+
+ /**
+ * Command identifier used for the control service.
+ * <BR>
+ * The command payload is used to invoke operations
+ * against control MBeans using an instance of
+ * {@link org.apache.edgent.runtime.jsoncontrol.JsonControlService}.
+ * <BR>
+ * Value is {@value}.
+ *
+ * @see org.apache.edgent.execution.services.ControlService
+ * @see org.apache.edgent.providers.iot.IotProvider
+ */
+ String CONTROL_SERVICE = IotDevice.RESERVED_ID_PREFIX + "Control";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Events.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Events.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Events.java
new file mode 100644
index 0000000..92c69f1
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Events.java
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.iot;
+
+/**
+ * Device event identifiers used by Edgent.
+ *
+ * @see IotDevice#RESERVED_ID_PREFIX
+ */
+public interface Events {
+
+ /**
+ * An IotProvider has started.
+ * Event data is an empty JSON object
+ */
+ String IOT_START = IotDevice.RESERVED_ID_PREFIX + "IotStart";
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/HeartBeat.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/HeartBeat.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/HeartBeat.java
new file mode 100644
index 0000000..0224b0f
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/HeartBeat.java
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.iot;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.function.Functions;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+
+import com.google.gson.JsonObject;
+
+public class HeartBeat {
+ private HeartBeat() { };
+
+ /**
+ * Add IoT device heart beat processing to a topology.
+ * <P>
+ * An IoTDevice event containing heart beat information
+ * is periodically published to the specified {@code eventId}.
+ * </P>
+ * <P>
+ * The heart beat provides clients of the IoT hub with liveness information
+ * about the device and its connection to the hub.
+ * </P>
+ * <P>
+ * The heart beat also ensures there is some immediate output so
+ * the connection to the IoT hub happens as soon as possible.
+ * In the case where there may not otherwise be
+ * IoT events to publish, a heart beat ensures a connection
+ * to the IoT hub is maintained.
+ * </P>
+ * <P>
+ * The heart beat's event payload is the JSON for a JsonObject with the
+ * heart beat's properties:
+ * <ul>
+ * <li>"when" : (string) ISO8601 UTC date/time. e.g. "2016-07-12T17:57:08Z"</li>
+ * <li>"time" : (number) {@link System#currentTimeMillis()}</li>
+ * </ul>
+ *
+ * @param iotDevice IoT hub device
+ * @param period the heart beat period
+ * @param unit TimeUnit for the period
+ * @param eventId the IotDevice eventId to use for the event
+ * @return the {@code TStream<JsonObject>} heartbeat stream
+ */
+ public static TStream<JsonObject> addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId) {
+ DateFormat df = newIso8601Formatter();
+ TStream<Date> hb = iotDevice.topology().poll(
+ () -> new Date(),
+ period, unit).tag("heartbeat");
+ // Convert to JSON
+ TStream<JsonObject> hbj = hb.map(date -> {
+ JsonObject j = new JsonObject();
+ j.addProperty("when", df.format(date));
+ j.addProperty("time", date.getTime());
+ return j;
+ }).tag("heartbeat");
+
+ TStream<JsonObject> hbs = hbj;
+
+ // Tolerate connection outages. Don't block upstream processing
+ // and retain the most recent heartbeat if unable to publish.
+ hbj = PlumbingStreams.pressureReliever(hbj,
+ Functions.unpartitioned(), 1).tag("pressureRelieved");
+
+ iotDevice.events(hbj, eventId, QoS.FIRE_AND_FORGET);
+
+ return hbs;
+ }
+
+ private static DateFormat newIso8601Formatter() {
+ // Quoted "Z" to indicate UTC, no timezone offset
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return df;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java
new file mode 100644
index 0000000..920561d
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.iot;
+
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TopologyElement;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Generic Internet of Things device connector.
+ */
+public interface IotDevice extends TopologyElement {
+
+ /**
+ * Device event and command identifiers starting with {@value} are reserved for use by Edgent.
+ */
+ String RESERVED_ID_PREFIX = "edgent";
+
+ /**
+ * Publish a stream's tuples as device events.
+ * <p>
+ * Each tuple is published as a device event with the supplied functions
+ * providing the event identifier, payload and QoS. The event identifier and
+ * QoS can be generated based upon the tuple.
+ *
+ * @param stream
+ * Stream to be published.
+ * @param eventId
+ * function to supply the event identifier.
+ * @param payload
+ * function to supply the event's payload.
+ * @param qos
+ * function to supply the event's delivery Quality of Service.
+ * @return TSink sink element representing termination of this stream.
+ */
+ TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+ UnaryOperator<JsonObject> payload,
+ Function<JsonObject, Integer> qos) ;
+
+ /**
+ * Publish a stream's tuples as device events.
+ * <p>
+ * Each tuple is published as a device event with fixed event identifier and
+ * QoS.
+ *
+ * @param stream
+ * Stream to be published.
+ * @param eventId
+ * Event identifier.
+ * @param qos
+ * Event's delivery Quality of Service.
+ * @return TSink sink element representing termination of this stream.
+ */
+ TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) ;
+
+ /**
+ * Command identifier key.
+ * Key is {@value}.
+ *
+ * @see #commands(String...)
+ */
+ String CMD_ID = "command";
+
+ /**
+ * Command timestamp (in milliseconds) key.
+ * Key is {@value}.
+ *
+ * @see #commands(String...)
+ */
+ String CMD_TS = "tsms";
+ /**
+ * Command format key.
+ * Key is {@value}.
+ *
+ * @see #commands(String...)
+ */
+ String CMD_FORMAT = "format";
+ /**
+ * Command payload key.
+ * If the command format is {@code json} then
+ * the key's value will be a {@code JsonObject},
+ * otherwise a {@code String}.
+ * Key is {@value}.
+ *
+ * @see #commands(String...)
+ */
+ String CMD_PAYLOAD = "payload";
+
+ /**
+ * Create a stream of device commands as JSON objects.
+ * Each command sent to the device matching {@code commands} will result in a tuple
+ * on the stream. The JSON object has these keys:
+ * <UL>
+ * <LI>{@link #CMD_ID command} - Command identifier as a String</LI>
+ * <LI>{@link #CMD_TS tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
+ * <LI>{@link #CMD_FORMAT format} - Format of the command as a String</LI>
+ * <LI>{@link #CMD_PAYLOAD payload} - Payload of the command
+ * <UL>
+ * <LI>If {@code format} is {@code json} then {@code payload} is JSON</LI>
+ * <LI>Otherwise {@code payload} is String</LI>
+ * </UL>
+ * </LI>
+ * </UL>
+ *
+ *
+ * @param commands Command identifiers to include. If no command identifiers are provided then the
+ * stream will contain all device commands.
+ * @return Stream containing device commands.
+ */
+ TStream<JsonObject> commands(String... commands);
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/QoS.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/QoS.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/QoS.java
new file mode 100644
index 0000000..ecfdece
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/QoS.java
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.iot;
+
+/**
+ * Device event quality of service levels.
+ * The QoS levels match the MQTT specification.
+ * <BR>
+ * An implementation of {@link IotDevice} may not
+ * support all QoS levels.
+ *
+ * @see <a href="http://mqtt.org/">mqtt.org</a>
+ * @see IotDevice#events(org.apache.edgent.topology.TStream, String, int)
+ * @see IotDevice#events(org.apache.edgent.topology.TStream, org.apache.edgent.function.Function, org.apache.edgent.function.UnaryOperator, org.apache.edgent.function.Function)
+
+ */
+public interface QoS {
+
+ /**
+ * The message containing the event arrives at the message hub either once or not at all.
+ * <BR>
+ * Value is {@code 0}.
+ */
+ Integer AT_MOST_ONCE = 0;
+
+ /**
+ * Fire and forget the event. Synonym for {@link #AT_MOST_ONCE}.
+ * <BR>
+ * Value is {@code 0}.
+ */
+ Integer FIRE_AND_FORGET = 0;
+
+ /**
+ * The message containing the event arrives at the message hub at least once.
+ * The message may be seen at the hub multiple times.
+ * <BR>
+ * Value is {@code 1}.
+ */
+ Integer AT_LEAST_ONCE = 1;
+
+ /**
+ * The message containing the event arrives at the message hub exactly once.
+ * <BR>
+ * Value is {@code 2}.
+ */
+ Integer EXACTLY_ONCE = 2;
+}