You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:35 UTC

[34/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorApp.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorApp.java b/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorApp.java
deleted file mode 100644
index efc2fee..0000000
--- a/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorApp.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.apps.runtime;
-
-import static edgent.topology.services.ApplicationService.SYSTEM_APP_PREFIX;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.DirectSubmitter;
-import edgent.execution.Job;
-import edgent.execution.Job.Action;
-import edgent.execution.mbeans.JobMXBean;
-import edgent.execution.services.ControlService;
-import edgent.execution.services.Controls;
-import edgent.execution.services.JobRegistryService;
-import edgent.execution.services.RuntimeServices;
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-import edgent.runtime.jobregistry.JobEvents;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.TopologyProvider;
-import edgent.topology.mbeans.ApplicationServiceMXBean;
-import edgent.topology.services.ApplicationService;
-
-/**
- * Job monitoring application.
- * <P>
- * The application listens on JobRegistry events and resubmits jobs for which 
- * an event has been emitted because the job is unhealthy. The monitored 
- * applications must be registered with an {@code ApplicationService} 
- * prior to submission, otherwise the monitor application cannot restart 
- * them.
- * </P>
- * <P>
- * The monitoring application must be submitted within a context which 
- * provides the following services:
- * </P>
- * <ul>
- * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
- * registered by this service is used to resubmit failed applications.</li>
- * <li>ControlService - the application queries this service for an 
- * {@code ApplicationServiceMXBean} control, which is then used for 
- * restarting failed applications.</li>
- * <li>JobRegistryService - generates job monitoring events. </li>
- * </ul>
- */
-public class JobMonitorApp {
-    /**
-     * Job monitoring application name.
-     */
-    public static final String APP_NAME = SYSTEM_APP_PREFIX + "JobMonitorApp";
-
-    
-    private final TopologyProvider provider;
-    private final DirectSubmitter<Topology, Job> submitter;
-    private final Topology topology;
-    private static final Logger logger = LoggerFactory.getLogger(JobMonitorApp.class);
-
-    /**
-     * Constructs a {@code JobMonitorApp} with the specified name in the 
-     * context of the specified provider.
-     * 
-     * @param provider the topology provider
-     * @param submitter a {@code DirectSubmitter} which provides required 
-     *      services and submits the application
-     * @param name the application name
-     * 
-     * @throws IllegalArgumentException if the submitter does not provide 
-     *      access to the required services
-     */
-    public JobMonitorApp(TopologyProvider provider, 
-            DirectSubmitter<Topology, Job> submitter, String name) {
-
-        this.provider = provider;
-        this.submitter = submitter;
-        validateSubmitter();
-        this.topology = declareTopology(name);
-    }
-    
-    /**
-     * Submits the application topology.
-     * 
-     * @return the job.
-     * @throws InterruptedException if the operation was interrupted
-     * @throws ExecutionException on task execution exception 
-     */
-    public Job submit() throws InterruptedException, ExecutionException {
-        Future<Job> f = submitter.submit(topology);
-        return f.get();
-    }
-
-    /**
-     * Submits an application using an {@code ApplicationServiceMXBean} control 
-     * registered with the specified {@code ControlService}.
-     * 
-     * @param applicationName the name of the application to submit
-     * @param controlService the control service
-     */
-    public static void submitApplication(String applicationName, ControlService controlService) {
-        try {
-            ApplicationServiceMXBean control =
-                    controlService.getControl(
-                            ApplicationServiceMXBean.TYPE,
-                            ApplicationService.ALIAS,
-                            ApplicationServiceMXBean.class);
-            if (control == null) {
-                throw new IllegalStateException(
-                        "Could not find a registered control with the following interface: " + 
-                        ApplicationServiceMXBean.class.getName());                
-            }
-// TODO add ability to submit with the initial application configuration
-            logger.info("Restarting monitored application {}", applicationName);
-            control.submit(applicationName, null);
-        }
-        catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    /**
-     * Closes a job using a {@code JobMXBean} control registered with the 
-     * specified {@code ControlService}.
-     * 
-     * @param jobName the name of the job
-     * @param controlService the control service
-     */
-    public static void closeJob(String jobName, ControlService controlService) {
-        try {
-            JobMXBean jobMbean = controlService.getControl(JobMXBean.TYPE, jobName, JobMXBean.class);
-            if (jobMbean == null) {
-                throw new IllegalStateException(
-                        "Could not find a registered control for job " + jobName + 
-                        " with the following interface: " + JobMXBean.class.getName());                
-            }
-            jobMbean.stateChange(Action.CLOSE);
-            logger.debug("Closing job {}", jobName);
-            
-            // Wait for the job to complete
-            long startWaiting = System.currentTimeMillis();
-            for (long waitForMillis = Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000;
-                    waitForMillis < 0;
-                    waitForMillis -= 100) {
-                if (jobMbean.getCurrentState() == Job.State.CLOSED)
-                    break;
-                else
-                    Thread.sleep(100);
-            }
-            if (jobMbean.getCurrentState() != Job.State.CLOSED) {
-                throw new IllegalStateException(
-                        "The unhealthy job " + jobName + " did not close after " + 
-                        Controls.JOB_HOLD_AFTER_CLOSE_SECS + " seconds");                
-            }
-            logger.debug("Job {} state is CLOSED after waiting for {} milliseconds",
-                    jobName, System.currentTimeMillis() - startWaiting);
-        }
-        catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Declares the following topology:
-     * <pre>
-     * JobEvents source --&gt; Filter (health == unhealthy) --&gt; Restart application
-     * </pre>
-     * 
-     * @param name the topology name
-     * @return the application topology
-     */
-    protected Topology declareTopology(String name) {
-        Topology t = provider.newTopology(name);
-        
-        declareTopology(t);
-        
-        return t;
-    }
-    
-    /**
-     * Populates the following topology:
-     * <pre>
-     * JobEvents source --&gt; Filter (health == unhealthy) --&gt; Restart application
-     * </pre>
-     * @param t Topology
-     *
-     */
-    public static void declareTopology(Topology t) {
-        TStream<JsonObject> jobEvents = JobEvents.source(
-                t, 
-                (evType, job) -> { return JobMonitorAppEvent.toJsonObject(evType, job); }
-                );
-
-        jobEvents = jobEvents.filter(
-                value -> {
-                    logger.trace("Filter: {}", value);
-
-                    try {
-                        JsonObject job = JobMonitorAppEvent.getJob(value);
-                        return (Job.Health.UNHEALTHY.name().equals(
-                                JobMonitorAppEvent.getJobHealth(job)));
-                    } catch (IllegalArgumentException e) {
-                        logger.info("Invalid event filtered out, cause: {}", e.getMessage());
-                        return false;
-                    }
-                 });
-
-        jobEvents.sink(new JobRestarter(t.getRuntimeServiceSupplier()));
-    }
-
-    /**
-     * A {@code Consumer} which restarts the application specified by a 
-     * JSON object passed to its {@code accept} function. 
-     */
-    private static class JobRestarter implements Consumer<JsonObject> {
-        private static final long serialVersionUID = 1L;
-        private final Supplier<RuntimeServices> rts;
-
-        JobRestarter(Supplier<RuntimeServices> rts) {
-            this.rts = rts;
-        }
-
-        @Override
-        public void accept(JsonObject value) {
-            ControlService controlService = rts.get().getService(ControlService.class);
-            JsonObject job = JobMonitorAppEvent.getJob(value);
-            String applicationName = JobMonitorAppEvent.getJobName(job);
-
-            closeJob(applicationName, controlService);
-            submitApplication(applicationName, controlService);
-        }
-    }
-
-    private void validateSubmitter() {
-        ControlService controlService = submitter.getServices().getService(ControlService.class);
-        if (controlService == null) {
-            throw new IllegalArgumentException("Could not access service " + ControlService.class.getName());
-        }
-
-        ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
-        if (appService == null) {
-            throw new IllegalArgumentException("Could not access service " + ApplicationService.class.getName());
-        }
-
-        JobRegistryService jobRegistryService = submitter.getServices().getService(JobRegistryService.class);
-        if (jobRegistryService == null) {
-            throw new IllegalArgumentException("Could not access service " + JobRegistryService.class.getName());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorAppEvent.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorAppEvent.java b/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorAppEvent.java
deleted file mode 100644
index 2abbb6a..0000000
--- a/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorAppEvent.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.apps.runtime;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import edgent.execution.Job;
-import edgent.execution.services.JobRegistryService;
-
-/**
- * Helpers for parsing generating and parsing a JSON representation of job
- * monitoring events. 
- */
-class JobMonitorAppEvent {
-
-    /**
-     * Creates a JsonObject wrapping a {@code JobRegistryService} event type
-     * and Job info.
-     * 
-     * @param evType the event type
-     * @param job the job
-     * @return the wrapped data
-     */
-    static JsonObject toJsonObject(JobRegistryService.EventType evType, Job job) {
-        JsonObject value = new JsonObject();
-        value.addProperty("time", (Number)System.currentTimeMillis());
-        value.addProperty("event", evType.toString());
-        JsonObject obj = new JsonObject();
-        obj.addProperty("id", job.getId());
-        obj.addProperty("name", job.getName());
-        obj.addProperty("state", job.getCurrentState().toString());
-        obj.addProperty("nextState", job.getNextState().toString());
-        obj.addProperty("health", job.getHealth().toString());
-        obj.addProperty("lastError", job.getLastError());
-        value.add("job", obj);
-        return value;
-    }
-
-    /**
-     * Gets the {@code job} JsonObject from the given JSON value.
-     * 
-     * @param value a JSON object
-     * @return the job JsonObject
-     */
-    static JsonObject getJob(JsonObject value) {
-        JsonObject job = value.getAsJsonObject("job");
-        if (job == null) {
-            throw new IllegalArgumentException("Could not find the job object in: " + value);
-        }
-        return job;
-    }
-
-    /**
-     * Gets the {@code name} string property from the given job JSON object.
-     *  
-     * @param job the job JSON object
-     * @return the job name
-     * @throws IllegalArgumentException if it could not find the property
-     */
-    static String getJobName(JsonObject job) {
-        return getProperty(job, "name");
-    }
-
-    /**
-     * Gets the {@code health} string property from the given job JSON object.
-     * 
-     * @param job the job JSON object
-     * @return the job health
-     * @throws IllegalArgumentException if it could not find the property
-     */
-    static String getJobHealth(JsonObject job) {
-        return getProperty(job, "health");
-    }
-
-    /**
-     * Gets a string property with the specified name from the given JSON 
-     * object.
-     * 
-     * @param value a JSON object
-     * @param name the property name
-     * @return the property value
-     * 
-     * @throws IllegalArgumentException if could not find a property with the 
-     *      given name
-     */
-    static String getProperty(JsonObject value, String name) {
-        JsonElement e = value.get(name);
-        if (e != null && e.isJsonPrimitive()) {
-            try {
-                return e.getAsString();
-            } catch (Exception ex) {
-            }
-        }
-        throw new IllegalArgumentException("Could not find the " + name + " property in: " + value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/edgent/apps/runtime/package-info.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/edgent/apps/runtime/package-info.java b/apps/runtime/src/main/java/edgent/apps/runtime/package-info.java
deleted file mode 100644
index 0d80435..0000000
--- a/apps/runtime/src/main/java/edgent/apps/runtime/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Applications which provide monitoring and failure recovery to other 
- * Edgent applications.
- */
-package edgent.apps.runtime;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
new file mode 100644
index 0000000..883b023
--- /dev/null
+++ b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
@@ -0,0 +1,270 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.apps.runtime;
+
+import static org.apache.edgent.topology.services.ApplicationService.SYSTEM_APP_PREFIX;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.edgent.execution.DirectSubmitter;
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Job.Action;
+import org.apache.edgent.execution.mbeans.JobMXBean;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.execution.services.Controls;
+import org.apache.edgent.execution.services.JobRegistryService;
+import org.apache.edgent.execution.services.RuntimeServices;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.runtime.jobregistry.JobEvents;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyProvider;
+import org.apache.edgent.topology.mbeans.ApplicationServiceMXBean;
+import org.apache.edgent.topology.services.ApplicationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Job monitoring application.
+ * <P>
+ * The application listens on JobRegistry events and resubmits jobs for which 
+ * an event has been emitted because the job is unhealthy. The monitored 
+ * applications must be registered with an {@code ApplicationService} 
+ * prior to submission, otherwise the monitor application cannot restart 
+ * them.
+ * </P>
+ * <P>
+ * The monitoring application must be submitted within a context which 
+ * provides the following services:
+ * </P>
+ * <ul>
+ * <li>ApplicationService - an {@code ApplicationServiceMXBean} control 
+ * registered by this service is used to resubmit failed applications.</li>
+ * <li>ControlService - the application queries this service for an 
+ * {@code ApplicationServiceMXBean} control, which is then used for 
+ * restarting failed applications.</li>
+ * <li>JobRegistryService - generates job monitoring events. </li>
+ * </ul>
+ */
+public class JobMonitorApp {
+    /**
+     * Job monitoring application name.
+     */
+    public static final String APP_NAME = SYSTEM_APP_PREFIX + "JobMonitorApp";
+
+    
+    private final TopologyProvider provider;
+    private final DirectSubmitter<Topology, Job> submitter;
+    private final Topology topology;
+    private static final Logger logger = LoggerFactory.getLogger(JobMonitorApp.class);
+
+    /**
+     * Constructs a {@code JobMonitorApp} with the specified name in the 
+     * context of the specified provider.
+     * 
+     * @param provider the topology provider
+     * @param submitter a {@code DirectSubmitter} which provides required 
+     *      services and submits the application
+     * @param name the application name
+     * 
+     * @throws IllegalArgumentException if the submitter does not provide 
+     *      access to the required services
+     */
+    public JobMonitorApp(TopologyProvider provider, 
+            DirectSubmitter<Topology, Job> submitter, String name) {
+
+        this.provider = provider;
+        this.submitter = submitter;
+        validateSubmitter();
+        this.topology = declareTopology(name);
+    }
+    
+    /**
+     * Submits the application topology.
+     * 
+     * @return the job.
+     * @throws InterruptedException if the operation was interrupted
+     * @throws ExecutionException on task execution exception 
+     */
+    public Job submit() throws InterruptedException, ExecutionException {
+        Future<Job> f = submitter.submit(topology);
+        return f.get();
+    }
+
+    /**
+     * Submits an application using an {@code ApplicationServiceMXBean} control 
+     * registered with the specified {@code ControlService}.
+     * 
+     * @param applicationName the name of the application to submit
+     * @param controlService the control service
+     */
+    public static void submitApplication(String applicationName, ControlService controlService) {
+        try {
+            ApplicationServiceMXBean control =
+                    controlService.getControl(
+                            ApplicationServiceMXBean.TYPE,
+                            ApplicationService.ALIAS,
+                            ApplicationServiceMXBean.class);
+            if (control == null) {
+                throw new IllegalStateException(
+                        "Could not find a registered control with the following interface: " + 
+                        ApplicationServiceMXBean.class.getName());                
+            }
+// TODO add ability to submit with the initial application configuration
+            logger.info("Restarting monitored application {}", applicationName);
+            control.submit(applicationName, null);
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    /**
+     * Closes a job using a {@code JobMXBean} control registered with the 
+     * specified {@code ControlService}, then polls the job state every 
+     * 100 milliseconds, for up to {@code Controls.JOB_HOLD_AFTER_CLOSE_SECS} 
+     * seconds, until it becomes {@code CLOSED}.
+     * 
+     * @param jobName the name of the job
+     * @param controlService the control service
+     * @throws RuntimeException wrapping the underlying failure if the control 
+     *      is not registered, the job is not {@code CLOSED} once the wait 
+     *      period is over, or the wait is interrupted
+     */
+    public static void closeJob(String jobName, ControlService controlService) {
+        try {
+            JobMXBean jobMbean = controlService.getControl(JobMXBean.TYPE, jobName, JobMXBean.class);
+            if (jobMbean == null) {
+                throw new IllegalStateException(
+                        "Could not find a registered control for job " + jobName + 
+                        " with the following interface: " + JobMXBean.class.getName());                
+            }
+            jobMbean.stateChange(Action.CLOSE);
+            logger.debug("Closing job {}", jobName);
+            
+            // Poll until the job is CLOSED or the wait budget is used up.
+            // The budget counts down, so the loop must run while it is positive.
+            long startWaiting = System.currentTimeMillis();
+            for (long waitForMillis = Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000;
+                    waitForMillis > 0;
+                    waitForMillis -= 100) {
+                if (jobMbean.getCurrentState() == Job.State.CLOSED)
+                    break;
+                else
+                    Thread.sleep(100);
+            }
+            if (jobMbean.getCurrentState() != Job.State.CLOSED) {
+                throw new IllegalStateException(
+                        "The unhealthy job " + jobName + " did not close after " + 
+                        Controls.JOB_HOLD_AFTER_CLOSE_SECS + " seconds");                
+            }
+            logger.debug("Job {} state is CLOSED after waiting for {} milliseconds",
+                    jobName, System.currentTimeMillis() - startWaiting);
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt status so the calling thread can observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Declares the following topology:
+     * <pre>
+     * JobEvents source --&gt; Filter (health == unhealthy) --&gt; Restart application
+     * </pre>
+     * 
+     * @param name the topology name
+     * @return the application topology
+     */
+    protected Topology declareTopology(String name) {
+        Topology t = provider.newTopology(name);
+        
+        declareTopology(t);
+        
+        return t;
+    }
+    
+    /**
+     * Populates the following topology:
+     * <pre>
+     * JobEvents source --&gt; Filter (health == unhealthy) --&gt; Restart application
+     * </pre>
+     * @param t Topology
+     *
+     */
+    public static void declareTopology(Topology t) {
+        TStream<JsonObject> jobEvents = JobEvents.source(
+                t, 
+                (evType, job) -> { return JobMonitorAppEvent.toJsonObject(evType, job); }
+                );
+
+        jobEvents = jobEvents.filter(
+                value -> {
+                    logger.trace("Filter: {}", value);
+
+                    try {
+                        JsonObject job = JobMonitorAppEvent.getJob(value);
+                        return (Job.Health.UNHEALTHY.name().equals(
+                                JobMonitorAppEvent.getJobHealth(job)));
+                    } catch (IllegalArgumentException e) {
+                        logger.info("Invalid event filtered out, cause: {}", e.getMessage());
+                        return false;
+                    }
+                 });
+
+        jobEvents.sink(new JobRestarter(t.getRuntimeServiceSupplier()));
+    }
+
+    /**
+     * A {@code Consumer} which restarts the application specified by a 
+     * JSON object passed to its {@code accept} function. 
+     */
+    private static class JobRestarter implements Consumer<JsonObject> {
+        private static final long serialVersionUID = 1L;
+        private final Supplier<RuntimeServices> rts;
+
+        JobRestarter(Supplier<RuntimeServices> rts) {
+            this.rts = rts;
+        }
+
+        @Override
+        public void accept(JsonObject value) {
+            ControlService controlService = rts.get().getService(ControlService.class);
+            JsonObject job = JobMonitorAppEvent.getJob(value);
+            String applicationName = JobMonitorAppEvent.getJobName(job);
+
+            closeJob(applicationName, controlService);
+            submitApplication(applicationName, controlService);
+        }
+    }
+
+    private void validateSubmitter() {
+        ControlService controlService = submitter.getServices().getService(ControlService.class);
+        if (controlService == null) {
+            throw new IllegalArgumentException("Could not access service " + ControlService.class.getName());
+        }
+
+        ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
+        if (appService == null) {
+            throw new IllegalArgumentException("Could not access service " + ApplicationService.class.getName());
+        }
+
+        JobRegistryService jobRegistryService = submitter.getServices().getService(JobRegistryService.class);
+        if (jobRegistryService == null) {
+            throw new IllegalArgumentException("Could not access service " + JobRegistryService.class.getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorAppEvent.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorAppEvent.java b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorAppEvent.java
new file mode 100644
index 0000000..4b8c9b3
--- /dev/null
+++ b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorAppEvent.java
@@ -0,0 +1,113 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.apps.runtime;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.JobRegistryService;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Helpers for generating and parsing a JSON representation of job
+ * monitoring events. 
+ */
+class JobMonitorAppEvent {
+
+    /**
+     * Creates a JsonObject wrapping a {@code JobRegistryService} event type
+     * and Job info.
+     * 
+     * @param evType the event type
+     * @param job the job
+     * @return the wrapped data
+     */
+    static JsonObject toJsonObject(JobRegistryService.EventType evType, Job job) {
+        JsonObject value = new JsonObject();
+        value.addProperty("time", (Number)System.currentTimeMillis());
+        value.addProperty("event", evType.toString());
+        JsonObject obj = new JsonObject();
+        obj.addProperty("id", job.getId());
+        obj.addProperty("name", job.getName());
+        obj.addProperty("state", job.getCurrentState().toString());
+        obj.addProperty("nextState", job.getNextState().toString());
+        obj.addProperty("health", job.getHealth().toString());
+        obj.addProperty("lastError", job.getLastError());
+        value.add("job", obj);
+        return value;
+    }
+
+    /**
+     * Gets the {@code job} JsonObject from the given JSON value.
+     * 
+     * @param value a JSON object
+     * @return the job JsonObject
+     */
+    static JsonObject getJob(JsonObject value) {
+        JsonObject job = value.getAsJsonObject("job");
+        if (job == null) {
+            throw new IllegalArgumentException("Could not find the job object in: " + value);
+        }
+        return job;
+    }
+
+    /**
+     * Gets the {@code name} string property from the given job JSON object.
+     *  
+     * @param job the job JSON object
+     * @return the job name
+     * @throws IllegalArgumentException if it could not find the property
+     */
+    static String getJobName(JsonObject job) {
+        return getProperty(job, "name");
+    }
+
+    /**
+     * Gets the {@code health} string property from the given job JSON object.
+     * 
+     * @param job the job JSON object
+     * @return the job health
+     * @throws IllegalArgumentException if it could not find the property
+     */
+    static String getJobHealth(JsonObject job) {
+        return getProperty(job, "health");
+    }
+
+    /**
+     * Gets a string property with the specified name from the given JSON 
+     * object.
+     * 
+     * @param value a JSON object
+     * @param name the property name
+     * @return the property value
+     * 
+     * @throws IllegalArgumentException if the property is missing, is not a 
+     *      JSON primitive, or cannot be read as a string; in the last case 
+     *      the conversion failure is attached as the cause
+     */
+    static String getProperty(JsonObject value, String name) {
+        JsonElement e = value.get(name);
+        Exception cause = null;
+        if (e != null && e.isJsonPrimitive()) {
+            try {
+                return e.getAsString();
+            } catch (Exception ex) {
+                // Keep the failure so it is reported with the exception below
+                // instead of being silently dropped.
+                cause = ex;
+            }
+        }
+        throw new IllegalArgumentException("Could not find the " + name + " property in: " + value, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/package-info.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/package-info.java b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/package-info.java
new file mode 100644
index 0000000..44575df
--- /dev/null
+++ b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Applications which provide monitoring and failure recovery to other 
+ * Edgent applications.
+ */
+package org.apache.edgent.apps.runtime;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/test/java/edgent/test/apps/runtime/JobMonitorAppTest.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/test/java/edgent/test/apps/runtime/JobMonitorAppTest.java b/apps/runtime/src/test/java/edgent/test/apps/runtime/JobMonitorAppTest.java
deleted file mode 100644
index fea4378..0000000
--- a/apps/runtime/src/test/java/edgent/test/apps/runtime/JobMonitorAppTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.apps.runtime;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.Hashtable;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-
-import edgent.apps.runtime.JobMonitorApp;
-import edgent.execution.DirectSubmitter;
-import edgent.execution.Job;
-import edgent.execution.services.ControlService;
-import edgent.execution.services.ServiceContainer;
-import edgent.providers.direct.DirectProvider;
-import edgent.runtime.appservice.AppService;
-import edgent.runtime.jmxcontrol.JMXControlService;
-import edgent.runtime.jobregistry.JobRegistry;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.services.ApplicationService;
-
-public class JobMonitorAppTest {
-
-    public final static String MONITORED_APP_NAME_1 = "MonitoredApp_1";
-    public final static String MONITORED_APP_NAME_2 = "MonitoredApp_2";
-
-    @Test
-    public void testJobMonitorApp() throws Exception {
-        DirectProvider provider = new DirectProvider();
-        startProvider(provider);
-
-        // Start monitor app
-        JobMonitorApp app = new JobMonitorApp(provider, provider, JobMonitorApp.APP_NAME);
-        Job monitor = app.submit();
-
-        // Declare and register user apps which need monitoring
-        registerMonitoredApplicationOne(provider);
-        registerMonitoredApplicationTwo(provider);
-
-        // Start monitored apps
-        startMonitoredApplications(provider);
-        
-        // Run for a while, assert the monitor app is still running healthy
-        Thread.sleep(5000);
-        assertTrue(
-                Job.State.RUNNING.equals(monitor.getCurrentState()) &&
-                Job.State.RUNNING.equals(monitor.getNextState()) &&
-                Job.Health.HEALTHY.equals(monitor.getHealth()));
-    }
-
-    static void startProvider(DirectProvider provider) 
-            throws InterruptedException, ExecutionException {
-        
-        provider.getServices().addService(ControlService.class,
-                new JMXControlService("edgent.test.apps.runtime", new Hashtable<>()));
-        AppService.createAndRegister(provider, provider);
-        JobRegistry.createAndRegister(provider.getServices());        
-    }
-
-    /**
-     * Fails every 2 seconds (20 tuples * 100 millis)
-     */
-    static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter) {
-        ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
-        appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) -> {
-                
-                Random r = new Random();
-                TStream<Double> d  = topology.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);
-                
-                final AtomicInteger count = new AtomicInteger(0);
-                d = d.filter(tuple -> {
-                    int tupleCount = count.incrementAndGet();
-                    if (tupleCount == 20) {
-                        throw new IllegalStateException("Injected error");
-                    }
-                    return true; 
-                });
-                
-                d.sink(tuple -> System.out.print("."));
-            });
-    }
-
-    /**
-     * Fails every 1.5 seconds (10 tuples * 150 millis)
-     */
-    static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter) {
-        ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
-        appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) -> {
-                
-                Random r = new Random();
-                TStream<Double> d  = topology.poll(() -> r.nextGaussian(), 150, TimeUnit.MILLISECONDS);
-                
-                final AtomicInteger count = new AtomicInteger(0);
-                d = d.filter(tuple -> {
-                    int tupleCount = count.incrementAndGet();
-                    if (tupleCount == 10) {
-                        throw new IllegalStateException("Injected error");
-                    }
-                    return true; 
-                });
-                
-                d.sink(tuple -> System.out.print("#"));
-            });
-    }
-
-    static void startMonitoredApplications(DirectSubmitter<Topology, Job> submitter) {
-        ServiceContainer services = submitter.getServices();
-        ApplicationService appService = services.getService(ApplicationService.class);
-        ControlService controlService = services.getService(ControlService.class);
-
-        // Submit all applications registered with the ApplicationService
-        for (String name: appService.getApplicationNames()) {
-            JobMonitorApp.submitApplication(name, controlService);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
new file mode 100644
index 0000000..51b129c
--- /dev/null
+++ b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
@@ -0,0 +1,137 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.apps.runtime;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Hashtable;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.apps.runtime.JobMonitorApp;
+import org.apache.edgent.execution.DirectSubmitter;
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.execution.services.ServiceContainer;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.runtime.appservice.AppService;
+import org.apache.edgent.runtime.jmxcontrol.JMXControlService;
+import org.apache.edgent.runtime.jobregistry.JobRegistry;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.services.ApplicationService;
+import org.junit.Test;
+
+public class JobMonitorAppTest { // checks that the JobMonitorApp job stays RUNNING and HEALTHY while the applications it watches throw injected errors
+
+    public final static String MONITORED_APP_NAME_1 = "MonitoredApp_1"; // registered by registerMonitoredApplicationOne
+    public final static String MONITORED_APP_NAME_2 = "MonitoredApp_2"; // registered by registerMonitoredApplicationTwo
+
+    @Test
+    public void testJobMonitorApp() throws Exception {
+        DirectProvider provider = new DirectProvider();
+        startProvider(provider);
+
+        // Start the monitor app before any monitored application is registered or submitted
+        JobMonitorApp app = new JobMonitorApp(provider, provider, JobMonitorApp.APP_NAME);
+        Job monitor = app.submit();
+
+        // Declare and register the two user apps which need monitoring
+        registerMonitoredApplicationOne(provider);
+        registerMonitoredApplicationTwo(provider);
+
+        // Start monitored apps (every application name known to the ApplicationService)
+        startMonitoredApplications(provider);
+        
+        // Run for 5 seconds (longer than either app's time to its injected failure), then assert the monitor app is still running healthy
+        Thread.sleep(5000);
+        assertTrue(
+                Job.State.RUNNING.equals(monitor.getCurrentState()) &&
+                Job.State.RUNNING.equals(monitor.getNextState()) &&
+                Job.Health.HEALTHY.equals(monitor.getHealth()));
+    }
+
+    static void startProvider(DirectProvider provider) 
+            throws InterruptedException, ExecutionException {
+        
+        provider.getServices().addService(ControlService.class,
+                new JMXControlService("org.apache.edgent.test.apps.runtime", new Hashtable<>())); // startMonitoredApplications looks this ControlService up again
+        AppService.createAndRegister(provider, provider);
+        JobRegistry.createAndRegister(provider.getServices());        
+    }
+
+    /**
+     * Registers a topology whose filter throws on its 20th tuple, i.e. 2 seconds (20 tuples * 100 millis) after polling starts.
+     */
+    static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter) {
+        ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
+        appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) -> {
+                
+                Random r = new Random();
+                TStream<Double> d  = topology.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);
+                
+                final AtomicInteger count = new AtomicInteger(0); // created inside the builder, so each built topology counts from zero
+                d = d.filter(tuple -> {
+                    int tupleCount = count.incrementAndGet();
+                    if (tupleCount == 20) {
+                        throw new IllegalStateException("Injected error");
+                    }
+                    return true; 
+                });
+                
+                d.sink(tuple -> System.out.print(".")); // one "." per tuple that passed the filter
+            });
+    }
+
+    /**
+     * Registers a topology whose filter throws on its 10th tuple, i.e. 1.5 seconds (10 tuples * 150 millis) after polling starts.
+     */
+    static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter) {
+        ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
+        appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) -> {
+                
+                Random r = new Random();
+                TStream<Double> d  = topology.poll(() -> r.nextGaussian(), 150, TimeUnit.MILLISECONDS);
+                
+                final AtomicInteger count = new AtomicInteger(0); // created inside the builder, so each built topology counts from zero
+                d = d.filter(tuple -> {
+                    int tupleCount = count.incrementAndGet();
+                    if (tupleCount == 10) {
+                        throw new IllegalStateException("Injected error");
+                    }
+                    return true; 
+                });
+                
+                d.sink(tuple -> System.out.print("#")); // one "#" per tuple that passed the filter
+            });
+    }
+
+    static void startMonitoredApplications(DirectSubmitter<Topology, Job> submitter) {
+        ServiceContainer services = submitter.getServices();
+        ApplicationService appService = services.getService(ApplicationService.class);
+        ControlService controlService = services.getService(ControlService.class);
+
+        // Submit every application name the ApplicationService reports, going through the ControlService
+        for (String name: appService.getApplicationNames()) {
+            JobMonitorApp.submitApplication(name, controlService);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 694771f..0c383ef 100644
--- a/build.gradle
+++ b/build.gradle
@@ -46,7 +46,6 @@ subprojects {
   plugins.apply 'java'
   plugins.apply 'jacoco'
 
-  if(["javax.websocket-client", "javax.websocket-server", "edgent.javax.websocket"].contains(project.name)) {
     archivesBaseName = "${project.name}"
   } else{
     archivesBaseName = "${rootProject.name}${project.path.replace(':', '.')}"
@@ -175,14 +174,14 @@ task aggregateJavadoc(type: Javadoc) {
     overview "edgent_overview.html"
     windowTitle "Edgent v${build_version}"
 
-    group("Edgent API", "edgent.execution", "edgent.function", "edgent.topology", "edgent.topology.json", "edgent.topology.mbeans", "edgent.topology.plumbing", "edgent.topology.services", "edgent.execution.*")
-    group("Edgent Providers", "edgent.providers.*")
-    group("Edgent Connectors", "edgent.connectors.*")
-    group("Edgent Samples", "edgent.samples.*")
-    group("Edgent Analytics", "edgent.analytics.*")
-    group("Edgent Utilities", "edgent.metrics", "edgent.metrics.*", "edgent.streamscope", "edgent.streamscope.*")
-    group("Edgent Low-Level API", "edgent.graph", "edgent.graph.*", "edgent.oplet", "edgent.oplet.*", "edgent.window")
-    group("Edgent SPI", "edgent.topology.spi", "edgent.topology.spi.*")
+    group("Edgent API", "org.apache.edgent.execution", "org.apache.edgent.function", "org.apache.edgent.topology", "org.apache.edgent.topology.json", "org.apache.edgent.topology.mbeans", "org.apache.edgent.topology.plumbing", "org.apache.edgent.topology.services", "org.apache.edgent.execution.*")
+    group("Edgent Providers", "org.apache.edgent.providers.*")
+    group("Edgent Connectors", "org.apache.edgent.connectors.*")
+    group("Edgent Samples", "org.apache.edgent.samples.*")
+    group("Edgent Analytics", "org.apache.edgent.analytics.*")
+    group("Edgent Utilities", "org.apache.edgent.metrics", "org.apache.edgent.metrics.*", "org.apache.edgent.streamscope", "org.apache.edgent.streamscope.*")
+    group("Edgent Low-Level API", "org.apache.edgent.graph", "org.apache.edgent.graph.*", "org.apache.edgent.oplet", "org.apache.edgent.oplet.*", "org.apache.edgent.window")
+    group("Edgent SPI", "org.apache.edgent.topology.spi", "org.apache.edgent.topology.spi.*")
   }
   source subprojects.collect { project -> project.sourceSets.main.allJava }
   classpath = files(subprojects.collect

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d98ae50..ae57319 100644
--- a/build.xml
+++ b/build.xml
@@ -300,7 +300,7 @@
       additionalparam="-Xdoclint:none"
       Overview="edgent_overview.html"
       failonwarning="true"
-      excludepackagenames="edgent.connectors.runtime,edgent.connectors.*.runtime,edgent.console.*,edgent.samples.scenarios.iotf.range.sensor"
+      excludepackagenames="org.apache.edgent.connectors.runtime,org.apache.edgent.connectors.*.runtime,org.apache.edgent.console.*,org.apache.edgent.samples.scenarios.iotf.range.sensor"
       >
       <sourcepath>
          <dirset dir="${basedir}">
@@ -323,14 +323,14 @@
        <doctitle>Apache Edgent (incubating) v${edgent.version}</doctitle>
        <footer><![CDATA[<a href="http://edgent.incubator.apache.org">Apache Edgent (incubating)</a>]]></footer>
        <bottom>Copyright &amp;copy; 2016 The Apache Software Foundation. All Rights Reserved - ${commithash}-${DSTAMP}-${TSTAMP}</bottom>
-       <group title="Edgent API" packages="edgent.execution,edgent.function,edgent.topology,edgent.topology.json,edgent.topology.mbeans,edgent.topology.plumbing,edgent.topology.services,edgent.execution.*"/>
-       <group title="Edgent Providers" packages="edgent.providers.*"/>
-       <group title="Edgent Connectors" packages="edgent.connectors.*"/>
-       <group title="Edgent Samples" packages="edgent.samples.*"/>
-       <group title="Edgent Analytics" packages="edgent.analytics.*"/>
-       <group title="Edgent Utilities" packages="edgent.metrics,edgent.metrics.*,edgent.streamscope,edgent.streamscope.*"/>
-       <group title="Edgent Low-Level API" packages="edgent.graph,edgent.graph.*,edgent.oplet,edgent.oplet.*,edgent.window"/>
-       <group title="Edgent SPI" packages="edgent.topology.spi,edgent.topology.spi.*"/>
+       <group title="Edgent API" packages="org.apache.edgent.execution,org.apache.edgent.function,org.apache.edgent.topology,org.apache.edgent.topology.json,org.apache.edgent.topology.mbeans,org.apache.edgent.topology.plumbing,org.apache.edgent.topology.services,org.apache.edgent.execution.*"/>
+       <group title="Edgent Providers" packages="org.apache.edgent.providers.*"/>
+       <group title="Edgent Connectors" packages="org.apache.edgent.connectors.*"/>
+       <group title="Edgent Samples" packages="org.apache.edgent.samples.*"/>
+       <group title="Edgent Analytics" packages="org.apache.edgent.analytics.*"/>
+       <group title="Edgent Utilities" packages="org.apache.edgent.metrics,org.apache.edgent.metrics.*,org.apache.edgent.streamscope,org.apache.edgent.streamscope.*"/>
+       <group title="Edgent Low-Level API" packages="org.apache.edgent.graph,org.apache.edgent.graph.*,org.apache.edgent.oplet,org.apache.edgent.oplet.*,org.apache.edgent.window"/>
+       <group title="Edgent SPI" packages="org.apache.edgent.topology.spi,org.apache.edgent.topology.spi.*"/>
      </javadoc>
   </target>
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/CommandStreams.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/CommandStreams.java b/connectors/command/src/main/java/edgent/connectors/command/CommandStreams.java
deleted file mode 100644
index 817286c..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/CommandStreams.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.StringTokenizer;
-import java.util.concurrent.TimeUnit;
-
-import edgent.connectors.command.runtime.CommandReader;
-import edgent.connectors.command.runtime.CommandWriter;
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-/**
- * Connector for creating a TStream from a Command's / OS Process's output
- * and sinking a TStream to a Command's / OS Process's input.
- * <P>
- * e.g., run a network monitor command (like Tiger Shark) and ingest its output.
- */
-public class CommandStreams {
-  private CommandStreams() {}
-
-  /**
-   * Tokenize the specified {@code cmdString} in the exact same manner as
-   * done by {@link Runtime#exec(String)}.
-   * <P>
-   * This function provides a convenience for creating a {@link ProcessBuilder}
-   * for use by the other CommandStreams methods. 
-   * </P>
-   * <P>
-   * Sample use:
-   * <pre>{@code
-   * ProcessBuilder cmd = new ProcessBuilder(tokenize("sh someShellCmd.sh and args"));
-   * TStream<String> stream = CommandStreams.generate(topology, cmd);
-   * }</pre>
-   * 
-   * @param cmdString a command string
-   * @return the tokens
-   */
-  public static List<String> tokenize(String cmdString) {
-    List<String> command = new ArrayList<>();
-    StringTokenizer tok = new StringTokenizer(cmdString);
-    while (tok.hasMoreTokens()) 
-      command.add(tok.nextToken());
-    return command;
-  }
-  
-  /**
-   * Create an endless {@code TStream<String>} from a long running command's output.
-   * <P>
-   * The supplied {@code cmd} is used to start the command.
-   * A tuple is created for each UTF8 line read from the command's
-   * {@link Process#getInputStream() output}.
-   * The tuples contain output from stderr if cmd is configured to 
-   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
-   * The command is restarted if a read from the command's output stream
-   * returns EOF or an error. 
-   * </P>
-   * <P>
-   * This is a convenience function equivalent to
-   * {@code topology.generate(endlessCommandReader(cmd))}.
-   * </P>
-   * <P>
-   * Sample use: create a stream of tuples for the output from a 
-   * continuously running and restartable command:
-   * <pre>{@code
-   * ProcessBuilder cmd = new ProcessBuilder("myCommand");
-   * TStream<String> cmdOutput = CommandStreams.generate(topology, cmd);
-   * cmdOutput.print();
-   * }</pre>
-   * 
-   * @param topology the topology to add the source stream to
-   * @param cmd the {@link ProcessBuilder} to start the command
-   * @return the source {@code TStream<String>}
-   * 
-   * @see #endlessCommandReader(ProcessBuilder)
-   * @see #tokenize(String)
-   */
-  public static TStream<String> generate(Topology topology, ProcessBuilder cmd) {
-    return topology.generate(endlessCommandReader(cmd));
-  }
-  
-  /**
-   * Create a {@code TStream<String>} from a periodically run command's output.
-   * <P>
-   * The supplied {@code cmd} is used to start the command
-   * at the specified {@code period}.
-   * The command's UTF8 {@link Process#getInputStream() output} is read until EOF
-   * and a {@code List<String>} tuple is created containing the collected output.
-   * The tuples contain output from stderr if the cmd is configured to 
-   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.  
-   * </P>
-   * <P>
-   * This is a convenience function equivalent to
-   * {@code topology.poll(commandReaderList(cmd), period, units)}.
-   * </P>
-   * <P>
-   * Sample use: create a stream of tuples containing the output 
-   * from a periodically run command:
-   * <pre>{@code
-   * ProcessBuilder cmd = new ProcessBuilder("date");
-   * TStream<List<String>> cmdOutput = 
-   *      CommandStreams.periodicSource(topology, cmd, 2, TimeUnit.SECONDS);
-   * cmdOutput.print();
-   * }</pre>
-   * 
-   * @param topology the topology to add the source stream to
-   * @param cmd the {@link ProcessBuilder} to start the command
-   * @param period the period to run the command and collect its output
-   * @param units TimeUnit for {@code period}
-   * @return the source {@code TStream<List<String>>}
-   * 
-   * @see #commandReaderList(ProcessBuilder)
-   * @see #tokenize(String)
-   */
-  public static TStream<List<String>> periodicSource(Topology topology,
-      ProcessBuilder cmd, long period, TimeUnit units) {
-    return topology.poll(commandReaderList(cmd), period, units);
-  }
-  
-  /**
-   * Sink a {@code TStream<String>} to a command's input.
-   * <P>
-   * The supplied {@code cmd} is used to start the command.
-   * Each tuple is written as UTF8 and flushed to the command's {@link Process#getOutputStream() input}.
-   * The command is restarted if a write encounters an error. 
-   * </P>
-   * <P>
-   * While each write is followed by a flush() that only helps to
-   * reduce the time it takes to notice that cmd has failed and restart it.
-   * Supposedly "successfully written and flushed" values are not guaranteed to
-   * have been received by a cmd across restarts.
-   * </P>
-   * <P>
-   * This is a convenience function equivalent to
-   * {@code stream.sink(commandWriter(cmd))}
-   * </P>
-   * <P>
-   * Sample use: write a stream of tuples to the input of a command:
-   * <pre>{@code
-   * TStream<String> stream = topology.strings("one", "two", "three");
-   * ProcessBuilder cmd = new ProcessBuilder("cat").redirectOutput(new File("/dev/stdout"));
-   * CommandStreams.sink(stream, cmd);
-   * }</pre>
-   * 
-   * @param stream the stream to sink
-   * @param cmd the {@link ProcessBuilder} to start the command
-   * @return a {@link TSink}
-   * 
-   * @see #commandWriter(ProcessBuilder)
-   * @see #tokenize(String)
-   */
-  public static TSink<String> sink(TStream<String> stream, ProcessBuilder cmd) {
-    return stream.sink(commandWriter(cmd));
-  }
-    
-  /**
-   * Create an endless {@code Supplier<String>} for ingesting a long running command's output.
-   * <P>
-   * This method is particularly helpful in creating a sensor or source connector
-   * class that hides the fact that it uses a command, enabling it to be used
-   * like any other sensor/connector.
-   * </P>
-   * For example:
-   * <pre><code>
-   * // ingest the sensor data
-   * TStream&lt;MySensorData&gt; stream = topology.generate(new MySensor());
-   *
-   * // MySensor class
-   * class MySensor implements Supplier&lt;MySensorData&gt; {
-   *   private String[] cmd = new String[] {"mySensorCmd", "arg1"};
-   *   private Supplier&lt;String&gt; commandReader = 
-   *     CommandStreams.endlessCommandReader(new ProcessBuilder(cmd));
-   *       
-   *   // implement Supplier&lt;MySensorData&gt;.get()
-   *   public MySensorData get() {
-   *     // get the next line from the cmd and create a MySensorData tuple from it
-   *     return createMySensorData(commandReader.get());
-   *   }
-   * }
-   * </code></pre>
-   * <P>
-   * The supplied {@code cmd} is used to start the command.
-   * A call to {@link Supplier#get()} reads the next UTF8 line from the command's
-   * {@link Process#getInputStream() output}.
-   * The returned strings contain output from stderr if the cmd is configured to 
-   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdput}.  
-   * The command is restarted if a read from the command's output stream
-   * returns EOF or an error.
-   * </P>
-   * 
-   * @param cmd the {@link ProcessBuilder} to start the command
-   * @return the {@code Supplier<String>}
-   * 
-   * @see #generate(Topology, ProcessBuilder)
-   * @see #tokenize(String)
-   */
-  public static Supplier<String> endlessCommandReader(ProcessBuilder cmd) {
-    return new Supplier<String>() {
-      private static final long serialVersionUID = 1L;
-      Supplier<Iterable<String>> reader = new CommandReader(cmd, true);
-      Iterator<String> iter = null;
-      @Override
-      public String get() {
-        if (iter == null) {
-          iter = reader.get().iterator();
-        }
-        if (iter.hasNext()) {
-          return iter.next();
-        }
-        else {
-          // presumably a shutdown condition
-          return null;
-        }
-      }
-    };
-  }
-  
-  /**
-   * Create a {@code Supplier<List<String>>} to ingest a command's output.
-   * <P>
-   * This method is particularly helpful in creating a sensor or source connector
-   * class that hides the fact that it uses a command, enabling it to be used
-   * like any other sensor/connector.
-   * </P>
-   * For example:
-   * <pre><code>
-   * // ingest the sensor data
-   * TStream&lt;MySensorData&gt; stream = topology.periodicSource(new MySensor());
-   *
-   * // MySensor class
-   * class MySensor implements Supplier&lt;MySensorData&gt; {
-   *   private String[] cmd = new String[] {"mySensorCmd", "arg1"};
-   *   private Supplier&lt;List&lt;String&gt;&gt; commandReader = 
-   *     CommandStreams.commandReaderList(new ProcessBuilder(cmd));
-   *       
-   *   // implement Supplier&lt;MySensorData&gt;.get()
-   *   public MySensorData get() {
-   *     // get the cmd output and create a MySensorData tuple from it
-   *     return createMySensorData(commandReader.get());
-   *   }
-   * }
-   * </code></pre>
-   * <P>
-   * The supplied {@code cmd} is used to start the command.
-   * A call to {@link Supplier#get()} reads the command's UTF8
-   * {@link Process#getInputStream() input stream} until an EOF or error
-   * and returns a {@code List<String>} of the collected input.
-   * The tuples contain output from stderr if the cmd is configured to 
-   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
-   * </P>
-   * 
-   * @param cmd the {@link ProcessBuilder} to start the command
-   * @return the {@code Supplier<List<String>>} for the command
-   * 
-   * @see #periodicSource(Topology, ProcessBuilder, long, TimeUnit)
-   * @see #tokenize(String)
-   */
-  public static Supplier<List<String>> commandReaderList(ProcessBuilder cmd) {
-    return new Supplier<List<String>>() {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public List<String> get() {
-        try (CommandReader supplier
-                = new CommandReader(cmd, false))
-        {
-          Iterator<String> iter = supplier.get().iterator();
-          List<String> list = new ArrayList<>();
-          while (iter.hasNext())
-            list.add(iter.next());
-          return list;
-        }
-      }
-    };
-  }
- 
-  /**
-   * Create a {@code Consumer<String>} to write UTF8 string data to a command's input.
-   * <P>
-   * This method is particularly helpful in creating a sink connector
-   * that hides the fact that it uses a command, enabling it to be used
-   * like a native connector.
-   * </P>
-   * For example:
-   * <pre><code>
-   * // sink a stream to my connector
-   * TStream&lt;MySensorData&gt; stream = ...;
-   * stream.sink(new MySinkConnector());
-   *
-   * // MySinkConnector class
-   * class MySinkConnector implements Consumer&lt;MySensorData&gt; {
-   *   private String[] cmd = new String[] {"mySinkCmd", "arg1"};
-   *   private Consumer&lt;String&gt; commandWriter = 
-   *     CommandStreams.commandWriter(new ProcessBuilder(cmd));
-   *       
-   *   // implement Consumer&lt;MySensorData&gt;.accept()
-   *   public void accept(MySensorData data) {
-   *     // convert the data to a string and write it to the cmd
-   *     commandWriter.accept(convertMySensorData(data));
-   *   }
-   * }
-   * </code></pre>
-   * <P>
-   * The supplied {@link ProcessBuilder cmd} is used to start the command.
-   * Each call to {@link Consumer#accept(Object) accept(String)} writes a 
-   * UTF8 string to the command's {@link Process#getOutputStream() input}.
-   * Each write is followed by a flush.
-   * The command is restarted if a write encounters an error. 
-   * </P>
-   * <P>
-   * While each write is followed by a flush() that only helps to
-   * reduce the time it takes to notice that cmd has failed and restart it.
-   * Supposedly "successfully written and flushed" values are not guaranteed to
-   * have been received by a cmd across restarts.
-   * </P>
-   * 
-   * @param cmd the {@link ProcessBuilder} to start the command
-   * @return the {@code Consumer<String>} for the command
-   * 
-   * @see #sink(TStream, ProcessBuilder)
-   * @see #tokenize(String)
-   */
-  public static Consumer<String> commandWriter(ProcessBuilder cmd) {
-    return new CommandWriter(cmd, true);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/package-info.java b/connectors/command/src/main/java/edgent/connectors/command/package-info.java
deleted file mode 100644
index ee5f59a..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Command / OS Process connector.
- */
-package edgent.connectors.command;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandConnector.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandConnector.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandConnector.java
deleted file mode 100644
index 36ca884..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandConnector.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for source / sink specific command connectors.
- * <P>
- * The lifetime of a CommandConnector is that of its Command's execution lifetime.
- * In the case of a "one shot" command (e.g., a periodicSource's cmd) the
- * lifetime may be brief - {@code restart==false}.
- * </P>
- * <P>
- * Many command connector uses will involve long running commands, sources
- * and sinks that want to be robust in the face of inadvertent command
- * termination/failures - (@code restart==true}.
- * </P>
- */
-abstract class CommandConnector implements AutoCloseable {
-  static final Logger logger = LoggerFactory.getLogger(CommandConnector.class);
-
-  private final ProcessBuilder cmd;
-  private final boolean restart;
-  private Process currentProcess;
-  private long numStarts;
-  private long lastStartTimestamp;
-  private final int restartDelayMsec = 1_000;
-  
-  
-  CommandConnector(ProcessBuilder cmd, boolean restart) {
-    this.cmd = cmd;
-    this.restart = restart;
-  }
-  
-  protected boolean canStart() {
-    return restart || numStarts==0;
-  }
-  
-  protected Process getCurrentProcess() {
-    return currentProcess;
-  }
-  
-  protected void start() throws InterruptedException {
-    if (!canStart())
-      throw new IllegalStateException();
-    closeProcess();
-    try {
-      numStarts++;
-      // ensure we don't thrash on continuous restarts
-      long now = System.currentTimeMillis();
-      if (now < lastStartTimestamp + restartDelayMsec) {
-        logger.info("Sleeping before restarting cmd {}", toCmdForMsg());
-        Thread.sleep(restartDelayMsec);
-        now = System.currentTimeMillis();
-      }
-      lastStartTimestamp = now;
-      
-      currentProcess = cmd.start();
-      
-      logger.debug("Started cmd {}", toCmdForMsg());
-    }
-    catch (IOException e) {
-      logger.error("Unable to start cmd {}", toCmdForMsg(), e);
-    }
-  }
-  
-  protected void closeProcess() {
-    if (currentProcess != null) {
-      currentProcess.destroy();
-      currentProcess = null;
-    }
-  }
-
-  @Override
-  public void close() {
-    closeProcess();
-  }
-  
-  String toCmdForMsg() {
-    return cmd.command().toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandReader.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandReader.java
deleted file mode 100644
index cb26db9..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandReader.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import edgent.function.Supplier;
-
-/**
- * Create a {@code Supplier<Iterable<String>>} to ingest a command's output.
- * <P>
- * The supplied {@code cmd} is used to start the command
- * and restart it upon process termination/error if so configured.
- * </P>
- * <P>
- * The iterator returned by {@link Iterable#iterator()) returns
- * {@hasNext()==true} until a read from {@link Process#getOutputStream()}
- * returns EOF or an IOError.
- */
-public class CommandReader extends CommandConnector implements Supplier<Iterable<String>>, AutoCloseable {
-  private static final long serialVersionUID = 1L;
-  private Iterator<String> currentSupplierIterator;
-  
-  /**
-   * Create a supplier of UTF8 strings from a command's output.
-   * 
-   * @param cmd the {@link ProcessBuilder} to use to start the command
-   * @param restart when true, restart the command upon termination, EOF, or
-   * read error.
-   */
-  public CommandReader(ProcessBuilder cmd, boolean restart) {
-    super(cmd, restart);
-  }
-
-  protected void start() throws InterruptedException {
-    super.start();
-    currentSupplierIterator = new ProcessReader(getCurrentProcess()).get().iterator();
-  }
-  
-  protected void closeProcess() {
-    currentSupplierIterator = null;
-    super.closeProcess();
-  }
-  
-  @Override
-  public Iterable<String> get() {
-    return new Iterable<String>() {
-
-      @Override
-      public Iterator<String> iterator() {
-        return new Iterator<String>() {
-
-          @Override
-          public boolean hasNext() {
-            try {
-              for(;;) {
-                if (currentSupplierIterator != null) {
-                  boolean hasNext = currentSupplierIterator.hasNext();
-                  if (hasNext) {
-                    return true;
-                  }
-                  else {
-                    // no more from that process.  close and loop/retry.
-                    closeProcess();
-                  }
-                }
-                else if (currentSupplierIterator == null && canStart()) {
-                  start(); // and loop/retry
-                }
-                else {
-                  return false; // no more input
-                }
-              }
-            }
-            catch (InterruptedException e) {
-              return false;
-            }
-          }
-
-          @Override
-          public String next() {
-            if (currentSupplierIterator != null)
-              return currentSupplierIterator.next();
-            else
-              throw new NoSuchElementException();
-          }
-          
-        };
-      }
-      
-    };
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandWriter.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandWriter.java
deleted file mode 100644
index 42a9113..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandWriter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import edgent.function.Consumer;
-
-/**
- * A {@code Consumer<String>>} to write data to a command's input.
- * <P>
- * The supplied {@code cmd} is used to start the command
- * and restart it upon process termination/error if so configured.
- * </P>
- */
-public class CommandWriter extends CommandConnector implements Consumer<String>, AutoCloseable {
-  private static final long serialVersionUID = 1L;
-  private ProcessWriter currentConsumer;
-  
-  /**
-   * Create a consumer to write UTF8 string data to a command's input.
-   * <P>
-   * Each write is followed by a flush() though that only helps to
-   * reduce the time it takes to notice that a cmd has failed.
-   * Supposedly "successfully written and flushed" values are not guaranteed to
-   * have been received by a cmd even following restart.
-   * </P>
-   * 
-   * @param cmd the builder to use to start the process
-   * @param restart true to restart the process upon termination or
-   * write error.
-   */
-  public CommandWriter(ProcessBuilder cmd, boolean restart) {
-    super(cmd, restart);
-  }
-
-  protected void start() throws InterruptedException {
-    super.start();
-    currentConsumer = new ProcessWriter(getCurrentProcess());
-  }
-  
-  protected void closeProcess() {
-    currentConsumer = null;
-    super.closeProcess();
-  }
-
-  @Override
-  public void accept(String value) {
-    for (;;) {
-      try {
-        if (currentConsumer != null) {
-          try {
-            currentConsumer.accept(value);
-            logger.trace("WROTE: {}", value);
-            return;
-          } 
-          catch (RuntimeException e) {
-            closeProcess(); // and loop/retry
-          }
-        }
-        else if (currentConsumer == null && canStart()) {
-          logger.debug("STARTING for: {}", value);
-          start();  // and loop/retry
-        }
-        else {
-          // not restartable. toss it on the floor
-          return;
-        }
-      }
-      catch (InterruptedException e) {
-        // toss it on the floor
-        return;
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessReader.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessReader.java
deleted file mode 100644
index c65a875..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessReader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import edgent.function.Supplier;
-
-/**
- * A {@code Supplier<Iterable<String>>} for ingesting a process's output.
- * <P>
- * The iterator returned by {@link Iterable#iterator()) returns
- * {@hasNext()==true} until a read from {@link Process#getInputStream()}
- * returns EOF or an IOError.
- */
-class ProcessReader implements Supplier<Iterable<String>> {
-  private static final long serialVersionUID = 1L;
-  private final BufferedReader reader;
-  
-  /**
-   * Create a new supplier of UTF8 strings read from a process's
-   * {@link Process#getInputStream() output}.
-   * 
-   * @param process the process to read from.
-   */
-  ProcessReader(Process process) {
-    reader = new BufferedReader(new InputStreamReader(
-        process.getInputStream(), StandardCharsets.UTF_8));
-  }
-
-  @Override
-  public Iterable<String> get() {
-    return new Iterable<String>() {
-
-      @Override
-      public Iterator<String> iterator() {
-        return new Iterator<String>() {
-          private Boolean hasNext = null;
-          private String next = null;
-          
-          @Override
-          public boolean hasNext() {
-            if (hasNext != null)
-              return hasNext;
-            next = getNext();
-            hasNext = next != null;
-            return hasNext;
-          }
-
-          @Override
-          public String next() {
-            if (next == null)
-              throw new NoSuchElementException();
-            hasNext = null;
-            return next;
-          }
-          
-        };
-      }
-      
-    };
-  }
-
-  /**
-   * Get the next available line from the process's stdout
-   * @return null if no more input (or error)
-   */
-  private String getNext() {
-    try {
-      return reader.readLine();
-    } catch (IOException e) {
-      CommandConnector.logger.error("Unable to readline from cmd", e);
-      return null;
-    }
-  }
-  
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessWriter.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessWriter.java
deleted file mode 100644
index f9176dc..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessWriter.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-
-import edgent.function.Consumer;
-
-/**
- * A {@code Consumer<String>>} to receive data and write it to a process's input.
- * <P>
- * Each write is followed by a flush() though that only helps to
- * reduce the time it takes to notice that a cmd has failed.
- * Supposedly "successfully written and flushed" values are not guaranteed to
- * have been received by a cmd.
- */
-class ProcessWriter implements Consumer<String> {
-  private static final long serialVersionUID = 1L;
-  private final BufferedWriter writer;
-  
-  /**
-   * Create a new consumer for UTF8 strings to write to a process's
-   * {@link Process#getOutputStream() input}
-   * 
-   * @param process to process to write to.
-   */
-  ProcessWriter(Process process) {
-    writer = new BufferedWriter(new OutputStreamWriter(
-        process.getOutputStream(), StandardCharsets.UTF_8));
-  }
-
-  @Override
-  public void accept(String value) {
-    try {
-      // see class doc regarding guarantees.
-      writer.write(value);
-      writer.newLine();
-      writer.flush();
-    }
-    catch (IOException e) {
-      CommandConnector.logger.error("Unable to write to cmd", e);
-      // caller (CommandWriter) requires throw to detect failure and recover
-      throw new RuntimeException("Unable to write to cmd", e);
-    }
-  }
-  
-}
\ No newline at end of file