You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:10 UTC

[09/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/graph/model/VertexType.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/graph/model/VertexType.java b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/graph/model/VertexType.java
new file mode 100644
index 0000000..492838f
--- /dev/null
+++ b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/graph/model/VertexType.java
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.etiao.graph.model;
+
+import org.apache.edgent.graph.Vertex;
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.runtime.etiao.graph.ExecutableVertex;
+
+/**
+ * A {@code VertexType} in a graph.
+ * <p>
+ * A {@code VertexType} has an {@link InvocationType} instance.
+ * 
+ * @param <I> Data type the oplet consumes on its input ports.
+ * @param <O> Data type the oplet produces on its output ports.
+ */
+public class VertexType<I, O> {
+
+    /**
+     * Vertex identifier, unique within the {@code GraphType} this vertex 
+     * belongs to.
+     */
+    private final String id;
+
+    /**
+     * The oplet invocation that is being executed.
+     */
+    private final InvocationType<I, O> invocation;
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public VertexType(Vertex<? extends Oplet<?, ?>, ?, ?> value, IdMapper<String> ids) {
+        this.id = (value instanceof ExecutableVertex) ?
+            ids.add(value, ((ExecutableVertex) value).getInvocationId()) :
+            // Can't get an id from the vertex, generate unique value
+            ids.add(value);
+        this.invocation = new InvocationType(value.getInstance());
+    }
+
+    public VertexType() {
+        this.id = null;
+        this.invocation = null;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public InvocationType<I, O> getInvocation() {
+        return invocation;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/mbeans/EtiaoJobBean.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/mbeans/EtiaoJobBean.java b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/mbeans/EtiaoJobBean.java
new file mode 100644
index 0000000..6be36f9
--- /dev/null
+++ b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/mbeans/EtiaoJobBean.java
@@ -0,0 +1,207 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.etiao.mbeans;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Job.Action;
+import org.apache.edgent.execution.mbeans.JobMXBean;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.execution.services.Controls;
+import org.apache.edgent.runtime.etiao.EtiaoJob;
+import org.apache.edgent.runtime.etiao.graph.model.GraphType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Implementation of a control interface for the {@code EtiaoJob}.
+ */
+public class EtiaoJobBean implements JobMXBean {
+    private final EtiaoJob job;
+    private ControlService controlService;
+    private String controlId;
+    private static final Logger logger = LoggerFactory.getLogger(EtiaoJobBean.class);
+    
+    /**
+     * Factory method which creates an {@code EtiaoJobBean} instance to
+     * control the specified {@code EtiaoJob} and registers it with the 
+     * specified {@code ControlService}.
+     * 
+     * @param cs the control service
+     * @param job the controlled job
+     * @return a registered bean instance
+     */
+    public static EtiaoJobBean registerControl(ControlService cs, EtiaoJob job) {
+        EtiaoJobBean bean = new EtiaoJobBean(job);
+        bean.registerControl(cs);
+        return bean;        
+    }
+
+    private EtiaoJobBean(EtiaoJob job) {
+        this.job = job;
+    }
+
+    public String getControlId() {
+        return controlId;
+    }
+
+    public boolean wasRegistered() {
+        return controlId != null;
+    }
+
+    @Override
+    public String getId() {
+        return job.getId();
+    }
+
+    @Override
+    public String getName() {
+        return job.getName();
+    }
+
+    @Override
+    public Job.State getCurrentState() {
+        return job.getCurrentState();
+    }
+
+    @Override
+    public Job.State getNextState() {
+        return job.getNextState();
+    }
+
+    @Override
+    public String graphSnapshot() {
+        Gson gson = new GsonBuilder().create();
+        return gson.toJson(new GraphType(job.graph()));
+    }
+
+    @Override
+    public Job.Health getHealth() {
+        return job.getHealth();
+    }
+
+    @Override
+    public String getLastError() {
+        return job.getLastError();
+    }
+
+    @Override
+    public void stateChange(Action action) {
+        job.stateChange(action);
+        if (wasRegistered() && action == Action.CLOSE) {
+            unregisterControlAsync();
+        }
+    }
+
+    private void registerControl(ControlService cs) {
+        if (cs == null)
+            throw new IllegalArgumentException("ControlService must not be null");
+
+        logger.trace("Registering control for job id {}, job name {}", job.getId(), job.getName());
+
+        this.controlService = cs;
+        JobMXBean oldControl = cs.getControl(JobMXBean.TYPE, job.getName(), JobMXBean.class);
+
+        if (oldControl != null) {
+            String oldControlId = cs.getControlId(JobMXBean.TYPE, job.getName(), JobMXBean.class);
+            if (oldControlId != null) {
+                if (isJobClosed(oldControl)) {
+                    cs.unregister(oldControlId);
+                    logger.debug("Old control id {} for CLOSED job name {} was unregistered", 
+                            oldControlId, job.getName());
+                }
+                else {
+                    throw new IllegalStateException(
+                        "Cannot register job control for alias " + 
+                        job.getName() + " because a job control with id " + oldControlId + 
+                        " for the same alias already exists and is not CLOSED");
+                }
+            }
+        }
+        this.controlId = cs.registerControl(JobMXBean.TYPE, job.getId(), 
+                job.getName(), JobMXBean.class, this);
+        logger.debug("Control for job id {}, job name {} was registered with id {}", 
+                job.getId(), job.getName(), controlId);
+    }
+
+    private void unregisterControlAsync() {
+        if (controlService == null)
+            throw new IllegalStateException(
+                    "The ControlService of a registered bean must not be null");
+
+        getThread(new Runnable() {
+            @Override
+            public void run() {
+                unregisterControl();
+            }
+        }).start();
+    }
+
+    private void unregisterControl() {
+        if (!wasRegistered())
+            return;
+
+        long startTime = System.currentTimeMillis();
+        try {
+            try {
+                job.completeClosing(Controls.JOB_HOLD_AFTER_CLOSE_SECS, TimeUnit.SECONDS);
+            } catch (ExecutionException e) {
+                String cause = e.getCause() != null ? e.getCause().getMessage() : "unknown";
+                logger.info("Error {} during completion of job {} caused by {}", 
+                        e.getMessage(), job.getName(), cause);
+                logger.debug("Error during completion of job " + job.getName(), e);
+            } catch (TimeoutException e) {
+                logger.info("Timed out after {} milliseconds waiting for job {} to complete", 
+                        (System.currentTimeMillis() - startTime), job.getName());                        
+            }
+            long remaining = startTime + Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000 - System.currentTimeMillis();
+            if (remaining < 0)
+                remaining = 0;
+            else
+                logger.trace("Job completed, waiting {} milliseconds before unregistering control {}", 
+                        remaining, controlId);
+
+            Thread.sleep(remaining);
+        } catch (InterruptedException e) {
+            // Swallow the exception and unregister the control 
+        }
+        finally {
+            controlService.unregister(controlId);
+            logger.trace("Control {} unregistered", controlId);
+        }
+    }
+
+    private Thread getThread(Runnable r) {
+        ThreadFactory threads = Executors.defaultThreadFactory();
+        return threads.newThread(r);
+    }
+    
+    private boolean isJobClosed(JobMXBean job) {
+        return job.getCurrentState() == Job.State.CLOSED &&
+                job.getNextState() == Job.State.CLOSED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/package-info.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/package-info.java b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/package-info.java
new file mode 100644
index 0000000..4b2f49b
--- /dev/null
+++ b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/package-info.java
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * A runtime for executing an Edgent streaming topology, designed as an embeddable library 
+ * so that it can be executed in a simple Java application.
+ * 
+ * <h2>"EveryThing Is An Oplet" (ETIAO)</h2>
+ *
+ * The runtime's focus is on executing oplets and their connected streams, where each 
+ * oplet is just a black box. Specifically this means that functionality is added by the introduction 
+ * of oplets into the graph that were not explicitly declared by the application developer. 
+ * For example, metrics are implemented by oplets, not the runtime. A metric collector is an 
+ * oplet that calculates metrics on tuples accepted on its input port, and them makes them 
+ * available, for example through JMX.
+ */
+package org.apache.edgent.runtime.etiao;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/test/java/edgent/test/runtime/etiao/EtiaoGraphTest.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/test/java/edgent/test/runtime/etiao/EtiaoGraphTest.java b/runtime/etiao/src/test/java/edgent/test/runtime/etiao/EtiaoGraphTest.java
deleted file mode 100644
index b40ade9..0000000
--- a/runtime/etiao/src/test/java/edgent/test/runtime/etiao/EtiaoGraphTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.runtime.etiao;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-import edgent.execution.services.ServiceContainer;
-import edgent.function.Consumer;
-import edgent.graph.Connector;
-import edgent.graph.Graph;
-import edgent.graph.Vertex;
-import edgent.oplet.core.AbstractOplet;
-import edgent.oplet.core.Sink;
-import edgent.oplet.core.Split;
-import edgent.oplet.functional.SupplierPeriodicSource;
-import edgent.runtime.etiao.graph.DirectGraph;
-import edgent.runtime.etiao.graph.model.GraphType;
-import edgent.test.graph.GraphTest;
-
-public class EtiaoGraphTest extends GraphTest {
-
-    @Override
-    protected Graph createGraph() {
-        return new DirectGraph(this.getClass().getSimpleName(), new ServiceContainer());
-    }
-
-    @Test
-    public void testEmptyGraphToJson() {
-        Graph g = getGraph();
-        GraphType gt = new GraphType(g);
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        String json = gson.toJson(gt);
-        
-        GraphType gt2 = new Gson().fromJson(json, GraphType.class);
-        assertEquals(0, gt2.getVertices().size());
-        assertEquals(0, gt2.getEdges().size());
-    }
-
-    @Test
-    public void testGraphToJson() {
-        Graph g = getGraph();
-        TestOp<String, Integer> op = new TestOp<>();
-        /* Vertex<TestOp<String, Integer>, String, Integer> v = */g.insert(op, 1, 1);
-        GraphType gt = new GraphType(g);
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        String json = gson.toJson(gt);
-        
-        GraphType gt2 = new Gson().fromJson(json, GraphType.class);
-        assertEquals(1, gt2.getVertices().size());
-        assertEquals(0, gt2.getEdges().size());
-    }
-
-    @Test
-    public void testGraphToJson2() {
-        Graph g = getGraph();
-
-        TestOp<String, Integer> op1 = new TestOp<>();
-        Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op1, 1, 1);
-        
-        TestOp<Integer, Integer> op2 = new TestOp<>();
-        /*Connector<Integer> out2 = */g.pipe(v.getConnectors().get(0), op2);
-        
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        String json = gson.toJson(new GraphType(g));
-        
-        GraphType gt2 = new Gson().fromJson(json, GraphType.class);
-        assertEquals(2, gt2.getVertices().size());
-        assertEquals(1, gt2.getEdges().size());
-    }
-
-    @Test
-    public void testGraphToJson4() {
-        Graph g = getGraph();
-        
-        /*                                   /-- V2
-         * V0(Integer)-- V1(Double)-- FanOut
-         *                                   \-- V3 
-         */
-        Vertex<TestOp<String, Integer>, String, Integer> v0 = g.insert(new TestOp<>(), 1, 1);
-        Connector<Integer> out0 = v0.getConnectors().get(0);
-        Connector<Double> out1 = g.pipe(out0, new TestOp<Integer, Double>());
-        Vertex<TestOp<Double, String>, Double, String> v2 = g.insert(new TestOp<Double, String>(), 1, 1);
-        Vertex<TestOp<Double, String>, Double, String> v3 = g.insert(new TestOp<Double, String>(), 1, 1);
-        out1.connect(v2, 0);
-        out1.connect(v3, 0);
-        
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        String json = gson.toJson(new GraphType(g));
-        
-        GraphType gt2 = new Gson().fromJson(json, GraphType.class);
-        assertEquals(5, gt2.getVertices().size());
-        assertEquals(4, gt2.getEdges().size());
-    }
-
-    @Test
-    public void testGraphToJson5() {
-        Graph g = getGraph();
-        
-        /*                         /-- V2
-         * V0(Double)-- V1(Double)---- V3
-         *                         \-- V4 
-         */
-        Random r = new Random();
-        Vertex<SupplierPeriodicSource<Double>, Void, Double> v0 = g.insert(
-                new SupplierPeriodicSource<>(100, TimeUnit.MILLISECONDS, () -> (r.nextDouble() * 3)),
-                0, 1);
-        Connector<Double> out0 = v0.getConnectors().get(0);
-        out0.tag("dots", "hashes", "ats");
-        
-        // Insert split - see ConnectorStream.split()
-        Split<Double> splitOp = new Split<Double>(
-                tuple -> {
-                    switch (tuple.intValue()) {
-                    case 0:
-                        return 0;
-                    case 1:
-                        return 1;
-                    default:
-                        return 2;
-                    }
-                });
-        Vertex<Split<Double>, Double, Double> v1 = g.insert(splitOp, 1, 3);
-        out0.connect(v1, 0);
-
-        // Insert and connect sinks
-        Vertex<Sink<Double>, Double, Void> v2 = g.insert(
-                new Sink<>(tuple -> System.out.print(".")), 1, 0);
-        v1.getConnectors().get(0).connect(v2, 0);
-        v1.getConnectors().get(0).tag("dots");
-
-        Vertex<Sink<Double>, Double, Void> v3 = g.insert(
-                new Sink<>(tuple -> System.out.print("#")), 1, 0);
-        v1.getConnectors().get(1).connect(v3, 0);
-        v1.getConnectors().get(1).tag("hashes");
-        
-        Vertex<Sink<Double>, Double, Void> v4 = g.insert(
-                new Sink<>(tuple -> System.out.print("@")), 1, 0);
-        v1.getConnectors().get(2).connect(v4, 0);
-        v1.getConnectors().get(2).tag("ats");
-
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        String json = gson.toJson(new GraphType(g));
-        
-        GraphType gt = new Gson().fromJson(json, GraphType.class);
-        assertEquals(5, gt.getVertices().size());
-        assertEquals(4, gt.getEdges().size());
-    }
-
-    private static class TestOp<I, O> extends AbstractOplet<I, O> {
-
-        @Override
-        public void start() {
-        }
-
-        @Override
-        public List<? extends Consumer<I>> getInputs() {
-            return null;
-        }
-
-        @Override
-        public void close() throws Exception {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/test/java/org/apache/edgent/test/runtime/etiao/EtiaoGraphTest.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/test/java/org/apache/edgent/test/runtime/etiao/EtiaoGraphTest.java b/runtime/etiao/src/test/java/org/apache/edgent/test/runtime/etiao/EtiaoGraphTest.java
new file mode 100644
index 0000000..302e7e0
--- /dev/null
+++ b/runtime/etiao/src/test/java/org/apache/edgent/test/runtime/etiao/EtiaoGraphTest.java
@@ -0,0 +1,188 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.runtime.etiao;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.services.ServiceContainer;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.graph.Connector;
+import org.apache.edgent.graph.Graph;
+import org.apache.edgent.graph.Vertex;
+import org.apache.edgent.oplet.core.AbstractOplet;
+import org.apache.edgent.oplet.core.Sink;
+import org.apache.edgent.oplet.core.Split;
+import org.apache.edgent.oplet.functional.SupplierPeriodicSource;
+import org.apache.edgent.runtime.etiao.graph.DirectGraph;
+import org.apache.edgent.runtime.etiao.graph.model.GraphType;
+import org.apache.edgent.test.graph.GraphTest;
+import org.junit.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class EtiaoGraphTest extends GraphTest {
+
+    @Override
+    protected Graph createGraph() {
+        return new DirectGraph(this.getClass().getSimpleName(), new ServiceContainer());
+    }
+
+    @Test
+    public void testEmptyGraphToJson() {
+        Graph g = getGraph();
+        GraphType gt = new GraphType(g);
+        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+        String json = gson.toJson(gt);
+        
+        GraphType gt2 = new Gson().fromJson(json, GraphType.class);
+        assertEquals(0, gt2.getVertices().size());
+        assertEquals(0, gt2.getEdges().size());
+    }
+
+    @Test
+    public void testGraphToJson() {
+        Graph g = getGraph();
+        TestOp<String, Integer> op = new TestOp<>();
+        /* Vertex<TestOp<String, Integer>, String, Integer> v = */g.insert(op, 1, 1);
+        GraphType gt = new GraphType(g);
+        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+        String json = gson.toJson(gt);
+        
+        GraphType gt2 = new Gson().fromJson(json, GraphType.class);
+        assertEquals(1, gt2.getVertices().size());
+        assertEquals(0, gt2.getEdges().size());
+    }
+
+    @Test
+    public void testGraphToJson2() {
+        Graph g = getGraph();
+
+        TestOp<String, Integer> op1 = new TestOp<>();
+        Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op1, 1, 1);
+        
+        TestOp<Integer, Integer> op2 = new TestOp<>();
+        /*Connector<Integer> out2 = */g.pipe(v.getConnectors().get(0), op2);
+        
+        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+        String json = gson.toJson(new GraphType(g));
+        
+        GraphType gt2 = new Gson().fromJson(json, GraphType.class);
+        assertEquals(2, gt2.getVertices().size());
+        assertEquals(1, gt2.getEdges().size());
+    }
+
+    @Test
+    public void testGraphToJson4() {
+        Graph g = getGraph();
+        
+        /*                                   /-- V2
+         * V0(Integer)-- V1(Double)-- FanOut
+         *                                   \-- V3 
+         */
+        Vertex<TestOp<String, Integer>, String, Integer> v0 = g.insert(new TestOp<>(), 1, 1);
+        Connector<Integer> out0 = v0.getConnectors().get(0);
+        Connector<Double> out1 = g.pipe(out0, new TestOp<Integer, Double>());
+        Vertex<TestOp<Double, String>, Double, String> v2 = g.insert(new TestOp<Double, String>(), 1, 1);
+        Vertex<TestOp<Double, String>, Double, String> v3 = g.insert(new TestOp<Double, String>(), 1, 1);
+        out1.connect(v2, 0);
+        out1.connect(v3, 0);
+        
+        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+        String json = gson.toJson(new GraphType(g));
+        
+        GraphType gt2 = new Gson().fromJson(json, GraphType.class);
+        assertEquals(5, gt2.getVertices().size());
+        assertEquals(4, gt2.getEdges().size());
+    }
+
+    @Test
+    public void testGraphToJson5() {
+        Graph g = getGraph();
+        
+        /*                         /-- V2
+         * V0(Double)-- V1(Double)---- V3
+         *                         \-- V4 
+         */
+        Random r = new Random();
+        Vertex<SupplierPeriodicSource<Double>, Void, Double> v0 = g.insert(
+                new SupplierPeriodicSource<>(100, TimeUnit.MILLISECONDS, () -> (r.nextDouble() * 3)),
+                0, 1);
+        Connector<Double> out0 = v0.getConnectors().get(0);
+        out0.tag("dots", "hashes", "ats");
+        
+        // Insert split - see ConnectorStream.split()
+        Split<Double> splitOp = new Split<Double>(
+                tuple -> {
+                    switch (tuple.intValue()) {
+                    case 0:
+                        return 0;
+                    case 1:
+                        return 1;
+                    default:
+                        return 2;
+                    }
+                });
+        Vertex<Split<Double>, Double, Double> v1 = g.insert(splitOp, 1, 3);
+        out0.connect(v1, 0);
+
+        // Insert and connect sinks
+        Vertex<Sink<Double>, Double, Void> v2 = g.insert(
+                new Sink<>(tuple -> System.out.print(".")), 1, 0);
+        v1.getConnectors().get(0).connect(v2, 0);
+        v1.getConnectors().get(0).tag("dots");
+
+        Vertex<Sink<Double>, Double, Void> v3 = g.insert(
+                new Sink<>(tuple -> System.out.print("#")), 1, 0);
+        v1.getConnectors().get(1).connect(v3, 0);
+        v1.getConnectors().get(1).tag("hashes");
+        
+        Vertex<Sink<Double>, Double, Void> v4 = g.insert(
+                new Sink<>(tuple -> System.out.print("@")), 1, 0);
+        v1.getConnectors().get(2).connect(v4, 0);
+        v1.getConnectors().get(2).tag("ats");
+
+        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+        String json = gson.toJson(new GraphType(g));
+        
+        GraphType gt = new Gson().fromJson(json, GraphType.class);
+        assertEquals(5, gt.getVertices().size());
+        assertEquals(4, gt.getEdges().size());
+    }
+
+    private static class TestOp<I, O> extends AbstractOplet<I, O> {
+
+        @Override
+        public void start() {
+        }
+
+        @Override
+        public List<? extends Consumer<I>> getInputs() {
+            return null;
+        }
+
+        @Override
+        public void close() throws Exception {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jmxcontrol/src/main/java/edgent/runtime/jmxcontrol/JMXControlService.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/main/java/edgent/runtime/jmxcontrol/JMXControlService.java b/runtime/jmxcontrol/src/main/java/edgent/runtime/jmxcontrol/JMXControlService.java
deleted file mode 100644
index 6065c98..0000000
--- a/runtime/jmxcontrol/src/main/java/edgent/runtime/jmxcontrol/JMXControlService.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.runtime.jmxcontrol;
-
-import java.lang.management.ManagementFactory;
-import java.util.Hashtable;
-import java.util.Set;
-
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.JMX;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-
-import edgent.execution.services.ControlService;
-
-/**
- * Control service that registers control objects
- * as MBeans in a JMX server.
- *
- */
-public class JMXControlService implements ControlService {
-	
-	private final MBeanServer mbs;
-	private final String domain;
-	private final Hashtable<String,String> additionalKeys;
-	
-	/**
-	 * JMX control service using the platform MBean server.
-	 * @param domain Domain the MBeans are registered in.
-	 * @param additionalKeys additional name/value keys to add to the generated JMX object names
-	 */
-	public JMXControlService(String domain, Hashtable<String,String> additionalKeys) {
-		mbs = ManagementFactory.getPlatformMBeanServer();
-		this.domain = domain;
-		this.additionalKeys = additionalKeys;
-	}
-	
-	
-	/**
-	 * Get the MBean server being used by this control service.
-	 * @return MBean server being used by this control service.
-	 */
-	public MBeanServer getMbs() {
-		return mbs;
-	}
-	
-	/**
-     * Get the JMX domain being used by this control service.
-     * @return JMX domain being used by this control service.
-     */
-	public String getDomain() {
-        return domain;
-    }
-
-	/**
-	 * 
-	 * Register a control object as an MBean.
-	 * 
-	 * {@inheritDoc}
-	 * 
-	 * The MBean is registered within the domain returned by {@link #getDomain()}
-	 * and an `ObjectName` with these keys:
-	 * <UL>
-	 * <LI>type</LI> {@code type}
-	 * <LI>interface</LI> {@code controlInterface.getName()}
-	 * <LI>id</LI> {@code type}
-	 * <LI>alias</LI> {@code alias}
-	 * </UL>
-	 * 
-	 */
-	@Override
-	public <T> String registerControl(String type, String id, String alias, Class<T> controlInterface, T control) {
-		Hashtable<String,String> table = new Hashtable<>();
-		
-		table.put("type", ObjectName.quote(type));
-		table.put("interface", ObjectName.quote(controlInterface.getName()));
-		table.put("id", ObjectName.quote(id));
-		if (alias != null)
-		   table.put("alias", ObjectName.quote(alias));
-		
-		additionalNameKeys(table);
-			
-        try {
-            ObjectName on = ObjectName.getInstance(getDomain(), table);
-            getMbs().registerMBean(control, on);
-
-            return getControlId(on);
-        } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException
-                | MalformedObjectNameException e) {
-            throw new RuntimeException(e);
-        }
-
-	}
-	
-	protected void additionalNameKeys(Hashtable<String,String> table) {
-	    table.putAll(additionalKeys);
-	}
-	
-	@Override
-	public void unregister(String controlId) {
-		try {
-            mbs.unregisterMBean(ObjectName.getInstance(controlId));
-        } catch (MBeanRegistrationException | InstanceNotFoundException | MalformedObjectNameException
-                | NullPointerException e) {
-            throw new RuntimeException(e);
-        }
-	}
-
-    @Override
-    public <T> T  getControl(String type, String alias, Class<T> controlInterface) {
-        MBeanServer mBeanServer = getMbs();
-        ObjectName name = getObjectNameForInterface(type, alias, controlInterface);
-        return name != null ? JMX.newMXBeanProxy(mBeanServer, name, controlInterface) : null;
-    }
-
-    @Override
-    public <T> String getControlId(String type, String alias, Class<T> controlInterface) {
-        return getControlId(getObjectNameForInterface(type, alias, controlInterface));
-    }
-
-    private <T> ObjectName getObjectNameForInterface(String type, String alias, Class<T> controlInterface) {
-        try {
-            Set<ObjectName> names = getObjectNamesForInterface(type, alias, controlInterface.getName());
-            
-            if (names.isEmpty())
-                return null;
-            if (names.size() != 1)
-                throw new RuntimeException("Alias " + alias + " not unique for type " + type);
-    
-            ObjectName name = null;
-            for (ObjectName on : names) {
-                name = on;
-                break;
-            }
-            return name;
-        }
-        catch (MalformedObjectNameException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static String getControlId(ObjectName on) {
-        return on != null ? on.getCanonicalName() : null;
-    }
-
-    private Set<ObjectName> getObjectNamesForInterface(String type, String alias, String interfaceName) 
-            throws MalformedObjectNameException {
-        
-        Hashtable<String,String> table = new Hashtable<>();       
-        table.put("interface", ObjectName.quote(interfaceName));
-        table.put("type", ObjectName.quote(type));
-        if (alias != null)
-            table.put("alias", ObjectName.quote(alias));
-        ObjectName objName = new ObjectName(getDomain(), table);
-        
-        // Add the wildcard for any other properties.
-        objName = new ObjectName(objName.getCanonicalName()+",*");
-
-        MBeanServer mBeanServer = getMbs();
-        return mBeanServer.queryNames(objName, null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jmxcontrol/src/main/java/org/apache/edgent/runtime/jmxcontrol/JMXControlService.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/main/java/org/apache/edgent/runtime/jmxcontrol/JMXControlService.java b/runtime/jmxcontrol/src/main/java/org/apache/edgent/runtime/jmxcontrol/JMXControlService.java
new file mode 100644
index 0000000..c039fd1
--- /dev/null
+++ b/runtime/jmxcontrol/src/main/java/org/apache/edgent/runtime/jmxcontrol/JMXControlService.java
@@ -0,0 +1,182 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.jmxcontrol;
+
+import java.lang.management.ManagementFactory;
+import java.util.Hashtable;
+import java.util.Set;
+
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+
+import org.apache.edgent.execution.services.ControlService;
+
+/**
+ * Control service that registers control objects
+ * as MBeans in a JMX server.
+ *
+ */
+public class JMXControlService implements ControlService {
+	
+	private final MBeanServer mbs;
+	private final String domain;
+	private final Hashtable<String,String> additionalKeys;
+	
+	/**
+	 * JMX control service using the platform MBean server.
+	 * @param domain Domain the MBeans are registered in.
+	 * @param additionalKeys additional name/value keys to add to the generated JMX object names
+	 */
+	public JMXControlService(String domain, Hashtable<String,String> additionalKeys) {
+		mbs = ManagementFactory.getPlatformMBeanServer();
+		this.domain = domain;
+		this.additionalKeys = additionalKeys;
+	}
+	
+	
+	/**
+	 * Get the MBean server being used by this control service.
+	 * @return MBean server being used by this control service.
+	 */
+	public MBeanServer getMbs() {
+		return mbs;
+	}
+	
+	/**
+     * Get the JMX domain being used by this control service.
+     * @return JMX domain being used by this control service.
+     */
+	public String getDomain() {
+        return domain;
+    }
+
+	/**
+	 * 
+	 * Register a control object as an MBean.
+	 * 
+	 * {@inheritDoc}
+	 * 
+	 * The MBean is registered within the domain returned by {@link #getDomain()}
+	 * and an `ObjectName` with these keys:
+	 * <UL>
+	 * <LI>type</LI> {@code type}
+	 * <LI>interface</LI> {@code controlInterface.getName()}
+	 * <LI>id</LI> {@code type}
+	 * <LI>alias</LI> {@code alias}
+	 * </UL>
+	 * 
+	 */
+	@Override
+	public <T> String registerControl(String type, String id, String alias, Class<T> controlInterface, T control) {
+		Hashtable<String,String> table = new Hashtable<>();
+		
+		table.put("type", ObjectName.quote(type));
+		table.put("interface", ObjectName.quote(controlInterface.getName()));
+		table.put("id", ObjectName.quote(id));
+		if (alias != null)
+		   table.put("alias", ObjectName.quote(alias));
+		
+		additionalNameKeys(table);
+			
+        try {
+            ObjectName on = ObjectName.getInstance(getDomain(), table);
+            getMbs().registerMBean(control, on);
+
+            return getControlId(on);
+        } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException
+                | MalformedObjectNameException e) {
+            throw new RuntimeException(e);
+        }
+
+	}
+	
+	protected void additionalNameKeys(Hashtable<String,String> table) {
+	    table.putAll(additionalKeys);
+	}
+	
+	@Override
+	public void unregister(String controlId) {
+		try {
+            mbs.unregisterMBean(ObjectName.getInstance(controlId));
+        } catch (MBeanRegistrationException | InstanceNotFoundException | MalformedObjectNameException
+                | NullPointerException e) {
+            throw new RuntimeException(e);
+        }
+	}
+
+    @Override
+    public <T> T  getControl(String type, String alias, Class<T> controlInterface) {
+        MBeanServer mBeanServer = getMbs();
+        ObjectName name = getObjectNameForInterface(type, alias, controlInterface);
+        return name != null ? JMX.newMXBeanProxy(mBeanServer, name, controlInterface) : null;
+    }
+
+    @Override
+    public <T> String getControlId(String type, String alias, Class<T> controlInterface) {
+        return getControlId(getObjectNameForInterface(type, alias, controlInterface));
+    }
+
+    private <T> ObjectName getObjectNameForInterface(String type, String alias, Class<T> controlInterface) {
+        try {
+            Set<ObjectName> names = getObjectNamesForInterface(type, alias, controlInterface.getName());
+            
+            if (names.isEmpty())
+                return null;
+            if (names.size() != 1)
+                throw new RuntimeException("Alias " + alias + " not unique for type " + type);
+    
+            ObjectName name = null;
+            for (ObjectName on : names) {
+                name = on;
+                break;
+            }
+            return name;
+        }
+        catch (MalformedObjectNameException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static String getControlId(ObjectName on) {
+        return on != null ? on.getCanonicalName() : null;
+    }
+
+    private Set<ObjectName> getObjectNamesForInterface(String type, String alias, String interfaceName) 
+            throws MalformedObjectNameException {
+        
+        Hashtable<String,String> table = new Hashtable<>();       
+        table.put("interface", ObjectName.quote(interfaceName));
+        table.put("type", ObjectName.quote(type));
+        if (alias != null)
+            table.put("alias", ObjectName.quote(alias));
+        ObjectName objName = new ObjectName(getDomain(), table);
+        
+        // Add the wildcard for any other properties.
+        objName = new ObjectName(objName.getCanonicalName()+",*");
+
+        MBeanServer mBeanServer = getMbs();
+        return mBeanServer.queryNames(objName, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jmxcontrol/src/test/java/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/test/java/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java b/runtime/jmxcontrol/src/test/java/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
deleted file mode 100644
index d29657d..0000000
--- a/runtime/jmxcontrol/src/test/java/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.runtime.jmxcontrol;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.lang.management.ManagementFactory;
-import java.util.Hashtable;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.timer.Timer;
-import javax.management.timer.TimerMBean;
-
-import org.junit.Test;
-
-import edgent.execution.services.ControlService;
-import edgent.runtime.jmxcontrol.JMXControlService;
-
-public class JMXControlServiceTest {
-    
-    private static final String DOMAIN = JMXControlServiceTest.class.getName();
-
-    @Test
-    public void testControlObject() throws Exception {
-        
-        ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
-        
-        String type = "timer";
-        String id = "a";
-        String alias = "ControlA";
-        String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
-        
-        assertNotNull(controlId);
-        
-        ObjectName on = ObjectName.getInstance(controlId);
-        
-        assertEquals(DOMAIN, on.getDomain());
-        
-        assertEquals(type, ObjectName.unquote(on.getKeyProperty("type")));
-        assertEquals(id, ObjectName.unquote(on.getKeyProperty("id")));
-        assertEquals(alias, ObjectName.unquote(on.getKeyProperty("alias")));
-        assertEquals(TimerMBean.class.getName(), ObjectName.unquote(on.getKeyProperty("interface")));
-        
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        
-        assertTrue(mbs.isRegistered(on));
-        
-        cs.unregister(controlId);
-        assertFalse(mbs.isRegistered(on));  
-    }
-    
-    @Test
-    public void testAdditionalKeys() throws Exception {
-        
-        Hashtable<String,String> addKeys = new Hashtable<>();
-        addKeys.put("job", "jobid");
-        addKeys.put("device", ObjectName.quote("pi"));
-        
-        ControlService cs = new JMXControlService(DOMAIN, addKeys);
-        
-        String type = "timer";
-        String id = "a";
-        String alias = "ControlA";
-        String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
-        
-        assertNotNull(controlId);
-        
-        ObjectName on = ObjectName.getInstance(controlId);
-        
-        assertEquals(DOMAIN, on.getDomain());
-        
-        assertEquals(type, ObjectName.unquote(on.getKeyProperty("type")));
-        assertEquals(id, ObjectName.unquote(on.getKeyProperty("id")));
-        assertEquals(alias, ObjectName.unquote(on.getKeyProperty("alias")));
-        assertEquals(TimerMBean.class.getName(), ObjectName.unquote(on.getKeyProperty("interface")));
-        
-        assertEquals("jobid", on.getKeyProperty("job"));
-        assertEquals("pi", ObjectName.unquote(on.getKeyProperty("device")));
- 
-        
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        
-        assertTrue(mbs.isRegistered(on));
-        
-        cs.unregister(controlId);
-        assertFalse(mbs.isRegistered(on));  
-    }
-    
-    @Test(expected=RuntimeException.class)
-    public void testDoubleRegister() throws Exception {
-        
-        ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
-        
-        String type = "timer";
-        String id = "a";
-        String alias = "ControlA";
-        String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
-        try {
-            cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
-        } finally {
-            cs.unregister(controlId);
-        }
-    }
-    @Test(expected=RuntimeException.class)
-    public void testDoubleunregister() throws Exception {
-        
-        ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
-        
-        String type = "timer";
-        String id = "a";
-        String alias = "ControlA";
-        String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
-        cs.unregister(controlId);
-        cs.unregister(controlId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jmxcontrol/src/test/java/org/apache/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/test/java/org/apache/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java b/runtime/jmxcontrol/src/test/java/org/apache/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
new file mode 100644
index 0000000..79863f6
--- /dev/null
+++ b/runtime/jmxcontrol/src/test/java/org/apache/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
@@ -0,0 +1,135 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.runtime.jmxcontrol;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.management.ManagementFactory;
+import java.util.Hashtable;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.timer.Timer;
+import javax.management.timer.TimerMBean;
+
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.runtime.jmxcontrol.JMXControlService;
+import org.junit.Test;
+
+public class JMXControlServiceTest {
+    
+    private static final String DOMAIN = JMXControlServiceTest.class.getName();
+
+    @Test
+    public void testControlObject() throws Exception {
+        
+        ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
+        
+        String type = "timer";
+        String id = "a";
+        String alias = "ControlA";
+        String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+        
+        assertNotNull(controlId);
+        
+        ObjectName on = ObjectName.getInstance(controlId);
+        
+        assertEquals(DOMAIN, on.getDomain());
+        
+        assertEquals(type, ObjectName.unquote(on.getKeyProperty("type")));
+        assertEquals(id, ObjectName.unquote(on.getKeyProperty("id")));
+        assertEquals(alias, ObjectName.unquote(on.getKeyProperty("alias")));
+        assertEquals(TimerMBean.class.getName(), ObjectName.unquote(on.getKeyProperty("interface")));
+        
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        
+        assertTrue(mbs.isRegistered(on));
+        
+        cs.unregister(controlId);
+        assertFalse(mbs.isRegistered(on));  
+    }
+    
+    @Test
+    public void testAdditionalKeys() throws Exception {
+        
+        Hashtable<String,String> addKeys = new Hashtable<>();
+        addKeys.put("job", "jobid");
+        addKeys.put("device", ObjectName.quote("pi"));
+        
+        ControlService cs = new JMXControlService(DOMAIN, addKeys);
+        
+        String type = "timer";
+        String id = "a";
+        String alias = "ControlA";
+        String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+        
+        assertNotNull(controlId);
+        
+        ObjectName on = ObjectName.getInstance(controlId);
+        
+        assertEquals(DOMAIN, on.getDomain());
+        
+        assertEquals(type, ObjectName.unquote(on.getKeyProperty("type")));
+        assertEquals(id, ObjectName.unquote(on.getKeyProperty("id")));
+        assertEquals(alias, ObjectName.unquote(on.getKeyProperty("alias")));
+        assertEquals(TimerMBean.class.getName(), ObjectName.unquote(on.getKeyProperty("interface")));
+        
+        assertEquals("jobid", on.getKeyProperty("job"));
+        assertEquals("pi", ObjectName.unquote(on.getKeyProperty("device")));
+ 
+        
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        
+        assertTrue(mbs.isRegistered(on));
+        
+        cs.unregister(controlId);
+        assertFalse(mbs.isRegistered(on));  
+    }
+    
+    @Test(expected=RuntimeException.class)
+    public void testDoubleRegister() throws Exception {
+        
+        ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
+        
+        String type = "timer";
+        String id = "a";
+        String alias = "ControlA";
+        String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+        try {
+            cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+        } finally {
+            cs.unregister(controlId);
+        }
+    }
+    @Test(expected=RuntimeException.class)
+    public void testDoubleunregister() throws Exception {
+        
+        ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
+        
+        String type = "timer";
+        String id = "a";
+        String alias = "ControlA";
+        String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+        cs.unregister(controlId);
+        cs.unregister(controlId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobEvents.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobEvents.java b/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobEvents.java
deleted file mode 100644
index 8444358..0000000
--- a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobEvents.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.runtime.jobregistry;
-
-import edgent.execution.Job;
-import edgent.execution.services.JobRegistryService;
-import edgent.execution.services.RuntimeServices;
-import edgent.execution.services.JobRegistryService.EventType;
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-/**
- * A source of job event tuples.
- * <p>
- * A stream of job event tuples is 
- * {@linkplain #source(Topology, BiFunction) created} by a listener which
- * subscribes to a {@link JobRegistryService}.
- * </p>
- */
-public class JobEvents {
-
-    /**
-     * Declares a stream populated by {@link JobRegistryService} events.
-     * <p>
-     * The job registry is passed as a runtime service. At startup 
-     * {@code JobRegistryService#addListener()} is called by the 
-     * runtime to subscribe an event listener.  The listener invokes the given 
-     * {@code wrapper} function to construct a tuple from a job event
-     * and submits the tuple on the returned stream.</p>
-     * <p>
-     * When the topology's execution is terminated, 
-     * {@code JobRegistryServic#removeListener()}  in invoked to unsubscribe 
-     * the tuple source from the job registry. 
-     * </p>
-     *
-     * @param <T> Tuple type
-     * @param topology the stream topology
-     * @param wrapper constructs a tuple from a job event
-     * @return new stream containing the tuples generated by the specified {@code wrapper}.
-     * 
-     * @see Topology#getRuntimeServiceSupplier() 
-     * @see JobRegistryService#addListener(BiConsumer)
-     * @see JobRegistryService#removeListener(BiConsumer)
-     */
-    public static <T> TStream<T> source(
-            Topology topology, 
-            BiFunction<JobRegistryService.EventType, Job, T> wrapper) {
-
-        Supplier<RuntimeServices> rts = topology.getRuntimeServiceSupplier();
-        return topology.events(new JobEventsSetup<T>(wrapper, rts));
-    }
-
-    /**
-     * Job events setup Consumer that adds a subscriber to the 
-     * JobRegistryService on start up and removes it on close. 
-     *
-     * @param <T> Type of the tuples.
-     */
-    private static final class JobEventsSetup<T> 
-            implements Consumer<Consumer<T>>, AutoCloseable {
-
-        private static final long serialVersionUID = 1L;
-        private final Supplier<RuntimeServices> rts;
-        private final JobEventsListener<T> listener;
-        
-        JobEventsSetup(BiFunction<JobRegistryService.EventType, Job, T> 
-                tupleGen, Supplier<RuntimeServices> rts) {
-
-            this.rts = rts;
-            this.listener = new JobEventsListener<T>(tupleGen);
-        }
-
-        @Override
-        public void accept(Consumer<T> submitter) {
-            JobRegistryService jobRegistry = rts.get().getService(JobRegistryService.class);
-            if (jobRegistry != null) {
-                listener.setSubmitter(submitter);
-                jobRegistry.addListener(listener);
-            }
-        }
-
-        @Override
-        public void close() throws Exception {
-            JobRegistryService jobRegistry = rts.get().getService(JobRegistryService.class);
-            if (jobRegistry != null) {
-                jobRegistry.removeListener(listener);
-            }
-        }
-        
-        /**
-         * JobRegistryService listener which uses a tuple generator for 
-         * wrapping job events into tuples and a consumer for submitting 
-         * the tuples. 
-         *
-         * @param <T> Type of the tuples.
-         */
-        private static final class JobEventsListener<T> 
-                implements BiConsumer<JobRegistryService.EventType, Job> {
-
-            private static final long serialVersionUID = 1L;
-            private Consumer<T> eventSubmitter;
-            private final BiFunction<JobRegistryService.EventType, Job, T> tupleGenerator;
-            
-            JobEventsListener(BiFunction<JobRegistryService.EventType, Job, T> tupleGen) {
-                this.tupleGenerator = tupleGen;
-            }
-
-            void setSubmitter(Consumer<T> submitter) {
-                this.eventSubmitter = submitter;
-            }
-
-            @Override
-            public void accept(EventType evType, Job job) {
-                T tuple = tupleGenerator.apply(evType, job);
-                eventSubmitter.accept(tuple);          
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobRegistry.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobRegistry.java b/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobRegistry.java
deleted file mode 100644
index a408cc5..0000000
--- a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobRegistry.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.runtime.jobregistry;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edgent.execution.Job;
-import edgent.execution.services.JobRegistryService;
-import edgent.execution.services.ServiceContainer;
-import edgent.function.BiConsumer;
-
-/**
- * Maintains a set of registered jobs and a set of listeners.
- * Notifies listeners on job additions, deletions and updates.
- */
-public class JobRegistry implements JobRegistryService {
-
-    /**
-     * Creates and registers a {@link JobRegistry} with the given service 
-     * container.
-     * 
-     * @param services provides access to service registration
-     * @return service instance.
-     */
-    public static JobRegistryService createAndRegister(ServiceContainer services) {
-        JobRegistryService service = new JobRegistry();
-        services.addService(JobRegistryService.class, service);
-        return service;        
-    }
-
-    private final ConcurrentMap<String /*JobId*/, Job> jobs;
-    private final Broadcaster<JobRegistryService.EventType, Job> listeners;
-    private static final Logger logger = LoggerFactory.getLogger(JobRegistry.class);
-
-    /**
-     * Creates a new {@link JobRegistry}.
-     */
-    public JobRegistry() {
-        this.jobs = new ConcurrentHashMap<String, Job>();
-        this.listeners = new Broadcaster<JobRegistryService.EventType, Job>();
-    }
-
-    @Override
-    public void addListener(BiConsumer<JobRegistryService.EventType, Job> listener) {
-        listeners.add(listener);
-        
-        synchronized (jobs) {
-            for (Job job : jobs.values())
-                listener.accept(JobRegistryService.EventType.ADD, job);
-        }
-    }
-
-    @Override
-    public boolean removeListener(BiConsumer<JobRegistryService.EventType, Job> listener) {
-        return listeners.remove(listener);
-    }
-
-    @Override
-    public Set<String> getJobIds() {
-        return Collections.unmodifiableSet(jobs.keySet());
-    }
-
-    @Override
-    public Job getJob(String id) {
-        return jobs.get(id);
-    }
-
-    @Override
-    public boolean removeJob(String jobId) {
-        final Job removed = jobs.remove(jobId);
-        if (removed != null) {
-            listeners.onEvent(JobRegistryService.EventType.REMOVE, removed);
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public void addJob(Job job) throws IllegalArgumentException {
-        final Job existing = jobs.putIfAbsent(job.getId(), job);
-        if (existing == null) {
-            listeners.onEvent(JobRegistryService.EventType.ADD, job);
-        } else {
-            throw new IllegalArgumentException("A job with Id " + job.getId()
-                + " already exists");
-        }
-    }
-
-    @Override
-    public boolean updateJob(Job job) {
-        if (jobs.containsValue(job)) {
-            listeners.onEvent(JobRegistryService.EventType.UPDATE, job);
-            return true;
-        }
-        return false;
-    }
-
-    private static class Broadcaster<T, O> {
-        private final List<BiConsumer<T, O>> listeners;
-
-        Broadcaster() {
-            this.listeners = new CopyOnWriteArrayList<BiConsumer<T, O>>();
-        }
-
-        void add(BiConsumer<T, O> listener) {
-            if (listener == null) {
-                throw new IllegalArgumentException("Null listener") ;
-            }
-            listeners.add(listener);
-        }
-
-        boolean remove(BiConsumer<T, O> listener) {
-            if (listener == null)
-                return false;
-            return listeners.remove(listener);
-        }
-
-        private void onEvent(T event, O job) {
-            for (BiConsumer<T, O> listener : listeners) {
-                try {
-                    listener.accept(event, job);
-                } catch (Exception e) {
-                    logger.error("Exception caught while invoking listener:" + e);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/package-info.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/package-info.java b/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/package-info.java
deleted file mode 100644
index 64f7ef9..0000000
--- a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.runtime.jobregistry;
-
-/**
- * {@link JobRegistryService} implementation which provides {@link Job} 
- * registration, access to registered jobs, as well as subscriptions to job
- * update events.
- */

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobEvents.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobEvents.java b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobEvents.java
new file mode 100644
index 0000000..8a6bbbf
--- /dev/null
+++ b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobEvents.java
@@ -0,0 +1,139 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.jobregistry;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.JobRegistryService;
+import org.apache.edgent.execution.services.RuntimeServices;
+import org.apache.edgent.execution.services.JobRegistryService.EventType;
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * A source of job event tuples.
+ * <p>
+ * A stream of job event tuples is 
+ * {@linkplain #source(Topology, BiFunction) created} by a listener which
+ * subscribes to a {@link JobRegistryService}.
+ * </p>
+ */
+public class JobEvents {
+
+    /**
+     * Declares a stream populated by {@link JobRegistryService} events.
+     * <p>
+     * The job registry is passed as a runtime service. At startup 
+     * {@code JobRegistryService#addListener()} is called by the 
+     * runtime to subscribe an event listener.  The listener invokes the given 
+     * {@code wrapper} function to construct a tuple from a job event
+     * and submits the tuple on the returned stream.</p>
+     * <p>
+     * When the topology's execution is terminated, 
+     * {@code JobRegistryServic#removeListener()}  in invoked to unsubscribe 
+     * the tuple source from the job registry. 
+     * </p>
+     *
+     * @param <T> Tuple type
+     * @param topology the stream topology
+     * @param wrapper constructs a tuple from a job event
+     * @return new stream containing the tuples generated by the specified {@code wrapper}.
+     * 
+     * @see Topology#getRuntimeServiceSupplier() 
+     * @see JobRegistryService#addListener(BiConsumer)
+     * @see JobRegistryService#removeListener(BiConsumer)
+     */
+    public static <T> TStream<T> source(
+            Topology topology, 
+            BiFunction<JobRegistryService.EventType, Job, T> wrapper) {
+
+        Supplier<RuntimeServices> rts = topology.getRuntimeServiceSupplier();
+        return topology.events(new JobEventsSetup<T>(wrapper, rts));
+    }
+
+    /**
+     * Job events setup Consumer that adds a subscriber to the 
+     * JobRegistryService on start up and removes it on close. 
+     *
+     * @param <T> Type of the tuples.
+     */
+    private static final class JobEventsSetup<T> 
+            implements Consumer<Consumer<T>>, AutoCloseable {
+
+        private static final long serialVersionUID = 1L;
+        private final Supplier<RuntimeServices> rts;
+        private final JobEventsListener<T> listener;
+        
+        JobEventsSetup(BiFunction<JobRegistryService.EventType, Job, T> 
+                tupleGen, Supplier<RuntimeServices> rts) {
+
+            this.rts = rts;
+            this.listener = new JobEventsListener<T>(tupleGen);
+        }
+
+        @Override
+        public void accept(Consumer<T> submitter) {
+            JobRegistryService jobRegistry = rts.get().getService(JobRegistryService.class);
+            if (jobRegistry != null) {
+                listener.setSubmitter(submitter);
+                jobRegistry.addListener(listener);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            JobRegistryService jobRegistry = rts.get().getService(JobRegistryService.class);
+            if (jobRegistry != null) {
+                jobRegistry.removeListener(listener);
+            }
+        }
+        
+        /**
+         * JobRegistryService listener which uses a tuple generator for 
+         * wrapping job events into tuples and a consumer for submitting 
+         * the tuples. 
+         *
+         * @param <T> Type of the tuples.
+         */
+        private static final class JobEventsListener<T> 
+                implements BiConsumer<JobRegistryService.EventType, Job> {
+
+            private static final long serialVersionUID = 1L;
+            private Consumer<T> eventSubmitter;
+            private final BiFunction<JobRegistryService.EventType, Job, T> tupleGenerator;
+            
+            JobEventsListener(BiFunction<JobRegistryService.EventType, Job, T> tupleGen) {
+                this.tupleGenerator = tupleGen;
+            }
+
+            void setSubmitter(Consumer<T> submitter) {
+                this.eventSubmitter = submitter;
+            }
+
+            @Override
+            public void accept(EventType evType, Job job) {
+                T tuple = tupleGenerator.apply(evType, job);
+                eventSubmitter.accept(tuple);          
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobRegistry.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobRegistry.java b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobRegistry.java
new file mode 100644
index 0000000..289307e
--- /dev/null
+++ b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobRegistry.java
@@ -0,0 +1,151 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.jobregistry;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.JobRegistryService;
+import org.apache.edgent.execution.services.ServiceContainer;
+import org.apache.edgent.function.BiConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains a set of registered jobs and a set of listeners.
+ * Notifies listeners on job additions, deletions and updates.
+ */
+public class JobRegistry implements JobRegistryService {
+
+    /**
+     * Creates and registers a {@link JobRegistry} with the given service 
+     * container.
+     * 
+     * @param services provides access to service registration
+     * @return service instance.
+     */
+    public static JobRegistryService createAndRegister(ServiceContainer services) {
+        JobRegistryService service = new JobRegistry();
+        services.addService(JobRegistryService.class, service);
+        return service;        
+    }
+
+    private final ConcurrentMap<String /*JobId*/, Job> jobs;
+    private final Broadcaster<JobRegistryService.EventType, Job> listeners;
+    private static final Logger logger = LoggerFactory.getLogger(JobRegistry.class);
+
+    /**
+     * Creates a new {@link JobRegistry}.
+     */
+    public JobRegistry() {
+        this.jobs = new ConcurrentHashMap<String, Job>();
+        this.listeners = new Broadcaster<JobRegistryService.EventType, Job>();
+    }
+
+    @Override
+    public void addListener(BiConsumer<JobRegistryService.EventType, Job> listener) {
+        listeners.add(listener);
+        
+        synchronized (jobs) {
+            for (Job job : jobs.values())
+                listener.accept(JobRegistryService.EventType.ADD, job);
+        }
+    }
+
+    @Override
+    public boolean removeListener(BiConsumer<JobRegistryService.EventType, Job> listener) {
+        return listeners.remove(listener);
+    }
+
+    @Override
+    public Set<String> getJobIds() {
+        return Collections.unmodifiableSet(jobs.keySet());
+    }
+
+    @Override
+    public Job getJob(String id) {
+        return jobs.get(id);
+    }
+
+    @Override
+    public boolean removeJob(String jobId) {
+        final Job removed = jobs.remove(jobId);
+        if (removed != null) {
+            listeners.onEvent(JobRegistryService.EventType.REMOVE, removed);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void addJob(Job job) throws IllegalArgumentException {
+        final Job existing = jobs.putIfAbsent(job.getId(), job);
+        if (existing == null) {
+            listeners.onEvent(JobRegistryService.EventType.ADD, job);
+        } else {
+            throw new IllegalArgumentException("A job with Id " + job.getId()
+                + " already exists");
+        }
+    }
+
+    @Override
+    public boolean updateJob(Job job) {
+        if (jobs.containsValue(job)) {
+            listeners.onEvent(JobRegistryService.EventType.UPDATE, job);
+            return true;
+        }
+        return false;
+    }
+
+    private static class Broadcaster<T, O> {
+        private final List<BiConsumer<T, O>> listeners;
+
+        Broadcaster() {
+            this.listeners = new CopyOnWriteArrayList<BiConsumer<T, O>>();
+        }
+
+        void add(BiConsumer<T, O> listener) {
+            if (listener == null) {
+                throw new IllegalArgumentException("Null listener") ;
+            }
+            listeners.add(listener);
+        }
+
+        boolean remove(BiConsumer<T, O> listener) {
+            if (listener == null)
+                return false;
+            return listeners.remove(listener);
+        }
+
+        private void onEvent(T event, O job) {
+            for (BiConsumer<T, O> listener : listeners) {
+                try {
+                    listener.accept(event, job);
+                } catch (Exception e) {
+                    logger.error("Exception caught while invoking listener:" + e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/package-info.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/package-info.java b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/package-info.java
new file mode 100644
index 0000000..132d5a8
--- /dev/null
+++ b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/package-info.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.jobregistry;
+
+/**
+ * {@link JobRegistryService} implementation which provides {@link Job} 
+ * registration, access to registered jobs, as well as subscriptions to job
+ * update events.
+ */

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/test/java/edgent/test/runtime/jobregistry/JobRegistryTest.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/test/java/edgent/test/runtime/jobregistry/JobRegistryTest.java b/runtime/jobregistry/src/test/java/edgent/test/runtime/jobregistry/JobRegistryTest.java
deleted file mode 100644
index 2f36efb..0000000
--- a/runtime/jobregistry/src/test/java/edgent/test/runtime/jobregistry/JobRegistryTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.runtime.jobregistry;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.junit.Test;
-
-import edgent.execution.Job;
-import edgent.execution.services.JobRegistryService;
-import edgent.execution.services.JobRegistryService.EventType;
-import edgent.function.BiConsumer;
-import edgent.runtime.jobregistry.JobRegistry;
-
-public class JobRegistryTest {
-
-    @Test
-    public void testGetJob() {
-        JobRegistryService service = new JobRegistry();
-        Job job1 = new JobImpl("Job_1");
-        Job job2 = new JobImpl("Job_2");
-        service.addJob(job1);
-        service.addJob(job2);
-        
-        assertSame(job1, service.getJob("Job_1"));
-        assertSame(job2, service.getJob("Job_2"));
-    }
-
-    @Test
-    public void testAddJob() {
-        JobRegistryService service = new JobRegistry();
-        
-        service.addJob(new JobImpl("Job_1"));
-        service.addListener((et, j) ->
-            {
-                assertEquals(JobRegistryService.EventType.ADD, et);
-                assertEquals("Job_1", j.getId());
-            });
-    }
-    
-    @Test
-    public void testAddJob2() {
-        JobRegistryService service = new JobRegistry();
-        
-        service.addListener((et, j) -> {
-                assertEquals(JobRegistryService.EventType.ADD, et);
-                assertEquals("Job_1", j.getId());
-            });
-        service.addJob(new JobImpl("Job_1"));
-    }
-
-    @Test
-    public void testRemoveJob() {
-        JobRegistryService service = new JobRegistry();
-        service.addJob(new JobImpl("Job_1"));
-        service.addJob(new JobImpl("Job_2"));
-        
-        service.addListener(new JobHandler(JobRegistryService.EventType.REMOVE));
-        service.removeJob("Job_1");
-        service.removeJob("Job_2");
-    }
-
-    @Test
-    public void testUpdateJob() {
-        JobRegistryService service = new JobRegistry();
-        service.addJob(new JobImpl("Job_1"));
-        service.addJob(new JobImpl("Job_2"));
-        
-        service.addListener(new JobHandler(JobRegistryService.EventType.UPDATE));
-        service.updateJob(service.getJob("Job_1"));
-        service.updateJob(service.getJob("Job_2"));
-    }
-
-    private static class JobHandler implements BiConsumer<JobRegistryService.EventType, Job> {
-        private static final long serialVersionUID = 1L;
-        private int eventCount = 0;
-        private final JobRegistryService.EventType eventType;
-        
-        JobHandler(JobRegistryService.EventType et) {
-            this.eventType = et;
-        }
-
-        @Override
-        public void accept(EventType et, Job j) {
-            eventCount++;
-            switch (eventCount) {
-            case 1:
-            case 2:
-                assertEquals(JobRegistryService.EventType.ADD, et);
-                break;
-            case 3:
-                assertEquals(eventType, et);
-                assertEquals("Job_1", j.getId());
-                break;
-            case 4:
-                assertEquals(eventType, et);
-                assertEquals("Job_2", j.getId());
-                break;
-            }
-        }
-    }
-
-    private static class JobImpl implements Job {
-
-        private String id;
-        
-        JobImpl(String id) {
-            this.id = id;
-        }
-
-        @Override
-        public State getCurrentState() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public State getNextState() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void stateChange(Action action) throws IllegalArgumentException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public Health getHealth() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public String getLastError() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public String getName() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public String getId() {
-            return id;
-        }
-
-        @Override
-        public void complete() throws ExecutionException, InterruptedException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void complete(long timeout, TimeUnit unit)
-                throws ExecutionException, InterruptedException, TimeoutException {
-            throw new UnsupportedOperationException();
-        }
-        
-    }
-}