You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:28 UTC

[27/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/main/java/org/apache/edgent/connectors/http/HttpStreams.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/main/java/org/apache/edgent/connectors/http/HttpStreams.java b/connectors/http/src/main/java/org/apache/edgent/connectors/http/HttpStreams.java
new file mode 100644
index 0000000..58cb766
--- /dev/null
+++ b/connectors/http/src/main/java/org/apache/edgent/connectors/http/HttpStreams.java
@@ -0,0 +1,309 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.http;
+
+import org.apache.edgent.connectors.http.runtime.HttpRequester;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.topology.TStream;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import com.google.gson.JsonObject;
+
+
+/**
+ * HTTP streams.
+ *
+ */
+public class HttpStreams {
+    
+    /**
+     * Make an HTTP GET request with JsonObject. <br>
+     * 
+     * Method specifically works with JsonObjects. For each JsonObject in the stream, 
+     * HTTP GET request is executed on provided uri. As a result, Response is added to
+     * the response TStream.
+     * <br>
+     * 
+     * Sample usage:<br>
+     * 
+     * <pre>
+     * {@code
+     *     DirectProvider ep = new DirectProvider();
+     *     Topology topology = ep.newTopology();
+     *     final String url = "http://httpbin.org/get?";
+     * 
+     *     JsonObject request1 = new JsonObject();
+     *     request1.addProperty("a", "abc");
+     *     request1.addProperty("b", "42");
+     * 
+     *     TStream<JsonObject> stream = topology.collection(Arrays.asList(request1));
+     *     TStream<JsonObject> rc = HttpStreams.getJson(stream,
+     *             HttpClients::noAuthentication,
+     *             t -> url + "a=" + t.get("a").getAsString() + "&b="
+     *                     + t.get("b").getAsString());
+     * }
+     * </pre>
+     * 
+     * <br>
+     * See <i>HttpTest</i> for example. <br>
+     * 
+     * @param stream - JsonObject TStream.
+     * @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
+     * @param uri - URI function which returns URI string
+     * @return TStream of JsonObject which contains responses of GET requests
+     * 
+     * @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
+     */
+    public static TStream<JsonObject> getJson(TStream<JsonObject> stream,
+            Supplier<CloseableHttpClient> clientCreator,
+            Function<JsonObject,String> uri) {
+        
+        return HttpStreams.<JsonObject,JsonObject>requests(stream, clientCreator,
+            t -> HttpGet.METHOD_NAME, uri, HttpResponders.json());
+    }
+    
+    /**
+     * Make an HTTP DELETE request with JsonObject. <br>
+     * 
+     * Method specifically works with JsonObjects. For each JsonObject in the
+     * stream, HTTP DELETE request is executed on provided uri. As a result,
+     * Response is added to the response TStream. <br>
+     * 
+     * Sample usage:<br>
+     * 
+     * <pre>
+     * {@code
+     *     DirectProvider ep = new DirectProvider();
+     *     Topology topology = ep.newTopology();
+     *     final String url = "http://httpbin.org/delete?";
+     * 
+     *     JsonObject request = new JsonObject();
+     *     request.addProperty("a", "abc");
+     *     request.addProperty("b", "42");
+     * 
+     *     TStream<JsonObject> stream = topology.collection(Arrays.asList(request));
+     *     TStream<JsonObject> rc = HttpStreams.deleteJson(stream,
+     *             HttpClients::noAuthentication,
+     *             t -> url + "a=" + t.get("a").getAsString() + "&b="
+     *                     + t.get("b").getAsString());
+     * }
+     * </pre>
+     * 
+     * <br>
+     * See <i>HttpTest</i> for example. <br>
+     * 
+     * @param stream - JsonObject TStream.
+     * @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
+     * @param uri - URI function which returns URI string
+     * @return TStream of JsonObject which contains responses of DELETE requests
+     * 
+     * @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
+     */
+    public static TStream<JsonObject> deleteJson(TStream<JsonObject> stream,
+            Supplier<CloseableHttpClient> clientCreator,
+            Function<JsonObject,String> uri) {
+        
+        return HttpStreams.<JsonObject,JsonObject>requests(stream, clientCreator,
+            t -> HttpDelete.METHOD_NAME, uri, HttpResponders.json());
+    }
+    
+    /**
+     * Make an HTTP POST request with JsonObject. <br>
+     * 
+     * Method specifically works with JsonObjects. For each JsonObject in the stream, 
+     * HTTP POST request is executed on provided uri. Request body is filled using
+     * HttpEntity provided by body function. As a result, Response is added to
+     * the response TStream.<br>
+     * 
+     * Sample usage:<br>
+     * 
+     * <pre>
+     * {@code
+     *     DirectProvider ep = new DirectProvider();
+     *     Topology topology = ep.newTopology();
+     *     final String url = "http://httpbin.org/post";
+     * 
+     *     JsonObject body = new JsonObject();
+     *     body.addProperty("foo", "abc");
+     *     body.addProperty("bar", 42);
+     * 
+     *     TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
+     *     TStream<JsonObject> rc = HttpStreams.postJson(stream,
+     *             HttpClients::noAuthentication, t -> url, t -> t);
+     * }
+     * </pre>
+     * 
+     * <br>
+     * See HttpTest for example. <br>
+     * 
+     * @param stream - JsonObject TStream.
+     * @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
+     * @param uri - URI function which returns URI string
+     * @param body - Function that returns JsonObject which will be set as a body for the request.
+     * @return TStream of JsonObject which contains responses of POST requests
+     * 
+     * @see HttpStreams#requestsWithBody(TStream, Supplier, Function, Function, Function, BiFunction)
+     */
+    public static TStream<JsonObject> postJson(TStream<JsonObject> stream,
+            Supplier<CloseableHttpClient> clientCreator,
+            Function<JsonObject, String> uri,
+            UnaryOperator<JsonObject> body) {
+
+        return HttpStreams.<JsonObject, JsonObject> requestsWithBody(stream,
+                clientCreator, t -> HttpPost.METHOD_NAME, uri, 
+                t -> new ByteArrayEntity(body.apply(t).toString().getBytes()),
+                HttpResponders.json());
+    }
+    
+    /**
+     * Make an HTTP PUT request with JsonObject. <br>
+     * 
+     * Method specifically works with JsonObjects. For each JsonObject in the
+     * stream, HTTP PUT request is executed on provided uri. Request body is
+     * filled using HttpEntity provided by body function. As a result, Response
+     * is added to the response TStream.<br>
+     * 
+     * Sample usage:<br>
+     * 
+     * <pre>
+     * {@code
+     *     DirectProvider ep = new DirectProvider();
+     *     Topology topology = ep.newTopology();
+     *     final String url = "http://httpbin.org/put";
+     * 
+     *     JsonObject body = new JsonObject();
+     *     body.addProperty("foo", "abc");
+     *     body.addProperty("bar", 42);
+     * 
+     *     TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
+     *     TStream<JsonObject> rc = HttpStreams.putJson(stream,
+     *             HttpClients::noAuthentication, t -> url, t -> t);
+     * }
+     * </pre>
+     * 
+     * <br>
+     * See HttpTest for example. <br>
+     * 
+     * @param stream - JsonObject TStream.
+     * @param clientCreator - CloseableHttpClient supplier preferably created using {@link HttpClients}
+     * @param uri - URI function which returns URI string
+     * @param body - Function that returns JsonObject which will be set as a body for the request.
+     * @return TStream of JsonObject which contains responses of PUT requests
+     * 
+     * @see HttpStreams#requestsWithBody(TStream, Supplier, Function, Function, Function, BiFunction)
+     */
+    public static TStream<JsonObject> putJson(TStream<JsonObject> stream,
+            Supplier<CloseableHttpClient> clientCreator,
+            Function<JsonObject, String> uri,
+            UnaryOperator<JsonObject> body) {
+        return HttpStreams.<JsonObject, JsonObject> requestsWithBody(stream,
+                clientCreator, t -> HttpPut.METHOD_NAME, uri, 
+                t -> new ByteArrayEntity(body.apply(t).toString().getBytes()),
+                HttpResponders.json());
+    }
+    
+    /**
+     * Make an HTTP request for each tuple on a stream.
+     * <UL>
+     * <LI>{@code clientCreator} is invoked once to create a new HTTP client
+     * to make the requests.
+     * </LI>
+     *  <LI>
+     * {@code method} is invoked for each tuple to define the method
+     * to be used for the HTTP request driven by the tuple. A fixed method
+     * can be declared using a function such as:
+     * <UL style="list-style-type:none"><LI>{@code t -> HttpGet.METHOD_NAME}</LI></UL>
+     *  </LI>
+     *  <LI>
+     * {@code uri} is invoked for each tuple to define the URI
+     * to be used for the HTTP request driven by the tuple. A fixed method
+     * can be declared using a function such as:
+     * <UL style="list-style-type:none"><LI>{@code t -> "http://www.example.com"}</LI></UL>
+     *  </LI>
+     *  <LI>
+     *  {@code response} is invoked after each request that did not throw an exception.
+     *  It is passed the input tuple and the HTTP response. The function must completely
+     *  consume the entity stream for the response. The return value is present on
+     *  the stream returned by this method if it is non-null. A null return results
+     *  in no tuple on the returned stream.
+     *  
+     *  </LI>
+     *  </UL>
+     *  
+     * @param <T> Tuple type for input stream
+     * @param <R> Tuple type for output stream
+     * @param stream Stream to invoke HTTP requests.
+     * @param clientCreator Function to create a HTTP client.
+     * @param method Function to define the HTTP method.
+     * @param uri Function to define the URI.
+     * @param response Function to process the response.
+     * @return Stream containing HTTP responses processed by the {@code response} function.
+     * 
+     * @see HttpClients
+     * @see HttpResponders
+     */
+    public static <T,R> TStream<R> requests(TStream<T> stream,
+            Supplier<CloseableHttpClient> clientCreator,
+            Function<T,String> method,
+            Function<T,String> uri,
+            BiFunction<T,CloseableHttpResponse,R> response) {
+        
+        return stream.map(new HttpRequester<T,R>(clientCreator, method, uri, response));
+    }
+    
+    /**
+     * Make an HTTP request with body for each tuple.<br>
+     * 
+     * @param <T> Tuple type for input stream
+     * @param <R> Tuple type for output stream
+     * @param stream Stream to invoke HTTP requests.
+     * @param clientCreator Function to create a HTTP client.
+     * @param method Function to define the HTTP method.
+     * @param uri Function to define the URI.
+     * @param body Function to define the HTTP request body
+     * @param response Function to process the response.
+     * @return Stream containing HTTP responses processed by the {@code response} function.
+     * 
+     * @see HttpStreams#requests(TStream, Supplier, Function, Function, BiFunction)
+     * @see HttpClients
+     * @see HttpResponders
+     * 
+     */
+    public static <T, R> TStream<R> requestsWithBody(TStream<T> stream,
+            Supplier<CloseableHttpClient> clientCreator,
+            Function<T, String> method, 
+            Function<T, String> uri,
+            Function<T, HttpEntity> body,
+            BiFunction<T, CloseableHttpResponse, R> response) {
+
+        return stream.map(new HttpRequester<T, R>(clientCreator, method, uri, body, response));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/main/java/org/apache/edgent/connectors/http/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/main/java/org/apache/edgent/connectors/http/package-info.java b/connectors/http/src/main/java/org/apache/edgent/connectors/http/package-info.java
new file mode 100644
index 0000000..a2015c7
--- /dev/null
+++ b/connectors/http/src/main/java/org/apache/edgent/connectors/http/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * HTTP stream connector.
+ */
+package org.apache.edgent.connectors.http;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/main/java/org/apache/edgent/connectors/http/runtime/HttpRequester.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/main/java/org/apache/edgent/connectors/http/runtime/HttpRequester.java b/connectors/http/src/main/java/org/apache/edgent/connectors/http/runtime/HttpRequester.java
new file mode 100644
index 0000000..c26a527
--- /dev/null
+++ b/connectors/http/src/main/java/org/apache/edgent/connectors/http/runtime/HttpRequester.java
@@ -0,0 +1,130 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.http.runtime;
+
+import java.io.IOException;
+
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+/**
+ * Function that processes HTTP requests at runtime.
+ * 
+ * @param <T> Tuple type of request stream
+ * @param <R> Tuple type of result stream
+ */
+public class HttpRequester<T,R> implements Function<T,R>{
+    
+    private static final long serialVersionUID = 1L;
+    
+    private final Supplier<CloseableHttpClient> clientCreator;
+    private final Function<T,String> method;
+    private final Function<T,String> url;
+    private final BiFunction<T,CloseableHttpResponse,R> responseProcessor;
+    private final Function<T, HttpEntity> entity;
+    
+    private CloseableHttpClient client;
+    
+    public HttpRequester(
+            Supplier<CloseableHttpClient> clientCreator,
+            Function<T,String> method,
+            Function<T,String> url,
+            BiFunction<T,CloseableHttpResponse,R> responseProcessor) {
+        this.clientCreator = clientCreator;
+        this.method = method;
+        this.url = url;
+        this.responseProcessor = responseProcessor;
+        this.entity = null;
+    }
+    
+    public HttpRequester(
+            Supplier<CloseableHttpClient> clientCreator,
+            Function<T,String> method,
+            Function<T,String> url,
+            Function<T, HttpEntity> entity,
+            BiFunction<T,CloseableHttpResponse,R> responseProcessor) {
+        this.clientCreator = clientCreator;
+        this.method = method;
+        this.url = url;
+        this.entity = entity;
+        this.responseProcessor = responseProcessor;
+    }
+
+    @Override
+    public R apply(T t) {
+        
+        if (client == null)
+            client = clientCreator.get();
+        
+        String m = method.apply(t);
+        String uri = url.apply(t);
+        HttpUriRequest request;
+        
+        switch (m) {
+        
+        case HttpGet.METHOD_NAME:
+            request = new HttpGet(uri);
+            break;
+        case HttpDelete.METHOD_NAME:
+            request = new HttpDelete(uri);
+            break;
+        case HttpPost.METHOD_NAME:
+            request = new HttpPost(uri);
+            break;
+        case HttpPut.METHOD_NAME:
+            request = new HttpPut(uri);
+            break;
+            
+        default:
+            throw new IllegalArgumentException();
+        }
+        
+        // If entity is not null means http request should have a body
+        if (entity != null) {
+            
+            HttpEntity body = entity.apply(t);
+
+            if (request instanceof HttpEntityEnclosingRequest == false) {
+                throw new IllegalArgumentException("Http request does not support body");
+            }
+            
+            ((HttpEntityEnclosingRequest) request).setEntity(body);
+        }
+        
+        try {
+            try (CloseableHttpResponse response = client.execute(request)) {
+                return responseProcessor.apply(t, response);
+            }
+             
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/test/java/edgent/test/connectors/http/HttpGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/edgent/test/connectors/http/HttpGlobalTest.java b/connectors/http/src/test/java/edgent/test/connectors/http/HttpGlobalTest.java
deleted file mode 100644
index 29d9734..0000000
--- a/connectors/http/src/test/java/edgent/test/connectors/http/HttpGlobalTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.http;
-
-/**
- * This globalization test goes against http://httpbin.org
- * a freely available web-server for testing requests.
- *
- */
-public class HttpGlobalTest extends HttpTest {
-
-    private static final String globalProp1 = "\u5b57\u6bcd";
-    private static final String globalProp2 = "\u56db\u5341\u4e8c";
-
-    public String getProp1() {
-        return globalProp1;
-    }
-
-    public String getProp2() {
-        return globalProp2;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/test/java/edgent/test/connectors/http/HttpTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/edgent/test/connectors/http/HttpTest.java b/connectors/http/src/test/java/edgent/test/connectors/http/HttpTest.java
deleted file mode 100644
index 3ca9fc8..0000000
--- a/connectors/http/src/test/java/edgent/test/connectors/http/HttpTest.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.http;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.http.HttpClients;
-import edgent.connectors.http.HttpResponders;
-import edgent.connectors.http.HttpStreams;
-import edgent.providers.direct.DirectProvider;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-import edgent.topology.tester.Tester;
-
-/**
- * These tests go against http://httpbin.org
- * a freely available web-server for testing requests.
- *
- */
-public class HttpTest {
-
-    private static final String prop1 = "abc";
-    private static final String prop2 = "42";
-
-    public String getProp1() {
-        return prop1;
-    }
-
-    public String getProp2() {
-        return prop2;
-    }
-
-    @Test
-    public void testGet() throws Exception {
-        
-        DirectProvider ep = new DirectProvider();
-        
-        Topology topology = ep.newTopology();
-        
-        String url = "http://httpbin.org/get";
-        
-        TStream<String> rc = HttpStreams.<String,String>requests(
-                topology.strings(url),
-                HttpClients::noAuthentication,
-                t-> HttpGet.METHOD_NAME,
-                t->t,
-                HttpResponders.inputOn200());
-        
-        Tester tester =  topology.getTester();
-        
-        Condition<List<String>> endCondition = tester.streamContents(rc, url);
-        
-        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        
-        assertTrue(endCondition.valid());
-    }
-    
-    @Test
-    public void testPost() throws Exception {
-        DirectProvider ep = new DirectProvider();
-        
-        Topology topology = ep.newTopology();
-        
-        String url = "http://httpbin.org/post";
-        
-        TStream<String> stream = topology.strings(url);
-        TStream<String> rc = HttpStreams.<String, String>requestsWithBody(
-                stream, HttpClients::noAuthentication,
-                t -> HttpPost.METHOD_NAME, 
-                t-> t, t-> new ByteArrayEntity(t.getBytes()),
-                HttpResponders.inputOn200());
-        
-        Tester tester = topology.getTester();
-        
-        Condition<List<String>> endCondition = tester.streamContents(rc, url);
-        
-        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        
-        assertTrue(endCondition.valid());
-    }
-    
-    @Test
-    public void testPut() throws Exception {
-        DirectProvider ep = new DirectProvider();
-        
-        Topology topology = ep.newTopology();
-        
-        String url = "http://httpbin.org/put";
-        
-        TStream<String> stream = topology.strings(url);
-        TStream<String> rc = HttpStreams.<String, String>requestsWithBody(
-                stream, HttpClients::noAuthentication,
-                t -> HttpPut.METHOD_NAME, 
-                t-> t, t-> new ByteArrayEntity(t.getBytes()),
-                HttpResponders.inputOn200());
-        
-        Tester tester = topology.getTester();
-        
-        Condition<List<String>> endCondition = tester.streamContents(rc, url);
-        
-        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        
-        assertTrue(endCondition.valid());
-    }
-    
-    @Test
-    public void testDelete() throws Exception {
-        DirectProvider ep = new DirectProvider();
-        
-        Topology topology = ep.newTopology();
-        
-        String url = "http://httpbin.org/delete";
-        
-        TStream<String> stream = topology.strings(url);
-        TStream<String> rc = HttpStreams.<String, String>requests(
-                stream, HttpClients::noAuthentication,
-                t -> HttpDelete.METHOD_NAME, 
-                t-> t, HttpResponders.inputOn200());
-        
-        Tester tester = topology.getTester();
-        
-        Condition<List<String>> endCondition = tester.streamContents(rc, url);
-        
-        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        
-        assertTrue(endCondition.valid());
-    }
-    
-    @Test
-    public void testStatusCode() throws Exception {
-        
-        DirectProvider ep = new DirectProvider();
-        
-        Topology topology = ep.newTopology();
-        
-        String url = "http://httpbin.org/status/";
-        
-        TStream<Integer> rc = HttpStreams.<Integer,Integer>requests(
-                topology.collection(Arrays.asList(200, 404, 202)),
-                HttpClients::noAuthentication,
-                t-> HttpGet.METHOD_NAME,
-                t-> url + Integer.toString(t),
-                (t,resp) -> resp.getStatusLine().getStatusCode());
-        
-        Tester tester =  topology.getTester();
-        
-        Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 404, 202);
-        
-        tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        
-        assertTrue(endCondition.valid());
-    }
-    
-    /**
-     * Test basic authentication, first with valid user/password
-     * and then with invalid (results in 401).
-     * @throws Exception
-     */
-    @Test
-    public void testBasicAuthentication() throws Exception {
-        
-        DirectProvider ep = new DirectProvider();
-        
-        Topology topology = ep.newTopology();
-        
-        String url = "http://httpbin.org/basic-auth/";
-        
-        TStream<Integer> rc = HttpStreams.<String,Integer>requests(
-                topology.strings("A", "B"),
-                () -> HttpClients.basic("usA", "pwdA4"),
-                t-> HttpGet.METHOD_NAME,
-                t-> url + "us" + t + "/pwd" + t + "4",
-                (t,resp) -> resp.getStatusLine().getStatusCode());
-        
-        Tester tester =  topology.getTester();
-        
-        Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 401);
-        
-        tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        
-        assertTrue(endCondition.getResult().toString(), endCondition.valid());
-    }
-    
-    @Test
-    public void testJsonGet() throws Exception {
-        JsonObject request = new JsonObject();
-        request.addProperty("a", getProp1());
-        request.addProperty("b", getProp2());
-
-        testJsonGet(request);
-    }
-
-    public void testJsonGet(JsonObject request) throws Exception {
-        
-        DirectProvider ep = new DirectProvider();
-        
-        Topology topology = ep.newTopology();
-        
-        final String url = "http://httpbin.org/get?";
-        
-        TStream<JsonObject> rc = HttpStreams.getJson(
-                topology.collection(Arrays.asList(request)),
-                HttpClients::noAuthentication,
-                t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
-                );
-        
-        TStream<Boolean> resStream = rc.map(j -> {
-            assertTrue(j.has("request"));
-            assertTrue(j.has("response"));
-            JsonObject req = j.getAsJsonObject("request");
-            JsonObject res = j.getAsJsonObject("response");
-            
-            assertTrue(res.has("status"));
-            assertTrue(res.has("entity"));           
-            
-            assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("args"));
-            return true;
-        }
-        );
-        
-        rc.print();
-         
-        Tester tester =  topology.getTester();
-        
-        Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
-        
-        tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        
-        assertTrue(endCondition.getResult().toString(), endCondition.valid());
-    }
-    
-    @Test
-    public void testJsonDelete() throws Exception {
-        
-        DirectProvider ep = new DirectProvider();
-        
-        Topology topology = ep.newTopology();
-        
-        final String url = "http://httpbin.org/delete?";
-        
-        JsonObject request = new JsonObject();
-        request.addProperty("a", getProp1());
-        request.addProperty("b", getProp2());
-        
-        TStream<JsonObject> stream = topology.collection(Arrays.asList(request));
-        TStream<JsonObject> rc = HttpStreams.deleteJson(
-                stream, HttpClients::noAuthentication,
-                t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
-                );
-        
-        TStream<Boolean> resStream = rc.map(j -> {
-            assertTrue(j.has("request"));
-            assertTrue(j.has("response"));
-            JsonObject req = j.getAsJsonObject("request");
-            JsonObject res = j.getAsJsonObject("response");
-            
-            assertTrue(res.has("status"));
-            assertTrue(res.has("entity"));           
-            
-            assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("args"));
-            return true;
-        }
-        );
-        
-        rc.print();
-         
-        Tester tester =  topology.getTester();
-        
-        Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
-        
-        tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        
-        assertTrue(endCondition.getResult().toString(), endCondition.valid());
-    }
-    
-    @Test
-    public void testJsonPost() throws Exception {
-
-        DirectProvider ep = new DirectProvider();
-
-        Topology topology = ep.newTopology();
-
-        final String url = "http://httpbin.org/post";
-
-        JsonObject body = new JsonObject();
-        body.addProperty("foo", getProp1());
-        body.addProperty("bar", getProp2());
-
-        TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
-        TStream<JsonObject> rc = HttpStreams.postJson(
-                stream, HttpClients::noAuthentication, t -> url,
-                t -> t);
-        TStream<Boolean> resStream = rc.map(j -> {
-            assertTrue(j.has("request"));
-            assertTrue(j.has("response"));
-            JsonObject req = j.getAsJsonObject("request");
-            JsonObject res = j.getAsJsonObject("response");
-
-            assertTrue(res.has("status"));
-            assertTrue(res.has("entity"));
-
-            assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("json"));
-            return true;
-        });
-
-        rc.print();
-        Tester tester = topology.getTester();
-        Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
-        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        assertTrue(endCondition.getResult().toString(), endCondition.valid());
-    }
-
-    @Test
-    public void testJsonPut() throws Exception {
-
-        DirectProvider ep = new DirectProvider();
-
-        Topology topology = ep.newTopology();
-
-        final String url = "http://httpbin.org/put";
-
-        JsonObject body = new JsonObject();
-        body.addProperty("foo", getProp1());
-        body.addProperty("bar", getProp2());
-
-        TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
-        TStream<JsonObject> rc = HttpStreams.putJson(
-                stream, HttpClients::noAuthentication, t -> url,
-                t -> t);
-        TStream<Boolean> resStream = rc.map(j -> {
-            assertTrue(j.has("request"));
-            assertTrue(j.has("response"));
-            JsonObject req = j.getAsJsonObject("request");
-            JsonObject res = j.getAsJsonObject("response");
-
-            assertTrue(res.has("status"));
-            assertTrue(res.has("entity"));
-
-            assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("json"));
-            return true;
-        });
-
-        rc.print();
-        Tester tester = topology.getTester();
-        Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
-        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
-        assertTrue(endCondition.getResult().toString(), endCondition.valid());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpGlobalTest.java b/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpGlobalTest.java
new file mode 100644
index 0000000..6561240
--- /dev/null
+++ b/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpGlobalTest.java
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.http;
+
+/**
+ * This globalization test goes against http://httpbin.org
+ * a freely available web-server for testing requests.
+ *
+ */
+public class HttpGlobalTest extends HttpTest {
+
+    private static final String globalProp1 = "\u5b57\u6bcd";
+    private static final String globalProp2 = "\u56db\u5341\u4e8c";
+
+    public String getProp1() {
+        return globalProp1;
+    }
+
+    public String getProp2() {
+        return globalProp2;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpTest.java b/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpTest.java
new file mode 100644
index 0000000..0511150
--- /dev/null
+++ b/connectors/http/src/test/java/org/apache/edgent/test/connectors/http/HttpTest.java
@@ -0,0 +1,379 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.http;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.http.HttpClients;
+import org.apache.edgent.connectors.http.HttpResponders;
+import org.apache.edgent.connectors.http.HttpStreams;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.apache.edgent.topology.tester.Tester;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+/**
+ * These tests go against http://httpbin.org
+ * a freely available web-server for testing requests.
+ *
+ */
+public class HttpTest {
+
+    private static final String prop1 = "abc";
+    private static final String prop2 = "42";
+
+    public String getProp1() {
+        return prop1;
+    }
+
+    public String getProp2() {
+        return prop2;
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        
+        DirectProvider ep = new DirectProvider();
+        
+        Topology topology = ep.newTopology();
+        
+        String url = "http://httpbin.org/get";
+        
+        TStream<String> rc = HttpStreams.<String,String>requests(
+                topology.strings(url),
+                HttpClients::noAuthentication,
+                t-> HttpGet.METHOD_NAME,
+                t->t,
+                HttpResponders.inputOn200());
+        
+        Tester tester =  topology.getTester();
+        
+        Condition<List<String>> endCondition = tester.streamContents(rc, url);
+        
+        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        
+        assertTrue(endCondition.valid());
+    }
+    
+    @Test
+    public void testPost() throws Exception {
+        DirectProvider ep = new DirectProvider();
+        
+        Topology topology = ep.newTopology();
+        
+        String url = "http://httpbin.org/post";
+        
+        TStream<String> stream = topology.strings(url);
+        TStream<String> rc = HttpStreams.<String, String>requestsWithBody(
+                stream, HttpClients::noAuthentication,
+                t -> HttpPost.METHOD_NAME, 
+                t-> t, t-> new ByteArrayEntity(t.getBytes()),
+                HttpResponders.inputOn200());
+        
+        Tester tester = topology.getTester();
+        
+        Condition<List<String>> endCondition = tester.streamContents(rc, url);
+        
+        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        
+        assertTrue(endCondition.valid());
+    }
+    
+    @Test
+    public void testPut() throws Exception {
+        DirectProvider ep = new DirectProvider();
+        
+        Topology topology = ep.newTopology();
+        
+        String url = "http://httpbin.org/put";
+        
+        TStream<String> stream = topology.strings(url);
+        TStream<String> rc = HttpStreams.<String, String>requestsWithBody(
+                stream, HttpClients::noAuthentication,
+                t -> HttpPut.METHOD_NAME, 
+                t-> t, t-> new ByteArrayEntity(t.getBytes()),
+                HttpResponders.inputOn200());
+        
+        Tester tester = topology.getTester();
+        
+        Condition<List<String>> endCondition = tester.streamContents(rc, url);
+        
+        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        
+        assertTrue(endCondition.valid());
+    }
+    
+    @Test
+    public void testDelete() throws Exception {
+        DirectProvider ep = new DirectProvider();
+        
+        Topology topology = ep.newTopology();
+        
+        String url = "http://httpbin.org/delete";
+        
+        TStream<String> stream = topology.strings(url);
+        TStream<String> rc = HttpStreams.<String, String>requests(
+                stream, HttpClients::noAuthentication,
+                t -> HttpDelete.METHOD_NAME, 
+                t-> t, HttpResponders.inputOn200());
+        
+        Tester tester = topology.getTester();
+        
+        Condition<List<String>> endCondition = tester.streamContents(rc, url);
+        
+        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        
+        assertTrue(endCondition.valid());
+    }
+    
+    @Test
+    public void testStatusCode() throws Exception {
+        
+        DirectProvider ep = new DirectProvider();
+        
+        Topology topology = ep.newTopology();
+        
+        String url = "http://httpbin.org/status/";
+        
+        TStream<Integer> rc = HttpStreams.<Integer,Integer>requests(
+                topology.collection(Arrays.asList(200, 404, 202)),
+                HttpClients::noAuthentication,
+                t-> HttpGet.METHOD_NAME,
+                t-> url + Integer.toString(t),
+                (t,resp) -> resp.getStatusLine().getStatusCode());
+        
+        Tester tester =  topology.getTester();
+        
+        Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 404, 202);
+        
+        tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        
+        assertTrue(endCondition.valid());
+    }
+    
+    /**
+     * Test basic authentication, first with valid user/password
+     * and then with invalid (results in 401).
+     * @throws Exception
+     */
+    @Test
+    public void testBasicAuthentication() throws Exception {
+        
+        DirectProvider ep = new DirectProvider();
+        
+        Topology topology = ep.newTopology();
+        
+        String url = "http://httpbin.org/basic-auth/";
+        
+        TStream<Integer> rc = HttpStreams.<String,Integer>requests(
+                topology.strings("A", "B"),
+                () -> HttpClients.basic("usA", "pwdA4"),
+                t-> HttpGet.METHOD_NAME,
+                t-> url + "us" + t + "/pwd" + t + "4",
+                (t,resp) -> resp.getStatusLine().getStatusCode());
+        
+        Tester tester =  topology.getTester();
+        
+        Condition<List<Integer>> endCondition = tester.streamContents(rc, 200, 401);
+        
+        tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        
+        assertTrue(endCondition.getResult().toString(), endCondition.valid());
+    }
+    
+    @Test
+    public void testJsonGet() throws Exception {
+        JsonObject request = new JsonObject();
+        request.addProperty("a", getProp1());
+        request.addProperty("b", getProp2());
+
+        testJsonGet(request);
+    }
+
+    public void testJsonGet(JsonObject request) throws Exception {
+        
+        DirectProvider ep = new DirectProvider();
+        
+        Topology topology = ep.newTopology();
+        
+        final String url = "http://httpbin.org/get?";
+        
+        TStream<JsonObject> rc = HttpStreams.getJson(
+                topology.collection(Arrays.asList(request)),
+                HttpClients::noAuthentication,
+                t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
+                );
+        
+        TStream<Boolean> resStream = rc.map(j -> {
+            assertTrue(j.has("request"));
+            assertTrue(j.has("response"));
+            JsonObject req = j.getAsJsonObject("request");
+            JsonObject res = j.getAsJsonObject("response");
+            
+            assertTrue(res.has("status"));
+            assertTrue(res.has("entity"));           
+            
+            assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("args"));
+            return true;
+        }
+        );
+        
+        rc.print();
+         
+        Tester tester =  topology.getTester();
+        
+        Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
+        
+        tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        
+        assertTrue(endCondition.getResult().toString(), endCondition.valid());
+    }
+    
+    @Test
+    public void testJsonDelete() throws Exception {
+        
+        DirectProvider ep = new DirectProvider();
+        
+        Topology topology = ep.newTopology();
+        
+        final String url = "http://httpbin.org/delete?";
+        
+        JsonObject request = new JsonObject();
+        request.addProperty("a", getProp1());
+        request.addProperty("b", getProp2());
+        
+        TStream<JsonObject> stream = topology.collection(Arrays.asList(request));
+        TStream<JsonObject> rc = HttpStreams.deleteJson(
+                stream, HttpClients::noAuthentication,
+                t-> url + "a=" + t.get("a").getAsString() + "&b=" + t.get("b").getAsString()
+                );
+        
+        TStream<Boolean> resStream = rc.map(j -> {
+            assertTrue(j.has("request"));
+            assertTrue(j.has("response"));
+            JsonObject req = j.getAsJsonObject("request");
+            JsonObject res = j.getAsJsonObject("response");
+            
+            assertTrue(res.has("status"));
+            assertTrue(res.has("entity"));           
+            
+            assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("args"));
+            return true;
+        }
+        );
+        
+        rc.print();
+         
+        Tester tester =  topology.getTester();
+        
+        Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
+        
+        tester.complete(ep,  new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        
+        assertTrue(endCondition.getResult().toString(), endCondition.valid());
+    }
+    
+    @Test
+    public void testJsonPost() throws Exception {
+
+        DirectProvider ep = new DirectProvider();
+
+        Topology topology = ep.newTopology();
+
+        final String url = "http://httpbin.org/post";
+
+        JsonObject body = new JsonObject();
+        body.addProperty("foo", getProp1());
+        body.addProperty("bar", getProp2());
+
+        TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
+        TStream<JsonObject> rc = HttpStreams.postJson(
+                stream, HttpClients::noAuthentication, t -> url,
+                t -> t);
+        TStream<Boolean> resStream = rc.map(j -> {
+            assertTrue(j.has("request"));
+            assertTrue(j.has("response"));
+            JsonObject req = j.getAsJsonObject("request");
+            JsonObject res = j.getAsJsonObject("response");
+
+            assertTrue(res.has("status"));
+            assertTrue(res.has("entity"));
+
+            assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("json"));
+            return true;
+        });
+
+        rc.print();
+        Tester tester = topology.getTester();
+        Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
+        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        assertTrue(endCondition.getResult().toString(), endCondition.valid());
+    }
+
+    @Test
+    public void testJsonPut() throws Exception {
+
+        DirectProvider ep = new DirectProvider();
+
+        Topology topology = ep.newTopology();
+
+        final String url = "http://httpbin.org/put";
+
+        JsonObject body = new JsonObject();
+        body.addProperty("foo", getProp1());
+        body.addProperty("bar", getProp2());
+
+        TStream<JsonObject> stream = topology.collection(Arrays.asList(body));
+        TStream<JsonObject> rc = HttpStreams.putJson(
+                stream, HttpClients::noAuthentication, t -> url,
+                t -> t);
+        TStream<Boolean> resStream = rc.map(j -> {
+            assertTrue(j.has("request"));
+            assertTrue(j.has("response"));
+            JsonObject req = j.getAsJsonObject("request");
+            JsonObject res = j.getAsJsonObject("response");
+
+            assertTrue(res.has("status"));
+            assertTrue(res.has("entity"));
+
+            assertEquals(req, res.getAsJsonObject("entity").getAsJsonObject("json"));
+            return true;
+        });
+
+        rc.print();
+        Tester tester = topology.getTester();
+        Condition<List<Boolean>> endCondition = tester.streamContents(resStream, true);
+        tester.complete(ep, new JsonObject(), endCondition, 10, TimeUnit.SECONDS);
+        assertTrue(endCondition.getResult().toString(), endCondition.valid());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/Commands.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/Commands.java b/connectors/iot/src/main/java/edgent/connectors/iot/Commands.java
deleted file mode 100644
index 2005156..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/Commands.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.iot;
-
-/**
- * Devic command identifiers used by Edgent.
- * 
- * @see IotDevice#RESERVED_ID_PREFIX
- */
-public interface Commands {
-    
-    /**
-     * Command identifier used for the control service.
-     * <BR>
-     * The command payload is used to invoke operations
-     * against control MBeans using an instance of
-     * {@link edgent.runtime.jsoncontrol.JsonControlService}.
-     * <BR>
-     * Value is {@value}.
-     * 
-     * @see edgent.execution.services.ControlService
-     * @see edgent.providers.iot.IotProvider
-     */
-    String CONTROL_SERVICE = IotDevice.RESERVED_ID_PREFIX + "Control";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/Events.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/Events.java b/connectors/iot/src/main/java/edgent/connectors/iot/Events.java
deleted file mode 100644
index 2e401ab..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/Events.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.iot;
-
-/**
- * Device event identifiers used by Edgent.
- * 
- * @see IotDevice#RESERVED_ID_PREFIX
- */
-public interface Events {
-    
-    /**
-     * An IotProvider has started.
-     * Event data is an empty JSON object
-     */
-    String IOT_START = IotDevice.RESERVED_ID_PREFIX + "IotStart";
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/HeartBeat.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/HeartBeat.java b/connectors/iot/src/main/java/edgent/connectors/iot/HeartBeat.java
deleted file mode 100644
index 2f45060..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/HeartBeat.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.iot;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.Functions;
-import edgent.topology.TStream;
-import edgent.topology.plumbing.PlumbingStreams;
-
-public class HeartBeat {
-  private HeartBeat() { };
-  
-  /**
-   * Add IoT device heart beat processing to a topology.
-   * <P>
-   * An IoTDevice event containing heart beat information 
-   * is periodically published to the specified {@code eventId}.
-   * </P>
-   * <P>
-   * The heart beat provides clients of the IoT hub with liveness information
-   * about the device and its connection to the hub.
-   * </P>
-   * <P>
-   * The heart beat also ensures there is some immediate output so
-   * the connection to the IoT hub happens as soon as possible.
-   * In the case where there may not otherwise be
-   * IoT events to publish, a heart beat ensures a connection
-   * to the IoT hub is maintained.
-   * </P>
-   * <P>
-   * The heart beat's event payload is the JSON for a JsonObject with the
-   * heart beat's properties:
-   * <ul>
-   * <li>"when" : (string) ISO8601 UTC date/time. e.g. "2016-07-12T17:57:08Z"</li>
-   * <li>"time" : (number) {@link System#currentTimeMillis()}</li>
-   * </ul> 
-   * 
-   * @param iotDevice IoT hub device
-   * @param period the heart beat period
-   * @param unit TimeUnit for the period
-   * @param eventId the IotDevice eventId to use for the event
-   * @return the {@code TStream<JsonObject>} heartbeat stream
-   */
-  public static TStream<JsonObject> addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId) {
-    DateFormat df = newIso8601Formatter();
-    TStream<Date> hb = iotDevice.topology().poll(
-        () -> new Date(),
-        period, unit).tag("heartbeat");
-    // Convert to JSON
-    TStream<JsonObject> hbj = hb.map(date -> {
-        JsonObject j = new  JsonObject();
-        j.addProperty("when", df.format(date));
-        j.addProperty("time", date.getTime());
-        return j;
-    }).tag("heartbeat");
-    
-    TStream<JsonObject> hbs = hbj;
-  
-    // Tolerate connection outages.  Don't block upstream processing
-    // and retain the most recent heartbeat if unable to publish.
-    hbj = PlumbingStreams.pressureReliever(hbj, 
-                Functions.unpartitioned(), 1).tag("pressureRelieved");
-  
-    iotDevice.events(hbj, eventId, QoS.FIRE_AND_FORGET);
-    
-    return hbs;
-  }
-  
-  private static DateFormat newIso8601Formatter() {
-    // Quoted "Z" to indicate UTC, no timezone offset
-    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-    df.setTimeZone(TimeZone.getTimeZone("UTC"));
-    return df;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/IotDevice.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/IotDevice.java b/connectors/iot/src/main/java/edgent/connectors/iot/IotDevice.java
deleted file mode 100644
index 5798166..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/IotDevice.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.iot;
-
-import com.google.gson.JsonObject;
-
-import edgent.function.Function;
-import edgent.function.UnaryOperator;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.TopologyElement;
-
-/**
- * Generic Internet of Things device connector.
- */
-public interface IotDevice extends TopologyElement {
-    
-    /**
-     * Device event and command identifiers starting with {@value} are reserved for use by Edgent.
-     */
-    String RESERVED_ID_PREFIX = "edgent";
-
-    /**
-     * Publish a stream's tuples as device events.
-     * <p>
-     * Each tuple is published as a device event with the supplied functions
-     * providing the event identifier, payload and QoS. The event identifier and
-     * QoS can be generated based upon the tuple.
-     * 
-     * @param stream
-     *            Stream to be published.
-     * @param eventId
-     *            function to supply the event identifier.
-     * @param payload
-     *            function to supply the event's payload.
-     * @param qos
-     *            function to supply the event's delivery Quality of Service.
-     * @return TSink sink element representing termination of this stream.
-     */
-    TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
-            UnaryOperator<JsonObject> payload,
-            Function<JsonObject, Integer> qos) ;
-    
-    /**
-     * Publish a stream's tuples as device events.
-     * <p>
-     * Each tuple is published as a device event with fixed event identifier and
-     * QoS.
-     * 
-     * @param stream
-     *            Stream to be published.
-     * @param eventId
-     *            Event identifier.
-     * @param qos
-     *            Event's delivery Quality of Service.
-     * @return TSink sink element representing termination of this stream.
-     */
-    TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) ;
-    
-    /**
-     * Command identifier key.
-     * Key is {@value}.
-     * 
-     * @see #commands(String...)
-     */
-    String CMD_ID = "command";
-    
-    /**
-     * Command timestamp (in milliseconds) key.
-     * Key is {@value}.
-     * 
-     * @see #commands(String...)
-     */
-    String CMD_TS = "tsms";
-    /**
-     * Command format key.
-     * Key is {@value}.
-     * 
-     * @see #commands(String...)
-     */
-    String CMD_FORMAT = "format";
-    /**
-     * Command payload key.
-     * If the command format is {@code json} then
-     * the key's value will be a {@code JsonObject},
-     * otherwise a {@code String}.
-     * Key is {@value}.
-     * 
-     * @see #commands(String...)
-     */
-    String CMD_PAYLOAD = "payload";
-
-    /**
-     * Create a stream of device commands as JSON objects.
-     * Each command sent to the device matching {@code commands} will result in a tuple
-     * on the stream. The JSON object has these keys:
-     * <UL>
-     * <LI>{@link #CMD_ID command} - Command identifier as a String</LI>
-     * <LI>{@link #CMD_TS tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
-     * <LI>{@link #CMD_FORMAT format} - Format of the command as a String</LI>
-     * <LI>{@link #CMD_PAYLOAD payload} - Payload of the command
-     * <UL>
-     * <LI>If {@code format} is {@code json} then {@code payload} is JSON</LI>
-     * <LI>Otherwise {@code payload} is String</LI>
-     * </UL>
-     * </LI>
-     * </UL>
-     * 
-     * 
-     * @param commands Command identifiers to include. If no command identifiers are provided then the
-     * stream will contain all device commands.
-     * @return Stream containing device commands.
-     */
-    TStream<JsonObject> commands(String... commands);
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/QoS.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/QoS.java b/connectors/iot/src/main/java/edgent/connectors/iot/QoS.java
deleted file mode 100644
index 3dd8ea2..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/QoS.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.iot;
-
-/**
- * Device event quality of service levels.
- * The QoS levels match the MQTT specification.
- * <BR>
- * An implementation of {@link IotDevice} may not
- * support all QoS levels.
- * 
- * @see <a href="http://mqtt.org/">mqtt.org</a>
- * @see IotDevice#events(edgent.topology.TStream, String, int)
- * @see IotDevice#events(edgent.topology.TStream, edgent.function.Function, edgent.function.UnaryOperator, edgent.function.Function)
-
- */
-public interface QoS {
-    
-    /**
-     * The message containing the event arrives at the message hub either once or not at all.
-     * <BR>
-     * Value is {@code 0}.
-     */
-    Integer AT_MOST_ONCE = 0;
-    
-    /**
-     * Fire and forget the event. Synonym for {@link #AT_MOST_ONCE}.
-     * <BR>
-     * Value is {@code 0}.
-     */
-    Integer FIRE_AND_FORGET = 0;
-    
-    /**
-     * The message containing the event arrives at the message hub at least once.
-     * The message may be seen at the hub multiple times.
-     * <BR>
-     * Value is {@code 1}.
-     */
-    Integer AT_LEAST_ONCE = 1;
-    
-    /**
-     * The message containing the event arrives at the message hub exactly once.
-     * <BR>
-     * Value is {@code 2}.
-     */ 
-    Integer EXACTLY_ONCE = 2;
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/edgent/connectors/iot/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/edgent/connectors/iot/package-info.java b/connectors/iot/src/main/java/edgent/connectors/iot/package-info.java
deleted file mode 100644
index 4ea72c8..0000000
--- a/connectors/iot/src/main/java/edgent/connectors/iot/package-info.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Edgent device connector API to a message hub.
- * <P>
- * Generic device model that supports a device model consisting of:
- * <UL>
- * <LI>
- * <B>Device events</B> - A device {@link edgent.connectors.iot.IotDevice#events(edgent.topology.TStream, String, int) publishes} <em>events</em> as messages to a message hub to allow
- * analysis or processing by back-end systems, etc.. A device event consists of:
- * <UL>
- * <LI>  <B>event identifier</B> - Application specified event type. E.g. {@code engineAlert}</LI>
- * <LI>  <B>event payload</B> - Application specified event payload. E.g. the engine alert code and sensor reading.</LI>
- * <LI>  <B>QoS</B> - {@link edgent.connectors.iot.QoS Quality of service} for message delivery. Using MQTT QoS definitions.</LI>
- * </UL>
- * Device events can be used to send any data including abnormal events
- * (e.g. a fault condition on an engine), periodic or aggregate sensor readings,
- * device user input etc.
- * <BR>
- * The format for the payload is JSON, support for other payload formats may be added
- * in the future.
- * </LI>
- * <LI>
- * <B>Device Commands</B> - A device {@link edgent.connectors.iot.IotDevice#commands(String...) subscribes} to <em>commands</em> from back-end systems
- * through the message hub. A device command consists of:
- * <UL>
- * <LI>  <B>command identifier</B> - Application specified command type. E.g. {@code statusMessage}</LI>
- * <LI>  <B>command payload</B> - Application specified command payload. E.g. the severity and
- * text of the message to display.</LI>
- * </UL>
- * Device commands can be used to perform any action on the device including displaying information,
- * controlling the device (e.g. reduce maximum engine revolutions), controlling the Edgent application, etc.
- * <BR>
- * The format for the payload is typically JSON, though other formats may be used.
- * </LI>
- * </UL>
- * <P>
- * Device event and command identifiers starting with "{@link edgent.connectors.iot.IotDevice#RESERVED_ID_PREFIX edgent}"
- * are reserved for use by Edgent.
- * </P>
- */
-package edgent.connectors.iot;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Commands.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Commands.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Commands.java
new file mode 100644
index 0000000..4f4c284
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Commands.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.iot;
+
+/**
+ * Devic command identifiers used by Edgent.
+ * 
+ * @see IotDevice#RESERVED_ID_PREFIX
+ */
+public interface Commands {
+    
+    /**
+     * Command identifier used for the control service.
+     * <BR>
+     * The command payload is used to invoke operations
+     * against control MBeans using an instance of
+     * {@link org.apache.edgent.runtime.jsoncontrol.JsonControlService}.
+     * <BR>
+     * Value is {@value}.
+     * 
+     * @see org.apache.edgent.execution.services.ControlService
+     * @see org.apache.edgent.providers.iot.IotProvider
+     */
+    String CONTROL_SERVICE = IotDevice.RESERVED_ID_PREFIX + "Control";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Events.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Events.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Events.java
new file mode 100644
index 0000000..92c69f1
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/Events.java
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.iot;
+
+/**
+ * Device event identifiers used by Edgent.
+ * 
+ * @see IotDevice#RESERVED_ID_PREFIX
+ */
+public interface Events {
+    
+    /**
+     * An IotProvider has started.
+     * Event data is an empty JSON object
+     */
+    String IOT_START = IotDevice.RESERVED_ID_PREFIX + "IotStart";
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/HeartBeat.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/HeartBeat.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/HeartBeat.java
new file mode 100644
index 0000000..0224b0f
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/HeartBeat.java
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.iot;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.function.Functions;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+
+import com.google.gson.JsonObject;
+
+public class HeartBeat {
+  private HeartBeat() { };
+  
+  /**
+   * Add IoT device heart beat processing to a topology.
+   * <P>
+   * An IoTDevice event containing heart beat information 
+   * is periodically published to the specified {@code eventId}.
+   * </P>
+   * <P>
+   * The heart beat provides clients of the IoT hub with liveness information
+   * about the device and its connection to the hub.
+   * </P>
+   * <P>
+   * The heart beat also ensures there is some immediate output so
+   * the connection to the IoT hub happens as soon as possible.
+   * In the case where there may not otherwise be
+   * IoT events to publish, a heart beat ensures a connection
+   * to the IoT hub is maintained.
+   * </P>
+   * <P>
+   * The heart beat's event payload is the JSON for a JsonObject with the
+   * heart beat's properties:
+   * <ul>
+   * <li>"when" : (string) ISO8601 UTC date/time. e.g. "2016-07-12T17:57:08Z"</li>
+   * <li>"time" : (number) {@link System#currentTimeMillis()}</li>
+   * </ul> 
+   * 
+   * @param iotDevice IoT hub device
+   * @param period the heart beat period
+   * @param unit TimeUnit for the period
+   * @param eventId the IotDevice eventId to use for the event
+   * @return the {@code TStream<JsonObject>} heartbeat stream
+   */
+  public static TStream<JsonObject> addHeartBeat(IotDevice iotDevice, long period, TimeUnit unit, String eventId) {
+    DateFormat df = newIso8601Formatter();
+    TStream<Date> hb = iotDevice.topology().poll(
+        () -> new Date(),
+        period, unit).tag("heartbeat");
+    // Convert to JSON
+    TStream<JsonObject> hbj = hb.map(date -> {
+        JsonObject j = new  JsonObject();
+        j.addProperty("when", df.format(date));
+        j.addProperty("time", date.getTime());
+        return j;
+    }).tag("heartbeat");
+    
+    TStream<JsonObject> hbs = hbj;
+  
+    // Tolerate connection outages.  Don't block upstream processing
+    // and retain the most recent heartbeat if unable to publish.
+    hbj = PlumbingStreams.pressureReliever(hbj, 
+                Functions.unpartitioned(), 1).tag("pressureRelieved");
+  
+    iotDevice.events(hbj, eventId, QoS.FIRE_AND_FORGET);
+    
+    return hbs;
+  }
+  
+  private static DateFormat newIso8601Formatter() {
+    // Quoted "Z" to indicate UTC, no timezone offset
+    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+    df.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return df;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java
new file mode 100644
index 0000000..920561d
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.iot;
+
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.UnaryOperator;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.TopologyElement;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Generic Internet of Things device connector.
+ */
+public interface IotDevice extends TopologyElement {
+    
+    /**
+     * Device event and command identifiers starting with {@value} are reserved for use by Edgent.
+     */
+    String RESERVED_ID_PREFIX = "edgent";
+
+    /**
+     * Publish a stream's tuples as device events.
+     * <p>
+     * Each tuple is published as a device event with the supplied functions
+     * providing the event identifier, payload and QoS. The event identifier and
+     * QoS can be generated based upon the tuple.
+     * 
+     * @param stream
+     *            Stream to be published.
+     * @param eventId
+     *            function to supply the event identifier.
+     * @param payload
+     *            function to supply the event's payload.
+     * @param qos
+     *            function to supply the event's delivery Quality of Service.
+     * @return TSink sink element representing termination of this stream.
+     */
+    TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+            UnaryOperator<JsonObject> payload,
+            Function<JsonObject, Integer> qos) ;
+    
+    /**
+     * Publish a stream's tuples as device events.
+     * <p>
+     * Each tuple is published as a device event with fixed event identifier and
+     * QoS.
+     * 
+     * @param stream
+     *            Stream to be published.
+     * @param eventId
+     *            Event identifier.
+     * @param qos
+     *            Event's delivery Quality of Service.
+     * @return TSink sink element representing termination of this stream.
+     */
+    TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) ;
+    
+    /**
+     * Command identifier key.
+     * Key is {@value}.
+     * 
+     * @see #commands(String...)
+     */
+    String CMD_ID = "command";
+    
+    /**
+     * Command timestamp (in milliseconds) key.
+     * Key is {@value}.
+     * 
+     * @see #commands(String...)
+     */
+    String CMD_TS = "tsms";
+    /**
+     * Command format key.
+     * Key is {@value}.
+     * 
+     * @see #commands(String...)
+     */
+    String CMD_FORMAT = "format";
+    /**
+     * Command payload key.
+     * If the command format is {@code json} then
+     * the key's value will be a {@code JsonObject},
+     * otherwise a {@code String}.
+     * Key is {@value}.
+     * 
+     * @see #commands(String...)
+     */
+    String CMD_PAYLOAD = "payload";
+
+    /**
+     * Create a stream of device commands as JSON objects.
+     * Each command sent to the device matching {@code commands} will result in a tuple
+     * on the stream. The JSON object has these keys:
+     * <UL>
+     * <LI>{@link #CMD_ID command} - Command identifier as a String</LI>
+     * <LI>{@link #CMD_TS tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI>
+     * <LI>{@link #CMD_FORMAT format} - Format of the command as a String</LI>
+     * <LI>{@link #CMD_PAYLOAD payload} - Payload of the command
+     * <UL>
+     * <LI>If {@code format} is {@code json} then {@code payload} is JSON</LI>
+     * <LI>Otherwise {@code payload} is String</LI>
+     * </UL>
+     * </LI>
+     * </UL>
+     * 
+     * 
+     * @param commands Command identifiers to include. If no command identifiers are provided then the
+     * stream will contain all device commands.
+     * @return Stream containing device commands.
+     */
+    TStream<JsonObject> commands(String... commands);
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/QoS.java
----------------------------------------------------------------------
diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/QoS.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/QoS.java
new file mode 100644
index 0000000..ecfdece
--- /dev/null
+++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/QoS.java
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.iot;
+
+/**
+ * Device event quality of service levels.
+ * The QoS levels match the MQTT specification.
+ * <BR>
+ * An implementation of {@link IotDevice} may not
+ * support all QoS levels.
+ * 
+ * @see <a href="http://mqtt.org/">mqtt.org</a>
+ * @see IotDevice#events(org.apache.edgent.topology.TStream, String, int)
+ * @see IotDevice#events(org.apache.edgent.topology.TStream, org.apache.edgent.function.Function, org.apache.edgent.function.UnaryOperator, org.apache.edgent.function.Function)
+
+ */
+public interface QoS {
+    
+    /**
+     * The message containing the event arrives at the message hub either once or not at all.
+     * <BR>
+     * Value is {@code 0}.
+     */
+    Integer AT_MOST_ONCE = 0;
+    
+    /**
+     * Fire and forget the event. Synonym for {@link #AT_MOST_ONCE}.
+     * <BR>
+     * Value is {@code 0}.
+     */
+    Integer FIRE_AND_FORGET = 0;
+    
+    /**
+     * The message containing the event arrives at the message hub at least once.
+     * The message may be seen at the hub multiple times.
+     * <BR>
+     * Value is {@code 1}.
+     */
+    Integer AT_LEAST_ONCE = 1;
+    
+    /**
+     * The message containing the event arrives at the message hub exactly once.
+     * <BR>
+     * Value is {@code 2}.
+     */ 
+    Integer EXACTLY_ONCE = 2;
+}