You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:10 UTC
[09/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/graph/model/VertexType.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/graph/model/VertexType.java b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/graph/model/VertexType.java
new file mode 100644
index 0000000..492838f
--- /dev/null
+++ b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/graph/model/VertexType.java
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.etiao.graph.model;
+
+import org.apache.edgent.graph.Vertex;
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.runtime.etiao.graph.ExecutableVertex;
+
+/**
+ * A {@code VertexType} in a graph.
+ * <p>
+ * A {@code VertexType} has an {@link InvocationType} instance.
+ *
+ * @param <I> Data type the oplet consumes on its input ports.
+ * @param <O> Data type the oplet produces on its output ports.
+ */
+public class VertexType<I, O> {
+
+ /**
+ * Vertex identifier, unique within the {@code GraphType} this vertex
+ * belongs to.
+ */
+ private final String id;
+
+ /**
+ * The oplet invocation that is being executed.
+ */
+ private final InvocationType<I, O> invocation;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public VertexType(Vertex<? extends Oplet<?, ?>, ?, ?> value, IdMapper<String> ids) {
+ this.id = (value instanceof ExecutableVertex) ?
+ ids.add(value, ((ExecutableVertex) value).getInvocationId()) :
+ // Can't get an id from the vertex, generate unique value
+ ids.add(value);
+ this.invocation = new InvocationType(value.getInstance());
+ }
+
+ public VertexType() {
+ this.id = null;
+ this.invocation = null;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public InvocationType<I, O> getInvocation() {
+ return invocation;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/mbeans/EtiaoJobBean.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/mbeans/EtiaoJobBean.java b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/mbeans/EtiaoJobBean.java
new file mode 100644
index 0000000..6be36f9
--- /dev/null
+++ b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/mbeans/EtiaoJobBean.java
@@ -0,0 +1,207 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.etiao.mbeans;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Job.Action;
+import org.apache.edgent.execution.mbeans.JobMXBean;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.execution.services.Controls;
+import org.apache.edgent.runtime.etiao.EtiaoJob;
+import org.apache.edgent.runtime.etiao.graph.model.GraphType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Implementation of a control interface for the {@code EtiaoJob}.
+ */
+public class EtiaoJobBean implements JobMXBean {
+ private final EtiaoJob job;
+ private ControlService controlService;
+ private String controlId;
+ private static final Logger logger = LoggerFactory.getLogger(EtiaoJobBean.class);
+
+ /**
+ * Factory method which creates an {@code EtiaoJobBean} instance to
+ * control the specified {@code EtiaoJob} and registers it with the
+ * specified {@code ControlService}.
+ *
+ * @param cs the control service
+ * @param job the controlled job
+ * @return a registered bean instance
+ */
+ public static EtiaoJobBean registerControl(ControlService cs, EtiaoJob job) {
+ EtiaoJobBean bean = new EtiaoJobBean(job);
+ bean.registerControl(cs);
+ return bean;
+ }
+
+ private EtiaoJobBean(EtiaoJob job) {
+ this.job = job;
+ }
+
+ public String getControlId() {
+ return controlId;
+ }
+
+ public boolean wasRegistered() {
+ return controlId != null;
+ }
+
+ @Override
+ public String getId() {
+ return job.getId();
+ }
+
+ @Override
+ public String getName() {
+ return job.getName();
+ }
+
+ @Override
+ public Job.State getCurrentState() {
+ return job.getCurrentState();
+ }
+
+ @Override
+ public Job.State getNextState() {
+ return job.getNextState();
+ }
+
+ @Override
+ public String graphSnapshot() {
+ Gson gson = new GsonBuilder().create();
+ return gson.toJson(new GraphType(job.graph()));
+ }
+
+ @Override
+ public Job.Health getHealth() {
+ return job.getHealth();
+ }
+
+ @Override
+ public String getLastError() {
+ return job.getLastError();
+ }
+
+ @Override
+ public void stateChange(Action action) {
+ job.stateChange(action);
+ if (wasRegistered() && action == Action.CLOSE) {
+ unregisterControlAsync();
+ }
+ }
+
+ private void registerControl(ControlService cs) {
+ if (cs == null)
+ throw new IllegalArgumentException("ControlService must not be null");
+
+ logger.trace("Registering control for job id {}, job name {}", job.getId(), job.getName());
+
+ this.controlService = cs;
+ JobMXBean oldControl = cs.getControl(JobMXBean.TYPE, job.getName(), JobMXBean.class);
+
+ if (oldControl != null) {
+ String oldControlId = cs.getControlId(JobMXBean.TYPE, job.getName(), JobMXBean.class);
+ if (oldControlId != null) {
+ if (isJobClosed(oldControl)) {
+ cs.unregister(oldControlId);
+ logger.debug("Old control id {} for CLOSED job name {} was unregistered",
+ oldControlId, job.getName());
+ }
+ else {
+ throw new IllegalStateException(
+ "Cannot register job control for alias " +
+ job.getName() + " because a job control with id " + oldControlId +
+ " for the same alias already exists and is not CLOSED");
+ }
+ }
+ }
+ this.controlId = cs.registerControl(JobMXBean.TYPE, job.getId(),
+ job.getName(), JobMXBean.class, this);
+ logger.debug("Control for job id {}, job name {} was registered with id {}",
+ job.getId(), job.getName(), controlId);
+ }
+
+ private void unregisterControlAsync() {
+ if (controlService == null)
+ throw new IllegalStateException(
+ "The ControlService of a registered bean must not be null");
+
+ getThread(new Runnable() {
+ @Override
+ public void run() {
+ unregisterControl();
+ }
+ }).start();
+ }
+
+ private void unregisterControl() {
+ if (!wasRegistered())
+ return;
+
+ long startTime = System.currentTimeMillis();
+ try {
+ try {
+ job.completeClosing(Controls.JOB_HOLD_AFTER_CLOSE_SECS, TimeUnit.SECONDS);
+ } catch (ExecutionException e) {
+ String cause = e.getCause() != null ? e.getCause().getMessage() : "unknown";
+ logger.info("Error {} during completion of job {} caused by {}",
+ e.getMessage(), job.getName(), cause);
+ logger.debug("Error during completion of job " + job.getName(), e);
+ } catch (TimeoutException e) {
+ logger.info("Timed out after {} milliseconds waiting for job {} to complete",
+ (System.currentTimeMillis() - startTime), job.getName());
+ }
+ long remaining = startTime + Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000 - System.currentTimeMillis();
+ if (remaining < 0)
+ remaining = 0;
+ else
+ logger.trace("Job completed, waiting {} milliseconds before unregistering control {}",
+ remaining, controlId);
+
+ Thread.sleep(remaining);
+ } catch (InterruptedException e) {
+ // Swallow the exception and unregister the control
+ }
+ finally {
+ controlService.unregister(controlId);
+ logger.trace("Control {} unregistered", controlId);
+ }
+ }
+
+ private Thread getThread(Runnable r) {
+ ThreadFactory threads = Executors.defaultThreadFactory();
+ return threads.newThread(r);
+ }
+
+ private boolean isJobClosed(JobMXBean job) {
+ return job.getCurrentState() == Job.State.CLOSED &&
+ job.getNextState() == Job.State.CLOSED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/package-info.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/package-info.java b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/package-info.java
new file mode 100644
index 0000000..4b2f49b
--- /dev/null
+++ b/runtime/etiao/src/main/java/org/apache/edgent/runtime/etiao/package-info.java
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * A runtime for executing an Edgent streaming topology, designed as an embeddable library
+ * so that it can be executed in a simple Java application.
+ *
+ * <h2>"EveryThing Is An Oplet" (ETIAO)</h2>
+ *
+ * The runtime's focus is on executing oplets and their connected streams, where each
+ * oplet is just a black box. Specifically this means that functionality is added by the introduction
+ * of oplets into the graph that were not explicitly declared by the application developer.
+ * For example, metrics are implemented by oplets, not the runtime. A metric collector is an
+ * oplet that calculates metrics on tuples accepted on its input port, and then makes them
+ * available, for example through JMX.
+ */
+package org.apache.edgent.runtime.etiao;
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/test/java/edgent/test/runtime/etiao/EtiaoGraphTest.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/test/java/edgent/test/runtime/etiao/EtiaoGraphTest.java b/runtime/etiao/src/test/java/edgent/test/runtime/etiao/EtiaoGraphTest.java
deleted file mode 100644
index b40ade9..0000000
--- a/runtime/etiao/src/test/java/edgent/test/runtime/etiao/EtiaoGraphTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.runtime.etiao;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-import edgent.execution.services.ServiceContainer;
-import edgent.function.Consumer;
-import edgent.graph.Connector;
-import edgent.graph.Graph;
-import edgent.graph.Vertex;
-import edgent.oplet.core.AbstractOplet;
-import edgent.oplet.core.Sink;
-import edgent.oplet.core.Split;
-import edgent.oplet.functional.SupplierPeriodicSource;
-import edgent.runtime.etiao.graph.DirectGraph;
-import edgent.runtime.etiao.graph.model.GraphType;
-import edgent.test.graph.GraphTest;
-
-public class EtiaoGraphTest extends GraphTest {
-
- @Override
- protected Graph createGraph() {
- return new DirectGraph(this.getClass().getSimpleName(), new ServiceContainer());
- }
-
- @Test
- public void testEmptyGraphToJson() {
- Graph g = getGraph();
- GraphType gt = new GraphType(g);
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String json = gson.toJson(gt);
-
- GraphType gt2 = new Gson().fromJson(json, GraphType.class);
- assertEquals(0, gt2.getVertices().size());
- assertEquals(0, gt2.getEdges().size());
- }
-
- @Test
- public void testGraphToJson() {
- Graph g = getGraph();
- TestOp<String, Integer> op = new TestOp<>();
- /* Vertex<TestOp<String, Integer>, String, Integer> v = */g.insert(op, 1, 1);
- GraphType gt = new GraphType(g);
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String json = gson.toJson(gt);
-
- GraphType gt2 = new Gson().fromJson(json, GraphType.class);
- assertEquals(1, gt2.getVertices().size());
- assertEquals(0, gt2.getEdges().size());
- }
-
- @Test
- public void testGraphToJson2() {
- Graph g = getGraph();
-
- TestOp<String, Integer> op1 = new TestOp<>();
- Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op1, 1, 1);
-
- TestOp<Integer, Integer> op2 = new TestOp<>();
- /*Connector<Integer> out2 = */g.pipe(v.getConnectors().get(0), op2);
-
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String json = gson.toJson(new GraphType(g));
-
- GraphType gt2 = new Gson().fromJson(json, GraphType.class);
- assertEquals(2, gt2.getVertices().size());
- assertEquals(1, gt2.getEdges().size());
- }
-
- @Test
- public void testGraphToJson4() {
- Graph g = getGraph();
-
- /* /-- V2
- * V0(Integer)-- V1(Double)-- FanOut
- * \-- V3
- */
- Vertex<TestOp<String, Integer>, String, Integer> v0 = g.insert(new TestOp<>(), 1, 1);
- Connector<Integer> out0 = v0.getConnectors().get(0);
- Connector<Double> out1 = g.pipe(out0, new TestOp<Integer, Double>());
- Vertex<TestOp<Double, String>, Double, String> v2 = g.insert(new TestOp<Double, String>(), 1, 1);
- Vertex<TestOp<Double, String>, Double, String> v3 = g.insert(new TestOp<Double, String>(), 1, 1);
- out1.connect(v2, 0);
- out1.connect(v3, 0);
-
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String json = gson.toJson(new GraphType(g));
-
- GraphType gt2 = new Gson().fromJson(json, GraphType.class);
- assertEquals(5, gt2.getVertices().size());
- assertEquals(4, gt2.getEdges().size());
- }
-
- @Test
- public void testGraphToJson5() {
- Graph g = getGraph();
-
- /* /-- V2
- * V0(Double)-- V1(Double)---- V3
- * \-- V4
- */
- Random r = new Random();
- Vertex<SupplierPeriodicSource<Double>, Void, Double> v0 = g.insert(
- new SupplierPeriodicSource<>(100, TimeUnit.MILLISECONDS, () -> (r.nextDouble() * 3)),
- 0, 1);
- Connector<Double> out0 = v0.getConnectors().get(0);
- out0.tag("dots", "hashes", "ats");
-
- // Insert split - see ConnectorStream.split()
- Split<Double> splitOp = new Split<Double>(
- tuple -> {
- switch (tuple.intValue()) {
- case 0:
- return 0;
- case 1:
- return 1;
- default:
- return 2;
- }
- });
- Vertex<Split<Double>, Double, Double> v1 = g.insert(splitOp, 1, 3);
- out0.connect(v1, 0);
-
- // Insert and connect sinks
- Vertex<Sink<Double>, Double, Void> v2 = g.insert(
- new Sink<>(tuple -> System.out.print(".")), 1, 0);
- v1.getConnectors().get(0).connect(v2, 0);
- v1.getConnectors().get(0).tag("dots");
-
- Vertex<Sink<Double>, Double, Void> v3 = g.insert(
- new Sink<>(tuple -> System.out.print("#")), 1, 0);
- v1.getConnectors().get(1).connect(v3, 0);
- v1.getConnectors().get(1).tag("hashes");
-
- Vertex<Sink<Double>, Double, Void> v4 = g.insert(
- new Sink<>(tuple -> System.out.print("@")), 1, 0);
- v1.getConnectors().get(2).connect(v4, 0);
- v1.getConnectors().get(2).tag("ats");
-
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- String json = gson.toJson(new GraphType(g));
-
- GraphType gt = new Gson().fromJson(json, GraphType.class);
- assertEquals(5, gt.getVertices().size());
- assertEquals(4, gt.getEdges().size());
- }
-
- private static class TestOp<I, O> extends AbstractOplet<I, O> {
-
- @Override
- public void start() {
- }
-
- @Override
- public List<? extends Consumer<I>> getInputs() {
- return null;
- }
-
- @Override
- public void close() throws Exception {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/etiao/src/test/java/org/apache/edgent/test/runtime/etiao/EtiaoGraphTest.java
----------------------------------------------------------------------
diff --git a/runtime/etiao/src/test/java/org/apache/edgent/test/runtime/etiao/EtiaoGraphTest.java b/runtime/etiao/src/test/java/org/apache/edgent/test/runtime/etiao/EtiaoGraphTest.java
new file mode 100644
index 0000000..302e7e0
--- /dev/null
+++ b/runtime/etiao/src/test/java/org/apache/edgent/test/runtime/etiao/EtiaoGraphTest.java
@@ -0,0 +1,188 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.runtime.etiao;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.execution.services.ServiceContainer;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.graph.Connector;
+import org.apache.edgent.graph.Graph;
+import org.apache.edgent.graph.Vertex;
+import org.apache.edgent.oplet.core.AbstractOplet;
+import org.apache.edgent.oplet.core.Sink;
+import org.apache.edgent.oplet.core.Split;
+import org.apache.edgent.oplet.functional.SupplierPeriodicSource;
+import org.apache.edgent.runtime.etiao.graph.DirectGraph;
+import org.apache.edgent.runtime.etiao.graph.model.GraphType;
+import org.apache.edgent.test.graph.GraphTest;
+import org.junit.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class EtiaoGraphTest extends GraphTest {
+
+ @Override
+ protected Graph createGraph() {
+ return new DirectGraph(this.getClass().getSimpleName(), new ServiceContainer());
+ }
+
+ @Test
+ public void testEmptyGraphToJson() {
+ Graph g = getGraph();
+ GraphType gt = new GraphType(g);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ String json = gson.toJson(gt);
+
+ GraphType gt2 = new Gson().fromJson(json, GraphType.class);
+ assertEquals(0, gt2.getVertices().size());
+ assertEquals(0, gt2.getEdges().size());
+ }
+
+ @Test
+ public void testGraphToJson() {
+ Graph g = getGraph();
+ TestOp<String, Integer> op = new TestOp<>();
+ /* Vertex<TestOp<String, Integer>, String, Integer> v = */g.insert(op, 1, 1);
+ GraphType gt = new GraphType(g);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ String json = gson.toJson(gt);
+
+ GraphType gt2 = new Gson().fromJson(json, GraphType.class);
+ assertEquals(1, gt2.getVertices().size());
+ assertEquals(0, gt2.getEdges().size());
+ }
+
+ @Test
+ public void testGraphToJson2() {
+ Graph g = getGraph();
+
+ TestOp<String, Integer> op1 = new TestOp<>();
+ Vertex<TestOp<String, Integer>, String, Integer> v = g.insert(op1, 1, 1);
+
+ TestOp<Integer, Integer> op2 = new TestOp<>();
+ /*Connector<Integer> out2 = */g.pipe(v.getConnectors().get(0), op2);
+
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ String json = gson.toJson(new GraphType(g));
+
+ GraphType gt2 = new Gson().fromJson(json, GraphType.class);
+ assertEquals(2, gt2.getVertices().size());
+ assertEquals(1, gt2.getEdges().size());
+ }
+
+ @Test
+ public void testGraphToJson4() {
+ Graph g = getGraph();
+
+ /* /-- V2
+ * V0(Integer)-- V1(Double)-- FanOut
+ * \-- V3
+ */
+ Vertex<TestOp<String, Integer>, String, Integer> v0 = g.insert(new TestOp<>(), 1, 1);
+ Connector<Integer> out0 = v0.getConnectors().get(0);
+ Connector<Double> out1 = g.pipe(out0, new TestOp<Integer, Double>());
+ Vertex<TestOp<Double, String>, Double, String> v2 = g.insert(new TestOp<Double, String>(), 1, 1);
+ Vertex<TestOp<Double, String>, Double, String> v3 = g.insert(new TestOp<Double, String>(), 1, 1);
+ out1.connect(v2, 0);
+ out1.connect(v3, 0);
+
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ String json = gson.toJson(new GraphType(g));
+
+ GraphType gt2 = new Gson().fromJson(json, GraphType.class);
+ assertEquals(5, gt2.getVertices().size());
+ assertEquals(4, gt2.getEdges().size());
+ }
+
+ @Test
+ public void testGraphToJson5() {
+ Graph g = getGraph();
+
+ /* /-- V2
+ * V0(Double)-- V1(Double)---- V3
+ * \-- V4
+ */
+ Random r = new Random();
+ Vertex<SupplierPeriodicSource<Double>, Void, Double> v0 = g.insert(
+ new SupplierPeriodicSource<>(100, TimeUnit.MILLISECONDS, () -> (r.nextDouble() * 3)),
+ 0, 1);
+ Connector<Double> out0 = v0.getConnectors().get(0);
+ out0.tag("dots", "hashes", "ats");
+
+ // Insert split - see ConnectorStream.split()
+ Split<Double> splitOp = new Split<Double>(
+ tuple -> {
+ switch (tuple.intValue()) {
+ case 0:
+ return 0;
+ case 1:
+ return 1;
+ default:
+ return 2;
+ }
+ });
+ Vertex<Split<Double>, Double, Double> v1 = g.insert(splitOp, 1, 3);
+ out0.connect(v1, 0);
+
+ // Insert and connect sinks
+ Vertex<Sink<Double>, Double, Void> v2 = g.insert(
+ new Sink<>(tuple -> System.out.print(".")), 1, 0);
+ v1.getConnectors().get(0).connect(v2, 0);
+ v1.getConnectors().get(0).tag("dots");
+
+ Vertex<Sink<Double>, Double, Void> v3 = g.insert(
+ new Sink<>(tuple -> System.out.print("#")), 1, 0);
+ v1.getConnectors().get(1).connect(v3, 0);
+ v1.getConnectors().get(1).tag("hashes");
+
+ Vertex<Sink<Double>, Double, Void> v4 = g.insert(
+ new Sink<>(tuple -> System.out.print("@")), 1, 0);
+ v1.getConnectors().get(2).connect(v4, 0);
+ v1.getConnectors().get(2).tag("ats");
+
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ String json = gson.toJson(new GraphType(g));
+
+ GraphType gt = new Gson().fromJson(json, GraphType.class);
+ assertEquals(5, gt.getVertices().size());
+ assertEquals(4, gt.getEdges().size());
+ }
+
+ private static class TestOp<I, O> extends AbstractOplet<I, O> {
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public List<? extends Consumer<I>> getInputs() {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jmxcontrol/src/main/java/edgent/runtime/jmxcontrol/JMXControlService.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/main/java/edgent/runtime/jmxcontrol/JMXControlService.java b/runtime/jmxcontrol/src/main/java/edgent/runtime/jmxcontrol/JMXControlService.java
deleted file mode 100644
index 6065c98..0000000
--- a/runtime/jmxcontrol/src/main/java/edgent/runtime/jmxcontrol/JMXControlService.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.runtime.jmxcontrol;
-
-import java.lang.management.ManagementFactory;
-import java.util.Hashtable;
-import java.util.Set;
-
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.JMX;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-
-import edgent.execution.services.ControlService;
-
-/**
- * Control service that registers control objects
- * as MBeans in a JMX server.
- *
- */
-public class JMXControlService implements ControlService {
-
- private final MBeanServer mbs;
- private final String domain;
- private final Hashtable<String,String> additionalKeys;
-
- /**
- * JMX control service using the platform MBean server.
- * @param domain Domain the MBeans are registered in.
- * @param additionalKeys additional name/value keys to add to the generated JMX object names
- */
- public JMXControlService(String domain, Hashtable<String,String> additionalKeys) {
- mbs = ManagementFactory.getPlatformMBeanServer();
- this.domain = domain;
- this.additionalKeys = additionalKeys;
- }
-
-
- /**
- * Get the MBean server being used by this control service.
- * @return MBean server being used by this control service.
- */
- public MBeanServer getMbs() {
- return mbs;
- }
-
- /**
- * Get the JMX domain being used by this control service.
- * @return JMX domain being used by this control service.
- */
- public String getDomain() {
- return domain;
- }
-
- /**
- *
- * Register a control object as an MBean.
- *
- * {@inheritDoc}
- *
- * The MBean is registered within the domain returned by {@link #getDomain()}
- * and an `ObjectName` with these keys:
- * <UL>
- * <LI>type</LI> {@code type}
- * <LI>interface</LI> {@code controlInterface.getName()}
- * <LI>id</LI> {@code type}
- * <LI>alias</LI> {@code alias}
- * </UL>
- *
- */
- @Override
- public <T> String registerControl(String type, String id, String alias, Class<T> controlInterface, T control) {
- Hashtable<String,String> table = new Hashtable<>();
-
- table.put("type", ObjectName.quote(type));
- table.put("interface", ObjectName.quote(controlInterface.getName()));
- table.put("id", ObjectName.quote(id));
- if (alias != null)
- table.put("alias", ObjectName.quote(alias));
-
- additionalNameKeys(table);
-
- try {
- ObjectName on = ObjectName.getInstance(getDomain(), table);
- getMbs().registerMBean(control, on);
-
- return getControlId(on);
- } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException
- | MalformedObjectNameException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- protected void additionalNameKeys(Hashtable<String,String> table) {
- table.putAll(additionalKeys);
- }
-
- @Override
- public void unregister(String controlId) {
- try {
- mbs.unregisterMBean(ObjectName.getInstance(controlId));
- } catch (MBeanRegistrationException | InstanceNotFoundException | MalformedObjectNameException
- | NullPointerException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public <T> T getControl(String type, String alias, Class<T> controlInterface) {
- MBeanServer mBeanServer = getMbs();
- ObjectName name = getObjectNameForInterface(type, alias, controlInterface);
- return name != null ? JMX.newMXBeanProxy(mBeanServer, name, controlInterface) : null;
- }
-
- @Override
- public <T> String getControlId(String type, String alias, Class<T> controlInterface) {
- return getControlId(getObjectNameForInterface(type, alias, controlInterface));
- }
-
- private <T> ObjectName getObjectNameForInterface(String type, String alias, Class<T> controlInterface) {
- try {
- Set<ObjectName> names = getObjectNamesForInterface(type, alias, controlInterface.getName());
-
- if (names.isEmpty())
- return null;
- if (names.size() != 1)
- throw new RuntimeException("Alias " + alias + " not unique for type " + type);
-
- ObjectName name = null;
- for (ObjectName on : names) {
- name = on;
- break;
- }
- return name;
- }
- catch (MalformedObjectNameException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static String getControlId(ObjectName on) {
- return on != null ? on.getCanonicalName() : null;
- }
-
- private Set<ObjectName> getObjectNamesForInterface(String type, String alias, String interfaceName)
- throws MalformedObjectNameException {
-
- Hashtable<String,String> table = new Hashtable<>();
- table.put("interface", ObjectName.quote(interfaceName));
- table.put("type", ObjectName.quote(type));
- if (alias != null)
- table.put("alias", ObjectName.quote(alias));
- ObjectName objName = new ObjectName(getDomain(), table);
-
- // Add the wildcard for any other properties.
- objName = new ObjectName(objName.getCanonicalName()+",*");
-
- MBeanServer mBeanServer = getMbs();
- return mBeanServer.queryNames(objName, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jmxcontrol/src/main/java/org/apache/edgent/runtime/jmxcontrol/JMXControlService.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/main/java/org/apache/edgent/runtime/jmxcontrol/JMXControlService.java b/runtime/jmxcontrol/src/main/java/org/apache/edgent/runtime/jmxcontrol/JMXControlService.java
new file mode 100644
index 0000000..c039fd1
--- /dev/null
+++ b/runtime/jmxcontrol/src/main/java/org/apache/edgent/runtime/jmxcontrol/JMXControlService.java
@@ -0,0 +1,182 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.jmxcontrol;
+
+import java.lang.management.ManagementFactory;
+import java.util.Hashtable;
+import java.util.Set;
+
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+
+import org.apache.edgent.execution.services.ControlService;
+
+/**
+ * Control service that registers control objects
+ * as MBeans in a JMX server.
+ *
+ */
+public class JMXControlService implements ControlService {
+
+ private final MBeanServer mbs;
+ private final String domain;
+ private final Hashtable<String,String> additionalKeys;
+
+ /**
+ * JMX control service using the platform MBean server.
+ * @param domain Domain the MBeans are registered in.
+ * @param additionalKeys additional name/value keys to add to the generated JMX object names
+ */
+ public JMXControlService(String domain, Hashtable<String,String> additionalKeys) {
+ mbs = ManagementFactory.getPlatformMBeanServer();
+ this.domain = domain;
+ this.additionalKeys = additionalKeys;
+ }
+
+
+ /**
+ * Get the MBean server being used by this control service.
+ * @return MBean server being used by this control service.
+ */
+ public MBeanServer getMbs() {
+ return mbs;
+ }
+
+ /**
+ * Get the JMX domain being used by this control service.
+ * @return JMX domain being used by this control service.
+ */
+ public String getDomain() {
+ return domain;
+ }
+
+ /**
+ *
+ * Register a control object as an MBean.
+ *
+ * {@inheritDoc}
+ *
+ * The MBean is registered within the domain returned by {@link #getDomain()}
+ * and an {@code ObjectName} with these keys:
+ * <UL>
+ * <LI>type - {@code type}</LI>
+ * <LI>interface - {@code controlInterface.getName()}</LI>
+ * <LI>id - {@code id}</LI>
+ * <LI>alias - {@code alias} (only present when {@code alias} is not null)</LI>
+ * </UL>
+ *
+ */
+ @Override
+ public <T> String registerControl(String type, String id, String alias, Class<T> controlInterface, T control) {
+ Hashtable<String,String> table = new Hashtable<>();
+
+ table.put("type", ObjectName.quote(type));
+ table.put("interface", ObjectName.quote(controlInterface.getName()));
+ table.put("id", ObjectName.quote(id));
+ if (alias != null)
+ table.put("alias", ObjectName.quote(alias));
+
+ additionalNameKeys(table);
+
+ try {
+ ObjectName on = ObjectName.getInstance(getDomain(), table);
+ getMbs().registerMBean(control, on);
+
+ return getControlId(on);
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException
+ | MalformedObjectNameException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ protected void additionalNameKeys(Hashtable<String,String> table) {
+ table.putAll(additionalKeys);
+ }
+
+ @Override
+ public void unregister(String controlId) {
+ try {
+ mbs.unregisterMBean(ObjectName.getInstance(controlId));
+ } catch (MBeanRegistrationException | InstanceNotFoundException | MalformedObjectNameException
+ | NullPointerException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public <T> T getControl(String type, String alias, Class<T> controlInterface) {
+ MBeanServer mBeanServer = getMbs();
+ ObjectName name = getObjectNameForInterface(type, alias, controlInterface);
+ return name != null ? JMX.newMXBeanProxy(mBeanServer, name, controlInterface) : null;
+ }
+
+ @Override
+ public <T> String getControlId(String type, String alias, Class<T> controlInterface) {
+ return getControlId(getObjectNameForInterface(type, alias, controlInterface));
+ }
+
+ private <T> ObjectName getObjectNameForInterface(String type, String alias, Class<T> controlInterface) {
+ try {
+ Set<ObjectName> names = getObjectNamesForInterface(type, alias, controlInterface.getName());
+
+ if (names.isEmpty())
+ return null;
+ if (names.size() != 1)
+ throw new RuntimeException("Alias " + alias + " not unique for type " + type);
+
+ ObjectName name = null;
+ for (ObjectName on : names) {
+ name = on;
+ break;
+ }
+ return name;
+ }
+ catch (MalformedObjectNameException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static String getControlId(ObjectName on) {
+ return on != null ? on.getCanonicalName() : null;
+ }
+
+ private Set<ObjectName> getObjectNamesForInterface(String type, String alias, String interfaceName)
+ throws MalformedObjectNameException {
+
+ Hashtable<String,String> table = new Hashtable<>();
+ table.put("interface", ObjectName.quote(interfaceName));
+ table.put("type", ObjectName.quote(type));
+ if (alias != null)
+ table.put("alias", ObjectName.quote(alias));
+ ObjectName objName = new ObjectName(getDomain(), table);
+
+ // Add the wildcard for any other properties.
+ objName = new ObjectName(objName.getCanonicalName()+",*");
+
+ MBeanServer mBeanServer = getMbs();
+ return mBeanServer.queryNames(objName, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jmxcontrol/src/test/java/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/test/java/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java b/runtime/jmxcontrol/src/test/java/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
deleted file mode 100644
index d29657d..0000000
--- a/runtime/jmxcontrol/src/test/java/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.runtime.jmxcontrol;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.lang.management.ManagementFactory;
-import java.util.Hashtable;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.timer.Timer;
-import javax.management.timer.TimerMBean;
-
-import org.junit.Test;
-
-import edgent.execution.services.ControlService;
-import edgent.runtime.jmxcontrol.JMXControlService;
-
-public class JMXControlServiceTest {
-
- private static final String DOMAIN = JMXControlServiceTest.class.getName();
-
- @Test
- public void testControlObject() throws Exception {
-
- ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
-
- String type = "timer";
- String id = "a";
- String alias = "ControlA";
- String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
-
- assertNotNull(controlId);
-
- ObjectName on = ObjectName.getInstance(controlId);
-
- assertEquals(DOMAIN, on.getDomain());
-
- assertEquals(type, ObjectName.unquote(on.getKeyProperty("type")));
- assertEquals(id, ObjectName.unquote(on.getKeyProperty("id")));
- assertEquals(alias, ObjectName.unquote(on.getKeyProperty("alias")));
- assertEquals(TimerMBean.class.getName(), ObjectName.unquote(on.getKeyProperty("interface")));
-
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
- assertTrue(mbs.isRegistered(on));
-
- cs.unregister(controlId);
- assertFalse(mbs.isRegistered(on));
- }
-
- @Test
- public void testAdditionalKeys() throws Exception {
-
- Hashtable<String,String> addKeys = new Hashtable<>();
- addKeys.put("job", "jobid");
- addKeys.put("device", ObjectName.quote("pi"));
-
- ControlService cs = new JMXControlService(DOMAIN, addKeys);
-
- String type = "timer";
- String id = "a";
- String alias = "ControlA";
- String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
-
- assertNotNull(controlId);
-
- ObjectName on = ObjectName.getInstance(controlId);
-
- assertEquals(DOMAIN, on.getDomain());
-
- assertEquals(type, ObjectName.unquote(on.getKeyProperty("type")));
- assertEquals(id, ObjectName.unquote(on.getKeyProperty("id")));
- assertEquals(alias, ObjectName.unquote(on.getKeyProperty("alias")));
- assertEquals(TimerMBean.class.getName(), ObjectName.unquote(on.getKeyProperty("interface")));
-
- assertEquals("jobid", on.getKeyProperty("job"));
- assertEquals("pi", ObjectName.unquote(on.getKeyProperty("device")));
-
-
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
- assertTrue(mbs.isRegistered(on));
-
- cs.unregister(controlId);
- assertFalse(mbs.isRegistered(on));
- }
-
- @Test(expected=RuntimeException.class)
- public void testDoubleRegister() throws Exception {
-
- ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
-
- String type = "timer";
- String id = "a";
- String alias = "ControlA";
- String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
- try {
- cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
- } finally {
- cs.unregister(controlId);
- }
- }
- @Test(expected=RuntimeException.class)
- public void testDoubleunregister() throws Exception {
-
- ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
-
- String type = "timer";
- String id = "a";
- String alias = "ControlA";
- String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
- cs.unregister(controlId);
- cs.unregister(controlId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jmxcontrol/src/test/java/org/apache/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
----------------------------------------------------------------------
diff --git a/runtime/jmxcontrol/src/test/java/org/apache/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java b/runtime/jmxcontrol/src/test/java/org/apache/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
new file mode 100644
index 0000000..79863f6
--- /dev/null
+++ b/runtime/jmxcontrol/src/test/java/org/apache/edgent/test/runtime/jmxcontrol/JMXControlServiceTest.java
@@ -0,0 +1,135 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.runtime.jmxcontrol;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.management.ManagementFactory;
+import java.util.Hashtable;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.timer.Timer;
+import javax.management.timer.TimerMBean;
+
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.runtime.jmxcontrol.JMXControlService;
+import org.junit.Test;
+
+public class JMXControlServiceTest {
+
+ private static final String DOMAIN = JMXControlServiceTest.class.getName();
+
+ @Test
+ public void testControlObject() throws Exception {
+
+ ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
+
+ String type = "timer";
+ String id = "a";
+ String alias = "ControlA";
+ String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+
+ assertNotNull(controlId);
+
+ ObjectName on = ObjectName.getInstance(controlId);
+
+ assertEquals(DOMAIN, on.getDomain());
+
+ assertEquals(type, ObjectName.unquote(on.getKeyProperty("type")));
+ assertEquals(id, ObjectName.unquote(on.getKeyProperty("id")));
+ assertEquals(alias, ObjectName.unquote(on.getKeyProperty("alias")));
+ assertEquals(TimerMBean.class.getName(), ObjectName.unquote(on.getKeyProperty("interface")));
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
+ assertTrue(mbs.isRegistered(on));
+
+ cs.unregister(controlId);
+ assertFalse(mbs.isRegistered(on));
+ }
+
+ @Test
+ public void testAdditionalKeys() throws Exception {
+
+ Hashtable<String,String> addKeys = new Hashtable<>();
+ addKeys.put("job", "jobid");
+ addKeys.put("device", ObjectName.quote("pi"));
+
+ ControlService cs = new JMXControlService(DOMAIN, addKeys);
+
+ String type = "timer";
+ String id = "a";
+ String alias = "ControlA";
+ String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+
+ assertNotNull(controlId);
+
+ ObjectName on = ObjectName.getInstance(controlId);
+
+ assertEquals(DOMAIN, on.getDomain());
+
+ assertEquals(type, ObjectName.unquote(on.getKeyProperty("type")));
+ assertEquals(id, ObjectName.unquote(on.getKeyProperty("id")));
+ assertEquals(alias, ObjectName.unquote(on.getKeyProperty("alias")));
+ assertEquals(TimerMBean.class.getName(), ObjectName.unquote(on.getKeyProperty("interface")));
+
+ assertEquals("jobid", on.getKeyProperty("job"));
+ assertEquals("pi", ObjectName.unquote(on.getKeyProperty("device")));
+
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
+ assertTrue(mbs.isRegistered(on));
+
+ cs.unregister(controlId);
+ assertFalse(mbs.isRegistered(on));
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testDoubleRegister() throws Exception {
+
+ ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
+
+ String type = "timer";
+ String id = "a";
+ String alias = "ControlA";
+ String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+ try {
+ cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+ } finally {
+ cs.unregister(controlId);
+ }
+ }
+ @Test(expected=RuntimeException.class)
+ public void testDoubleunregister() throws Exception {
+
+ ControlService cs = new JMXControlService(DOMAIN, new Hashtable<>());
+
+ String type = "timer";
+ String id = "a";
+ String alias = "ControlA";
+ String controlId = cs.registerControl(type, id, alias, TimerMBean.class, new Timer());
+ cs.unregister(controlId);
+ cs.unregister(controlId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobEvents.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobEvents.java b/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobEvents.java
deleted file mode 100644
index 8444358..0000000
--- a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobEvents.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.runtime.jobregistry;
-
-import edgent.execution.Job;
-import edgent.execution.services.JobRegistryService;
-import edgent.execution.services.RuntimeServices;
-import edgent.execution.services.JobRegistryService.EventType;
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-/**
- * A source of job event tuples.
- * <p>
- * A stream of job event tuples is
- * {@linkplain #source(Topology, BiFunction) created} by a listener which
- * subscribes to a {@link JobRegistryService}.
- * </p>
- */
-public class JobEvents {
-
- /**
- * Declares a stream populated by {@link JobRegistryService} events.
- * <p>
- * The job registry is passed as a runtime service. At startup
- * {@code JobRegistryService#addListener()} is called by the
- * runtime to subscribe an event listener. The listener invokes the given
- * {@code wrapper} function to construct a tuple from a job event
- * and submits the tuple on the returned stream.</p>
- * <p>
- * When the topology's execution is terminated,
- * {@code JobRegistryServic#removeListener()} in invoked to unsubscribe
- * the tuple source from the job registry.
- * </p>
- *
- * @param <T> Tuple type
- * @param topology the stream topology
- * @param wrapper constructs a tuple from a job event
- * @return new stream containing the tuples generated by the specified {@code wrapper}.
- *
- * @see Topology#getRuntimeServiceSupplier()
- * @see JobRegistryService#addListener(BiConsumer)
- * @see JobRegistryService#removeListener(BiConsumer)
- */
- public static <T> TStream<T> source(
- Topology topology,
- BiFunction<JobRegistryService.EventType, Job, T> wrapper) {
-
- Supplier<RuntimeServices> rts = topology.getRuntimeServiceSupplier();
- return topology.events(new JobEventsSetup<T>(wrapper, rts));
- }
-
- /**
- * Job events setup Consumer that adds a subscriber to the
- * JobRegistryService on start up and removes it on close.
- *
- * @param <T> Type of the tuples.
- */
- private static final class JobEventsSetup<T>
- implements Consumer<Consumer<T>>, AutoCloseable {
-
- private static final long serialVersionUID = 1L;
- private final Supplier<RuntimeServices> rts;
- private final JobEventsListener<T> listener;
-
- JobEventsSetup(BiFunction<JobRegistryService.EventType, Job, T>
- tupleGen, Supplier<RuntimeServices> rts) {
-
- this.rts = rts;
- this.listener = new JobEventsListener<T>(tupleGen);
- }
-
- @Override
- public void accept(Consumer<T> submitter) {
- JobRegistryService jobRegistry = rts.get().getService(JobRegistryService.class);
- if (jobRegistry != null) {
- listener.setSubmitter(submitter);
- jobRegistry.addListener(listener);
- }
- }
-
- @Override
- public void close() throws Exception {
- JobRegistryService jobRegistry = rts.get().getService(JobRegistryService.class);
- if (jobRegistry != null) {
- jobRegistry.removeListener(listener);
- }
- }
-
- /**
- * JobRegistryService listener which uses a tuple generator for
- * wrapping job events into tuples and a consumer for submitting
- * the tuples.
- *
- * @param <T> Type of the tuples.
- */
- private static final class JobEventsListener<T>
- implements BiConsumer<JobRegistryService.EventType, Job> {
-
- private static final long serialVersionUID = 1L;
- private Consumer<T> eventSubmitter;
- private final BiFunction<JobRegistryService.EventType, Job, T> tupleGenerator;
-
- JobEventsListener(BiFunction<JobRegistryService.EventType, Job, T> tupleGen) {
- this.tupleGenerator = tupleGen;
- }
-
- void setSubmitter(Consumer<T> submitter) {
- this.eventSubmitter = submitter;
- }
-
- @Override
- public void accept(EventType evType, Job job) {
- T tuple = tupleGenerator.apply(evType, job);
- eventSubmitter.accept(tuple);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobRegistry.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobRegistry.java b/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobRegistry.java
deleted file mode 100644
index a408cc5..0000000
--- a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/JobRegistry.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.runtime.jobregistry;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edgent.execution.Job;
-import edgent.execution.services.JobRegistryService;
-import edgent.execution.services.ServiceContainer;
-import edgent.function.BiConsumer;
-
-/**
- * Maintains a set of registered jobs and a set of listeners.
- * Notifies listeners on job additions, deletions and updates.
- */
-public class JobRegistry implements JobRegistryService {
-
- /**
- * Creates and registers a {@link JobRegistry} with the given service
- * container.
- *
- * @param services provides access to service registration
- * @return service instance.
- */
- public static JobRegistryService createAndRegister(ServiceContainer services) {
- JobRegistryService service = new JobRegistry();
- services.addService(JobRegistryService.class, service);
- return service;
- }
-
- private final ConcurrentMap<String /*JobId*/, Job> jobs;
- private final Broadcaster<JobRegistryService.EventType, Job> listeners;
- private static final Logger logger = LoggerFactory.getLogger(JobRegistry.class);
-
- /**
- * Creates a new {@link JobRegistry}.
- */
- public JobRegistry() {
- this.jobs = new ConcurrentHashMap<String, Job>();
- this.listeners = new Broadcaster<JobRegistryService.EventType, Job>();
- }
-
- @Override
- public void addListener(BiConsumer<JobRegistryService.EventType, Job> listener) {
- listeners.add(listener);
-
- synchronized (jobs) {
- for (Job job : jobs.values())
- listener.accept(JobRegistryService.EventType.ADD, job);
- }
- }
-
- @Override
- public boolean removeListener(BiConsumer<JobRegistryService.EventType, Job> listener) {
- return listeners.remove(listener);
- }
-
- @Override
- public Set<String> getJobIds() {
- return Collections.unmodifiableSet(jobs.keySet());
- }
-
- @Override
- public Job getJob(String id) {
- return jobs.get(id);
- }
-
- @Override
- public boolean removeJob(String jobId) {
- final Job removed = jobs.remove(jobId);
- if (removed != null) {
- listeners.onEvent(JobRegistryService.EventType.REMOVE, removed);
- return true;
- }
- return false;
- }
-
- @Override
- public void addJob(Job job) throws IllegalArgumentException {
- final Job existing = jobs.putIfAbsent(job.getId(), job);
- if (existing == null) {
- listeners.onEvent(JobRegistryService.EventType.ADD, job);
- } else {
- throw new IllegalArgumentException("A job with Id " + job.getId()
- + " already exists");
- }
- }
-
- @Override
- public boolean updateJob(Job job) {
- if (jobs.containsValue(job)) {
- listeners.onEvent(JobRegistryService.EventType.UPDATE, job);
- return true;
- }
- return false;
- }
-
- private static class Broadcaster<T, O> {
- private final List<BiConsumer<T, O>> listeners;
-
- Broadcaster() {
- this.listeners = new CopyOnWriteArrayList<BiConsumer<T, O>>();
- }
-
- void add(BiConsumer<T, O> listener) {
- if (listener == null) {
- throw new IllegalArgumentException("Null listener") ;
- }
- listeners.add(listener);
- }
-
- boolean remove(BiConsumer<T, O> listener) {
- if (listener == null)
- return false;
- return listeners.remove(listener);
- }
-
- private void onEvent(T event, O job) {
- for (BiConsumer<T, O> listener : listeners) {
- try {
- listener.accept(event, job);
- } catch (Exception e) {
- logger.error("Exception caught while invoking listener:" + e);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/package-info.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/package-info.java b/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/package-info.java
deleted file mode 100644
index 64f7ef9..0000000
--- a/runtime/jobregistry/src/main/java/edgent/runtime/jobregistry/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.runtime.jobregistry;
-
-/**
- * {@link JobRegistryService} implementation which provides {@link Job}
- * registration, access to registered jobs, as well as subscriptions to job
- * update events.
- */
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobEvents.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobEvents.java b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobEvents.java
new file mode 100644
index 0000000..8a6bbbf
--- /dev/null
+++ b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobEvents.java
@@ -0,0 +1,139 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.jobregistry;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.JobRegistryService;
+import org.apache.edgent.execution.services.RuntimeServices;
+import org.apache.edgent.execution.services.JobRegistryService.EventType;
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * A source of job event tuples.
+ * <p>
+ * A stream of job event tuples is
+ * {@linkplain #source(Topology, BiFunction) created} by a listener which
+ * subscribes to a {@link JobRegistryService}.
+ * </p>
+ */
+public class JobEvents {
+
+ /**
+ * Declares a stream populated by {@link JobRegistryService} events.
+ * <p>
+ * The job registry is passed as a runtime service. At startup
+ * {@code JobRegistryService#addListener()} is called by the
+ * runtime to subscribe an event listener. The listener invokes the given
+ * {@code wrapper} function to construct a tuple from a job event
+ * and submits the tuple on the returned stream.</p>
+ * <p>
+ * When the topology's execution is terminated,
+ * {@code JobRegistryService#removeListener()} is invoked to unsubscribe
+ * the tuple source from the job registry.
+ * </p>
+ *
+ * @param <T> Tuple type
+ * @param topology the stream topology
+ * @param wrapper constructs a tuple from a job event
+ * @return new stream containing the tuples generated by the specified {@code wrapper}.
+ *
+ * @see Topology#getRuntimeServiceSupplier()
+ * @see JobRegistryService#addListener(BiConsumer)
+ * @see JobRegistryService#removeListener(BiConsumer)
+ */
+ public static <T> TStream<T> source(
+ Topology topology,
+ BiFunction<JobRegistryService.EventType, Job, T> wrapper) {
+
+ Supplier<RuntimeServices> rts = topology.getRuntimeServiceSupplier();
+ return topology.events(new JobEventsSetup<T>(wrapper, rts));
+ }
+
+ /**
+ * Job events setup Consumer that adds a subscriber to the
+ * JobRegistryService on start up and removes it on close.
+ *
+ * @param <T> Type of the tuples.
+ */
+ private static final class JobEventsSetup<T>
+ implements Consumer<Consumer<T>>, AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+ private final Supplier<RuntimeServices> rts;
+ private final JobEventsListener<T> listener;
+
+ JobEventsSetup(BiFunction<JobRegistryService.EventType, Job, T>
+ tupleGen, Supplier<RuntimeServices> rts) {
+
+ this.rts = rts;
+ this.listener = new JobEventsListener<T>(tupleGen);
+ }
+
+ @Override
+ public void accept(Consumer<T> submitter) {
+ JobRegistryService jobRegistry = rts.get().getService(JobRegistryService.class);
+ if (jobRegistry != null) {
+ listener.setSubmitter(submitter);
+ jobRegistry.addListener(listener);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ JobRegistryService jobRegistry = rts.get().getService(JobRegistryService.class);
+ if (jobRegistry != null) {
+ jobRegistry.removeListener(listener);
+ }
+ }
+
+ /**
+ * JobRegistryService listener which uses a tuple generator for
+ * wrapping job events into tuples and a consumer for submitting
+ * the tuples.
+ *
+ * @param <T> Type of the tuples.
+ */
+ private static final class JobEventsListener<T>
+ implements BiConsumer<JobRegistryService.EventType, Job> {
+
+ private static final long serialVersionUID = 1L;
+ private Consumer<T> eventSubmitter;
+ private final BiFunction<JobRegistryService.EventType, Job, T> tupleGenerator;
+
+ JobEventsListener(BiFunction<JobRegistryService.EventType, Job, T> tupleGen) {
+ this.tupleGenerator = tupleGen;
+ }
+
+ void setSubmitter(Consumer<T> submitter) {
+ this.eventSubmitter = submitter;
+ }
+
+ @Override
+ public void accept(EventType evType, Job job) {
+ T tuple = tupleGenerator.apply(evType, job);
+ eventSubmitter.accept(tuple);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobRegistry.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobRegistry.java b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobRegistry.java
new file mode 100644
index 0000000..289307e
--- /dev/null
+++ b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/JobRegistry.java
@@ -0,0 +1,151 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.jobregistry;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.JobRegistryService;
+import org.apache.edgent.execution.services.ServiceContainer;
+import org.apache.edgent.function.BiConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains a set of registered jobs and a set of listeners.
+ * Notifies listeners on job additions, deletions and updates.
+ * <p>
+ * Jobs are held in a {@link ConcurrentHashMap} keyed by job id and listeners
+ * in a {@link CopyOnWriteArrayList}.
+ */
+public class JobRegistry implements JobRegistryService {
+
+ private static final Logger logger = LoggerFactory.getLogger(JobRegistry.class);
+
+ /**
+ * Creates and registers a {@link JobRegistry} with the given service
+ * container.
+ *
+ * @param services provides access to service registration
+ * @return service instance.
+ */
+ public static JobRegistryService createAndRegister(ServiceContainer services) {
+ JobRegistryService service = new JobRegistry();
+ services.addService(JobRegistryService.class, service);
+ return service;
+ }
+
+ private final ConcurrentMap<String /*JobId*/, Job> jobs;
+ private final Broadcaster<JobRegistryService.EventType, Job> listeners;
+
+ /**
+ * Creates a new {@link JobRegistry}.
+ */
+ public JobRegistry() {
+ this.jobs = new ConcurrentHashMap<>();
+ this.listeners = new Broadcaster<>();
+ }
+
+ /**
+ * Registers the listener and immediately sends it an {@code ADD} event
+ * for every job currently in the registry.
+ * <p>
+ * Exceptions thrown by the listener during this initial replay propagate
+ * to the caller; the listener stays registered.
+ *
+ * @param listener the listener to register
+ * @throws IllegalArgumentException if {@code listener} is {@code null}
+ */
+ @Override
+ public void addListener(BiConsumer<JobRegistryService.EventType, Job> listener) {
+ listeners.add(listener);
+
+ // NOTE(review): this monitor only serializes concurrent replays with
+ // each other. The map's own put/remove operations do not acquire it, so
+ // a job added concurrently may be reported to the new listener twice
+ // (once here, once by addJob) - confirm listeners tolerate that.
+ synchronized (jobs) {
+ for (Job job : jobs.values()) {
+ listener.accept(JobRegistryService.EventType.ADD, job);
+ }
+ }
+ }
+
+ /**
+ * Unregisters the listener.
+ *
+ * @param listener the listener to remove
+ * @return {@code true} if the listener was registered and has been
+ * removed, {@code false} otherwise (including a {@code null} listener)
+ */
+ @Override
+ public boolean removeListener(BiConsumer<JobRegistryService.EventType, Job> listener) {
+ return listeners.remove(listener);
+ }
+
+ /**
+ * Returns an unmodifiable view of the ids of the registered jobs.
+ */
+ @Override
+ public Set<String> getJobIds() {
+ return Collections.unmodifiableSet(jobs.keySet());
+ }
+
+ /**
+ * Returns the job registered under {@code id}, or {@code null} if there
+ * is none.
+ */
+ @Override
+ public Job getJob(String id) {
+ return jobs.get(id);
+ }
+
+ /**
+ * Removes the job registered under {@code jobId} and, if there was one,
+ * sends a {@code REMOVE} event to the listeners.
+ *
+ * @return {@code true} if a job was removed, {@code false} otherwise
+ */
+ @Override
+ public boolean removeJob(String jobId) {
+ final Job removed = jobs.remove(jobId);
+ if (removed != null) {
+ listeners.onEvent(JobRegistryService.EventType.REMOVE, removed);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Registers the job under its id and sends an {@code ADD} event to the
+ * listeners.
+ *
+ * @throws IllegalArgumentException if a job with the same id is already
+ * registered
+ */
+ @Override
+ public void addJob(Job job) throws IllegalArgumentException {
+ final Job existing = jobs.putIfAbsent(job.getId(), job);
+ if (existing == null) {
+ listeners.onEvent(JobRegistryService.EventType.ADD, job);
+ } else {
+ throw new IllegalArgumentException("A job with Id " + job.getId()
+ + " already exists");
+ }
+ }
+
+ /**
+ * Sends an {@code UPDATE} event to the listeners if the job is one of
+ * the registered jobs.
+ *
+ * @return {@code true} if the job is registered and the event was sent,
+ * {@code false} otherwise
+ */
+ @Override
+ public boolean updateJob(Job job) {
+ if (jobs.containsValue(job)) {
+ listeners.onEvent(JobRegistryService.EventType.UPDATE, job);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Delivers events to a list of listeners.
+ *
+ * @param <T> type of the event
+ * @param <O> type of the object the event refers to
+ */
+ private static class Broadcaster<T, O> {
+ private final List<BiConsumer<T, O>> listeners;
+
+ Broadcaster() {
+ this.listeners = new CopyOnWriteArrayList<>();
+ }
+
+ /**
+ * Adds a listener.
+ *
+ * @throws IllegalArgumentException if {@code listener} is {@code null}
+ */
+ void add(BiConsumer<T, O> listener) {
+ if (listener == null) {
+ throw new IllegalArgumentException("Null listener");
+ }
+ listeners.add(listener);
+ }
+
+ /**
+ * Removes a listener.
+ *
+ * @return {@code true} if the listener was in the list, {@code false}
+ * otherwise (including a {@code null} listener)
+ */
+ boolean remove(BiConsumer<T, O> listener) {
+ if (listener == null) {
+ return false;
+ }
+ return listeners.remove(listener);
+ }
+
+ /**
+ * Invokes every listener with the event. An exception thrown by one
+ * listener is logged and does not prevent the remaining listeners
+ * from being invoked.
+ */
+ void onEvent(T event, O job) {
+ for (BiConsumer<T, O> listener : listeners) {
+ try {
+ listener.accept(event, job);
+ } catch (Exception e) {
+ // Pass the exception as the throwable argument so the
+ // stack trace is logged, not just its toString().
+ logger.error("Exception caught while invoking listener", e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/package-info.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/package-info.java b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/package-info.java
new file mode 100644
index 0000000..132d5a8
--- /dev/null
+++ b/runtime/jobregistry/src/main/java/org/apache/edgent/runtime/jobregistry/package-info.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.runtime.jobregistry;
+
+/**
+ * {@link JobRegistryService} implementation which provides {@link Job}
+ * registration, access to registered jobs, as well as subscriptions to job
+ * update events.
+ */
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/runtime/jobregistry/src/test/java/edgent/test/runtime/jobregistry/JobRegistryTest.java
----------------------------------------------------------------------
diff --git a/runtime/jobregistry/src/test/java/edgent/test/runtime/jobregistry/JobRegistryTest.java b/runtime/jobregistry/src/test/java/edgent/test/runtime/jobregistry/JobRegistryTest.java
deleted file mode 100644
index 2f36efb..0000000
--- a/runtime/jobregistry/src/test/java/edgent/test/runtime/jobregistry/JobRegistryTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.runtime.jobregistry;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.junit.Test;
-
-import edgent.execution.Job;
-import edgent.execution.services.JobRegistryService;
-import edgent.execution.services.JobRegistryService.EventType;
-import edgent.function.BiConsumer;
-import edgent.runtime.jobregistry.JobRegistry;
-
-public class JobRegistryTest {
-
- @Test
- public void testGetJob() {
- JobRegistryService service = new JobRegistry();
- Job job1 = new JobImpl("Job_1");
- Job job2 = new JobImpl("Job_2");
- service.addJob(job1);
- service.addJob(job2);
-
- assertSame(job1, service.getJob("Job_1"));
- assertSame(job2, service.getJob("Job_2"));
- }
-
- @Test
- public void testAddJob() {
- JobRegistryService service = new JobRegistry();
-
- service.addJob(new JobImpl("Job_1"));
- service.addListener((et, j) ->
- {
- assertEquals(JobRegistryService.EventType.ADD, et);
- assertEquals("Job_1", j.getId());
- });
- }
-
- @Test
- public void testAddJob2() {
- JobRegistryService service = new JobRegistry();
-
- service.addListener((et, j) -> {
- assertEquals(JobRegistryService.EventType.ADD, et);
- assertEquals("Job_1", j.getId());
- });
- service.addJob(new JobImpl("Job_1"));
- }
-
- @Test
- public void testRemoveJob() {
- JobRegistryService service = new JobRegistry();
- service.addJob(new JobImpl("Job_1"));
- service.addJob(new JobImpl("Job_2"));
-
- service.addListener(new JobHandler(JobRegistryService.EventType.REMOVE));
- service.removeJob("Job_1");
- service.removeJob("Job_2");
- }
-
- @Test
- public void testUpdateJob() {
- JobRegistryService service = new JobRegistry();
- service.addJob(new JobImpl("Job_1"));
- service.addJob(new JobImpl("Job_2"));
-
- service.addListener(new JobHandler(JobRegistryService.EventType.UPDATE));
- service.updateJob(service.getJob("Job_1"));
- service.updateJob(service.getJob("Job_2"));
- }
-
- private static class JobHandler implements BiConsumer<JobRegistryService.EventType, Job> {
- private static final long serialVersionUID = 1L;
- private int eventCount = 0;
- private final JobRegistryService.EventType eventType;
-
- JobHandler(JobRegistryService.EventType et) {
- this.eventType = et;
- }
-
- @Override
- public void accept(EventType et, Job j) {
- eventCount++;
- switch (eventCount) {
- case 1:
- case 2:
- assertEquals(JobRegistryService.EventType.ADD, et);
- break;
- case 3:
- assertEquals(eventType, et);
- assertEquals("Job_1", j.getId());
- break;
- case 4:
- assertEquals(eventType, et);
- assertEquals("Job_2", j.getId());
- break;
- }
- }
- }
-
- private static class JobImpl implements Job {
-
- private String id;
-
- JobImpl(String id) {
- this.id = id;
- }
-
- @Override
- public State getCurrentState() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public State getNextState() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void stateChange(Action action) throws IllegalArgumentException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Health getHealth() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getLastError() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getName() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public void complete() throws ExecutionException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void complete(long timeout, TimeUnit unit)
- throws ExecutionException, InterruptedException, TimeoutException {
- throw new UnsupportedOperationException();
- }
-
- }
-}