You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:35 UTC
[34/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorApp.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorApp.java b/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorApp.java
deleted file mode 100644
index efc2fee..0000000
--- a/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorApp.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.apps.runtime;
-
-import static edgent.topology.services.ApplicationService.SYSTEM_APP_PREFIX;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.DirectSubmitter;
-import edgent.execution.Job;
-import edgent.execution.Job.Action;
-import edgent.execution.mbeans.JobMXBean;
-import edgent.execution.services.ControlService;
-import edgent.execution.services.Controls;
-import edgent.execution.services.JobRegistryService;
-import edgent.execution.services.RuntimeServices;
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-import edgent.runtime.jobregistry.JobEvents;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.TopologyProvider;
-import edgent.topology.mbeans.ApplicationServiceMXBean;
-import edgent.topology.services.ApplicationService;
-
-/**
- * Job monitoring application.
- * <P>
- * The application listens on JobRegistry events and resubmits jobs for which
- * an event has been emitted because the job is unhealthy. The monitored
- * applications must be registered with an {@code ApplicationService}
- * prior to submission, otherwise the monitor application cannot restart
- * them.
- * </P>
- * <P>
- * The monitoring application must be submitted within a context which
- * provides the following services:
- * </P>
- * <ul>
- * <li>ApplicationService - an {@code ApplicationServiceMXBean} control
- * registered by this service is used to resubmit failed applications.</li>
- * <li>ControlService - the application queries this service for an
- * {@code ApplicationServiceMXBean} control, which is then used for
- * restarting failed applications.</li>
- * <li>JobRegistryService - generates job monitoring events. </li>
- * </ul>
- */
-public class JobMonitorApp {
- /**
- * Job monitoring application name.
- */
- public static final String APP_NAME = SYSTEM_APP_PREFIX + "JobMonitorApp";
-
-
- private final TopologyProvider provider;
- private final DirectSubmitter<Topology, Job> submitter;
- private final Topology topology;
- private static final Logger logger = LoggerFactory.getLogger(JobMonitorApp.class);
-
- /**
- * Constructs a {@code JobMonitorApp} with the specified name in the
- * context of the specified provider.
- *
- * @param provider the topology provider
- * @param submitter a {@code DirectSubmitter} which provides required
- * services and submits the application
- * @param name the application name
- *
- * @throws IllegalArgumentException if the submitter does not provide
- * access to the required services
- */
- public JobMonitorApp(TopologyProvider provider,
- DirectSubmitter<Topology, Job> submitter, String name) {
-
- this.provider = provider;
- this.submitter = submitter;
- validateSubmitter();
- this.topology = declareTopology(name);
- }
-
- /**
- * Submits the application topology.
- *
- * @return the job.
- * @throws InterruptedException if the operation was interrupted
- * @throws ExecutionException on task execution exception
- */
- public Job submit() throws InterruptedException, ExecutionException {
- Future<Job> f = submitter.submit(topology);
- return f.get();
- }
-
- /**
- * Submits an application using an {@code ApplicationServiceMXBean} control
- * registered with the specified {@code ControlService}.
- *
- * @param applicationName the name of the application to submit
- * @param controlService the control service
- */
- public static void submitApplication(String applicationName, ControlService controlService) {
- try {
- ApplicationServiceMXBean control =
- controlService.getControl(
- ApplicationServiceMXBean.TYPE,
- ApplicationService.ALIAS,
- ApplicationServiceMXBean.class);
- if (control == null) {
- throw new IllegalStateException(
- "Could not find a registered control with the following interface: " +
- ApplicationServiceMXBean.class.getName());
- }
-// TODO add ability to submit with the initial application configuration
- logger.info("Restarting monitored application {}", applicationName);
- control.submit(applicationName, null);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Closes a job using a {@code JobMXBean} control registered with the
- * specified {@code ControlService}.
- *
- * @param jobName the name of the job
- * @param controlService the control service
- */
- public static void closeJob(String jobName, ControlService controlService) {
- try {
- JobMXBean jobMbean = controlService.getControl(JobMXBean.TYPE, jobName, JobMXBean.class);
- if (jobMbean == null) {
- throw new IllegalStateException(
- "Could not find a registered control for job " + jobName +
- " with the following interface: " + JobMXBean.class.getName());
- }
- jobMbean.stateChange(Action.CLOSE);
- logger.debug("Closing job {}", jobName);
-
- // Wait for the job to complete
- long startWaiting = System.currentTimeMillis();
- for (long waitForMillis = Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000;
- waitForMillis < 0;
- waitForMillis -= 100) {
- if (jobMbean.getCurrentState() == Job.State.CLOSED)
- break;
- else
- Thread.sleep(100);
- }
- if (jobMbean.getCurrentState() != Job.State.CLOSED) {
- throw new IllegalStateException(
- "The unhealthy job " + jobName + " did not close after " +
- Controls.JOB_HOLD_AFTER_CLOSE_SECS + " seconds");
- }
- logger.debug("Job {} state is CLOSED after waiting for {} milliseconds",
- jobName, System.currentTimeMillis() - startWaiting);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Declares the following topology:
- * <pre>
- * JobEvents source --> Filter (health == unhealthy) --> Restart application
- * </pre>
- *
- * @param name the topology name
- * @return the application topology
- */
- protected Topology declareTopology(String name) {
- Topology t = provider.newTopology(name);
-
- declareTopology(t);
-
- return t;
- }
-
- /**
- * Populates the following topology:
- * <pre>
- * JobEvents source --> Filter (health == unhealthy) --> Restart application
- * </pre>
- * @param t Topology
- *
- */
- public static void declareTopology(Topology t) {
- TStream<JsonObject> jobEvents = JobEvents.source(
- t,
- (evType, job) -> { return JobMonitorAppEvent.toJsonObject(evType, job); }
- );
-
- jobEvents = jobEvents.filter(
- value -> {
- logger.trace("Filter: {}", value);
-
- try {
- JsonObject job = JobMonitorAppEvent.getJob(value);
- return (Job.Health.UNHEALTHY.name().equals(
- JobMonitorAppEvent.getJobHealth(job)));
- } catch (IllegalArgumentException e) {
- logger.info("Invalid event filtered out, cause: {}", e.getMessage());
- return false;
- }
- });
-
- jobEvents.sink(new JobRestarter(t.getRuntimeServiceSupplier()));
- }
-
- /**
- * A {@code Consumer} which restarts the application specified by a
- * JSON object passed to its {@code accept} function.
- */
- private static class JobRestarter implements Consumer<JsonObject> {
- private static final long serialVersionUID = 1L;
- private final Supplier<RuntimeServices> rts;
-
- JobRestarter(Supplier<RuntimeServices> rts) {
- this.rts = rts;
- }
-
- @Override
- public void accept(JsonObject value) {
- ControlService controlService = rts.get().getService(ControlService.class);
- JsonObject job = JobMonitorAppEvent.getJob(value);
- String applicationName = JobMonitorAppEvent.getJobName(job);
-
- closeJob(applicationName, controlService);
- submitApplication(applicationName, controlService);
- }
- }
-
- private void validateSubmitter() {
- ControlService controlService = submitter.getServices().getService(ControlService.class);
- if (controlService == null) {
- throw new IllegalArgumentException("Could not access service " + ControlService.class.getName());
- }
-
- ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
- if (appService == null) {
- throw new IllegalArgumentException("Could not access service " + ApplicationService.class.getName());
- }
-
- JobRegistryService jobRegistryService = submitter.getServices().getService(JobRegistryService.class);
- if (jobRegistryService == null) {
- throw new IllegalArgumentException("Could not access service " + JobRegistryService.class.getName());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorAppEvent.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorAppEvent.java b/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorAppEvent.java
deleted file mode 100644
index 2abbb6a..0000000
--- a/apps/runtime/src/main/java/edgent/apps/runtime/JobMonitorAppEvent.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.apps.runtime;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import edgent.execution.Job;
-import edgent.execution.services.JobRegistryService;
-
-/**
- * Helpers for parsing generating and parsing a JSON representation of job
- * monitoring events.
- */
-class JobMonitorAppEvent {
-
- /**
- * Creates a JsonObject wrapping a {@code JobRegistryService} event type
- * and Job info.
- *
- * @param evType the event type
- * @param job the job
- * @return the wrapped data
- */
- static JsonObject toJsonObject(JobRegistryService.EventType evType, Job job) {
- JsonObject value = new JsonObject();
- value.addProperty("time", (Number)System.currentTimeMillis());
- value.addProperty("event", evType.toString());
- JsonObject obj = new JsonObject();
- obj.addProperty("id", job.getId());
- obj.addProperty("name", job.getName());
- obj.addProperty("state", job.getCurrentState().toString());
- obj.addProperty("nextState", job.getNextState().toString());
- obj.addProperty("health", job.getHealth().toString());
- obj.addProperty("lastError", job.getLastError());
- value.add("job", obj);
- return value;
- }
-
- /**
- * Gets the {@code job} JsonObject from the given JSON value.
- *
- * @param value a JSON object
- * @return the job JsonObject
- */
- static JsonObject getJob(JsonObject value) {
- JsonObject job = value.getAsJsonObject("job");
- if (job == null) {
- throw new IllegalArgumentException("Could not find the job object in: " + value);
- }
- return job;
- }
-
- /**
- * Gets the {@code name} string property from the given job JSON object.
- *
- * @param job the job JSON object
- * @return the job name
- * @throws IllegalArgumentException if it could not find the property
- */
- static String getJobName(JsonObject job) {
- return getProperty(job, "name");
- }
-
- /**
- * Gets the {@code health} string property from the given job JSON object.
- *
- * @param job the job JSON object
- * @return the job health
- * @throws IllegalArgumentException if it could not find the property
- */
- static String getJobHealth(JsonObject job) {
- return getProperty(job, "health");
- }
-
- /**
- * Gets a string property with the specified name from the given JSON
- * object.
- *
- * @param value a JSON object
- * @param name the property name
- * @return the property value
- *
- * @throws IllegalArgumentException if could not find a property with the
- * given name
- */
- static String getProperty(JsonObject value, String name) {
- JsonElement e = value.get(name);
- if (e != null && e.isJsonPrimitive()) {
- try {
- return e.getAsString();
- } catch (Exception ex) {
- }
- }
- throw new IllegalArgumentException("Could not find the " + name + " property in: " + value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/edgent/apps/runtime/package-info.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/edgent/apps/runtime/package-info.java b/apps/runtime/src/main/java/edgent/apps/runtime/package-info.java
deleted file mode 100644
index 0d80435..0000000
--- a/apps/runtime/src/main/java/edgent/apps/runtime/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Applications which provide monitoring and failure recovery to other
- * Edgent applications.
- */
-package edgent.apps.runtime;
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
new file mode 100644
index 0000000..883b023
--- /dev/null
+++ b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorApp.java
@@ -0,0 +1,270 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.apps.runtime;
+
+import static org.apache.edgent.topology.services.ApplicationService.SYSTEM_APP_PREFIX;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.edgent.execution.DirectSubmitter;
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Job.Action;
+import org.apache.edgent.execution.mbeans.JobMXBean;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.execution.services.Controls;
+import org.apache.edgent.execution.services.JobRegistryService;
+import org.apache.edgent.execution.services.RuntimeServices;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.runtime.jobregistry.JobEvents;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyProvider;
+import org.apache.edgent.topology.mbeans.ApplicationServiceMXBean;
+import org.apache.edgent.topology.services.ApplicationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Job monitoring application.
+ * <P>
+ * The application listens on JobRegistry events and resubmits jobs for which
+ * an event has been emitted because the job is unhealthy. The monitored
+ * applications must be registered with an {@code ApplicationService}
+ * prior to submission, otherwise the monitor application cannot restart
+ * them.
+ * </P>
+ * <P>
+ * The monitoring application must be submitted within a context which
+ * provides the following services:
+ * </P>
+ * <ul>
+ * <li>ApplicationService - an {@code ApplicationServiceMXBean} control
+ * registered by this service is used to resubmit failed applications.</li>
+ * <li>ControlService - the application queries this service for an
+ * {@code ApplicationServiceMXBean} control, which is then used for
+ * restarting failed applications.</li>
+ * <li>JobRegistryService - generates job monitoring events. </li>
+ * </ul>
+ */
+public class JobMonitorApp {
+ /**
+ * Job monitoring application name.
+ */
+ public static final String APP_NAME = SYSTEM_APP_PREFIX + "JobMonitorApp";
+
+
+ private final TopologyProvider provider;
+ private final DirectSubmitter<Topology, Job> submitter;
+ private final Topology topology;
+ private static final Logger logger = LoggerFactory.getLogger(JobMonitorApp.class);
+
+ /**
+ * Constructs a {@code JobMonitorApp} with the specified name in the
+ * context of the specified provider.
+ *
+ * @param provider the topology provider
+ * @param submitter a {@code DirectSubmitter} which provides required
+ * services and submits the application
+ * @param name the application name
+ *
+ * @throws IllegalArgumentException if the submitter does not provide
+ * access to the required services
+ */
+ public JobMonitorApp(TopologyProvider provider,
+ DirectSubmitter<Topology, Job> submitter, String name) {
+ // Fields are assigned before the helper calls below, which read them.
+ this.provider = provider;
+ this.submitter = submitter;
+ validateSubmitter(); // throws IllegalArgumentException when a required service is missing
+ this.topology = declareTopology(name); // NOTE(review): protected, overridable method invoked from the constructor
+ }
+
+ /**
+ * Submits the application topology.
+ *
+ * @return the job.
+ * @throws InterruptedException if the operation was interrupted
+ * @throws ExecutionException on task execution exception
+ */
+ public Job submit() throws InterruptedException, ExecutionException {
+ // Hand the topology to the submitter and wait for the resulting future to complete.
+ return submitter.submit(topology).get();
+ }
+
+ /**
+ * Submits an application using an {@code ApplicationServiceMXBean} control
+ * registered with the specified {@code ControlService}.
+ *
+ * @param applicationName the name of the application to submit
+ * @param controlService the control service
+ */
+ public static void submitApplication(String applicationName, ControlService controlService) {
+ try {
+ ApplicationServiceMXBean control =
+ controlService.getControl(
+ ApplicationServiceMXBean.TYPE,
+ ApplicationService.ALIAS,
+ ApplicationServiceMXBean.class);
+ if (control == null) {
+ throw new IllegalStateException(
+ "Could not find a registered control with the following interface: " +
+ ApplicationServiceMXBean.class.getName());
+ }
+// TODO add ability to submit with the initial application configuration
+ logger.info("Restarting monitored application {}", applicationName);
+ control.submit(applicationName, null); // null second argument: presumably the application configuration (see TODO above)
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e); // also wraps the IllegalStateException thrown above
+ }
+ }
+
+ /**
+ * Closes a job using a {@code JobMXBean} control registered with the
+ * specified {@code ControlService}.
+ *
+ * @param jobName the name of the job
+ * @param controlService the control service
+ */
+ public static void closeJob(String jobName, ControlService controlService) {
+ try {
+ JobMXBean jobMbean = controlService.getControl(JobMXBean.TYPE, jobName, JobMXBean.class);
+ if (jobMbean == null) {
+ throw new IllegalStateException(
+ "Could not find a registered control for job " + jobName +
+ " with the following interface: " + JobMXBean.class.getName());
+ }
+ jobMbean.stateChange(Action.CLOSE);
+ logger.debug("Closing job {}", jobName);
+
+ // Wait for the job to complete
+ long startWaiting = System.currentTimeMillis();
+ for (long waitForMillis = Controls.JOB_HOLD_AFTER_CLOSE_SECS * 1000;
+ waitForMillis < 0;
+ waitForMillis -= 100) {
+ if (jobMbean.getCurrentState() == Job.State.CLOSED)
+ break;
+ else
+ Thread.sleep(100);
+ }
+ if (jobMbean.getCurrentState() != Job.State.CLOSED) {
+ throw new IllegalStateException(
+ "The unhealthy job " + jobName + " did not close after " +
+ Controls.JOB_HOLD_AFTER_CLOSE_SECS + " seconds");
+ }
+ logger.debug("Job {} state is CLOSED after waiting for {} milliseconds",
+ jobName, System.currentTimeMillis() - startWaiting);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Declares the following topology:
+ * <pre>
+ * JobEvents source --> Filter (health == unhealthy) --> Restart application
+ * </pre>
+ *
+ * @param name the topology name
+ * @return the application topology
+ */
+ protected Topology declareTopology(String name) {
+ Topology t = provider.newTopology(name);
+
+ declareTopology(t);
+
+ return t;
+ }
+
+ /**
+ * Populates the following topology:
+ * <pre>
+ * JobEvents source --> Filter (health == unhealthy) --> Restart application
+ * </pre>
+ * @param t Topology
+ *
+ */
+ public static void declareTopology(Topology t) {
+ TStream<JsonObject> jobEvents = JobEvents.source(
+ t,
+ (evType, job) -> { return JobMonitorAppEvent.toJsonObject(evType, job); }
+ );
+ // Keep only events whose job health is UNHEALTHY; events that raise IllegalArgumentException while being read are logged and dropped.
+ jobEvents = jobEvents.filter(
+ value -> {
+ logger.trace("Filter: {}", value);
+
+ try {
+ JsonObject job = JobMonitorAppEvent.getJob(value);
+ return (Job.Health.UNHEALTHY.name().equals(
+ JobMonitorAppEvent.getJobHealth(job)));
+ } catch (IllegalArgumentException e) {
+ logger.info("Invalid event filtered out, cause: {}", e.getMessage());
+ return false;
+ }
+ });
+ // Every event that passes the filter is handed to a JobRestarter, which closes and resubmits the named job.
+ jobEvents.sink(new JobRestarter(t.getRuntimeServiceSupplier()));
+ }
+
+ /**
+ * A {@code Consumer} which restarts the application specified by a
+ * JSON object passed to its {@code accept} function.
+ */
+ private static class JobRestarter implements Consumer<JsonObject> {
+ private static final long serialVersionUID = 1L;
+ private final Supplier<RuntimeServices> rts; // queried on every accept() call to obtain the ControlService
+
+ JobRestarter(Supplier<RuntimeServices> rts) {
+ this.rts = rts;
+ }
+
+ @Override
+ public void accept(JsonObject value) {
+ ControlService controlService = rts.get().getService(ControlService.class);
+ JsonObject job = JobMonitorAppEvent.getJob(value);
+ String applicationName = JobMonitorAppEvent.getJobName(job); // the job name is also used as the application name to resubmit
+ // Close first, then resubmit; closeJob throws on failure, in which case no resubmission is attempted.
+ closeJob(applicationName, controlService);
+ submitApplication(applicationName, controlService);
+ }
+ }
+
+ private void validateSubmitter() { // fails fast with IllegalArgumentException on the first missing service
+ ControlService controlService = submitter.getServices().getService(ControlService.class);
+ if (controlService == null) {
+ throw new IllegalArgumentException("Could not access service " + ControlService.class.getName());
+ }
+ // The service instances themselves are not kept; only their presence is checked here.
+ ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
+ if (appService == null) {
+ throw new IllegalArgumentException("Could not access service " + ApplicationService.class.getName());
+ }
+
+ JobRegistryService jobRegistryService = submitter.getServices().getService(JobRegistryService.class);
+ if (jobRegistryService == null) {
+ throw new IllegalArgumentException("Could not access service " + JobRegistryService.class.getName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorAppEvent.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorAppEvent.java b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorAppEvent.java
new file mode 100644
index 0000000..4b8c9b3
--- /dev/null
+++ b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/JobMonitorAppEvent.java
@@ -0,0 +1,113 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.apps.runtime;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.JobRegistryService;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * Helpers for generating and parsing a JSON representation of job
+ * monitoring events.
+ */
+class JobMonitorAppEvent {
+
+ /**
+ * Creates a JsonObject with top-level {@code time} (System.currentTimeMillis() at creation) and {@code event}
+ * (event type as text), plus a nested {@code job} object: id, name, state, nextState, health, lastError.
+ *
+ * @param evType the event type
+ * @param job the job
+ * @return the wrapped data
+ */
+ static JsonObject toJsonObject(JobRegistryService.EventType evType, Job job) {
+ JsonObject value = new JsonObject();
+ value.addProperty("time", (Number)System.currentTimeMillis());
+ value.addProperty("event", evType.toString());
+ JsonObject obj = new JsonObject();
+ obj.addProperty("id", job.getId());
+ obj.addProperty("name", job.getName());
+ obj.addProperty("state", job.getCurrentState().toString());
+ obj.addProperty("nextState", job.getNextState().toString());
+ obj.addProperty("health", job.getHealth().toString());
+ obj.addProperty("lastError", job.getLastError());
+ value.add("job", obj);
+ return value;
+ }
+
+ /**
+ * Gets the {@code job} JsonObject from the given JSON value.
+ * @param value a JSON object
+ * @return the job JsonObject
+ * @throws IllegalArgumentException if {@code job} is absent or is not a JSON object
+ */
+ static JsonObject getJob(JsonObject value) {
+ JsonElement job = value.get("job");
+ if (job == null || !job.isJsonObject()) { // a non-object member would otherwise surface as a ClassCastException
+ throw new IllegalArgumentException("Could not find the job object in: " + value);
+ }
+ return job.getAsJsonObject();
+ }
+
+ /**
+ * Gets the {@code name} string property from the given job JSON object.
+ *
+ * @param job the job JSON object
+ * @return the job name
+ * @throws IllegalArgumentException if the property is absent or is not a JSON primitive
+ */
+ static String getJobName(JsonObject job) {
+ return getProperty(job, "name");
+ }
+
+ /**
+ * Gets the {@code health} string property from the given job JSON object.
+ *
+ * @param job the job JSON object
+ * @return the job health
+ * @throws IllegalArgumentException if the property is absent or is not a JSON primitive
+ */
+ static String getJobHealth(JsonObject job) {
+ return getProperty(job, "health");
+ }
+
+ /**
+ * Gets a string property with the specified name from the given JSON
+ * object.
+ * @param value a JSON object
+ * @param name the property name
+ * @return the property value
+ * @throws IllegalArgumentException if the property is absent, is not a JSON
+ * primitive, or cannot be read as a string (that failure becomes the cause)
+ */
+ static String getProperty(JsonObject value, String name) {
+ JsonElement e = value.get(name);
+ Exception cause = null;
+ if (e != null && e.isJsonPrimitive()) {
+ try {
+ return e.getAsString();
+ } catch (Exception ex) {
+ cause = ex; // kept so the exception below reports why the value was unusable
+ }
+ }
+ throw new IllegalArgumentException("Could not find the " + name + " property in: " + value, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/package-info.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/package-info.java b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/package-info.java
new file mode 100644
index 0000000..44575df
--- /dev/null
+++ b/apps/runtime/src/main/java/org/apache/edgent/apps/runtime/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Applications which provide monitoring and failure recovery to other
+ * Edgent applications.
+ */
+package org.apache.edgent.apps.runtime;
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/test/java/edgent/test/apps/runtime/JobMonitorAppTest.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/test/java/edgent/test/apps/runtime/JobMonitorAppTest.java b/apps/runtime/src/test/java/edgent/test/apps/runtime/JobMonitorAppTest.java
deleted file mode 100644
index fea4378..0000000
--- a/apps/runtime/src/test/java/edgent/test/apps/runtime/JobMonitorAppTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.apps.runtime;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.Hashtable;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-
-import edgent.apps.runtime.JobMonitorApp;
-import edgent.execution.DirectSubmitter;
-import edgent.execution.Job;
-import edgent.execution.services.ControlService;
-import edgent.execution.services.ServiceContainer;
-import edgent.providers.direct.DirectProvider;
-import edgent.runtime.appservice.AppService;
-import edgent.runtime.jmxcontrol.JMXControlService;
-import edgent.runtime.jobregistry.JobRegistry;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.services.ApplicationService;
-
-public class JobMonitorAppTest {
-
- public final static String MONITORED_APP_NAME_1 = "MonitoredApp_1";
- public final static String MONITORED_APP_NAME_2 = "MonitoredApp_2";
-
- @Test
- public void testJobMonitorApp() throws Exception {
- DirectProvider provider = new DirectProvider();
- startProvider(provider);
-
- // Start monitor app
- JobMonitorApp app = new JobMonitorApp(provider, provider, JobMonitorApp.APP_NAME);
- Job monitor = app.submit();
-
- // Declare and register user apps which need monitoring
- registerMonitoredApplicationOne(provider);
- registerMonitoredApplicationTwo(provider);
-
- // Start monitored apps
- startMonitoredApplications(provider);
-
- // Run for a while, assert the monitor app is still running healthy
- Thread.sleep(5000);
- assertTrue(
- Job.State.RUNNING.equals(monitor.getCurrentState()) &&
- Job.State.RUNNING.equals(monitor.getNextState()) &&
- Job.Health.HEALTHY.equals(monitor.getHealth()));
- }
-
- static void startProvider(DirectProvider provider)
- throws InterruptedException, ExecutionException {
-
- provider.getServices().addService(ControlService.class,
- new JMXControlService("edgent.test.apps.runtime", new Hashtable<>()));
- AppService.createAndRegister(provider, provider);
- JobRegistry.createAndRegister(provider.getServices());
- }
-
- /**
- * Fails every 2 seconds (20 tuples * 100 millis)
- */
- static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter) {
- ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
- appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) -> {
-
- Random r = new Random();
- TStream<Double> d = topology.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);
-
- final AtomicInteger count = new AtomicInteger(0);
- d = d.filter(tuple -> {
- int tupleCount = count.incrementAndGet();
- if (tupleCount == 20) {
- throw new IllegalStateException("Injected error");
- }
- return true;
- });
-
- d.sink(tuple -> System.out.print("."));
- });
- }
-
- /**
- * Fails every 1.5 seconds (10 tuples * 150 millis)
- */
- static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter) {
- ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
- appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) -> {
-
- Random r = new Random();
- TStream<Double> d = topology.poll(() -> r.nextGaussian(), 150, TimeUnit.MILLISECONDS);
-
- final AtomicInteger count = new AtomicInteger(0);
- d = d.filter(tuple -> {
- int tupleCount = count.incrementAndGet();
- if (tupleCount == 10) {
- throw new IllegalStateException("Injected error");
- }
- return true;
- });
-
- d.sink(tuple -> System.out.print("#"));
- });
- }
-
- static void startMonitoredApplications(DirectSubmitter<Topology, Job> submitter) {
- ServiceContainer services = submitter.getServices();
- ApplicationService appService = services.getService(ApplicationService.class);
- ControlService controlService = services.getService(ControlService.class);
-
- // Submit all applications registered with the ApplicationService
- for (String name: appService.getApplicationNames()) {
- JobMonitorApp.submitApplication(name, controlService);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
----------------------------------------------------------------------
diff --git a/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
new file mode 100644
index 0000000..51b129c
--- /dev/null
+++ b/apps/runtime/src/test/java/org/apache/edgent/test/apps/runtime/JobMonitorAppTest.java
@@ -0,0 +1,137 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.apps.runtime;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Hashtable;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.apps.runtime.JobMonitorApp;
+import org.apache.edgent.execution.DirectSubmitter;
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.execution.services.ServiceContainer;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.runtime.appservice.AppService;
+import org.apache.edgent.runtime.jmxcontrol.JMXControlService;
+import org.apache.edgent.runtime.jobregistry.JobRegistry;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.services.ApplicationService;
+import org.junit.Test;
+
+public class JobMonitorAppTest {
+
+ public final static String MONITORED_APP_NAME_1 = "MonitoredApp_1";
+ public final static String MONITORED_APP_NAME_2 = "MonitoredApp_2";
+
+ @Test
+ public void testJobMonitorApp() throws Exception {
+ DirectProvider provider = new DirectProvider();
+ startProvider(provider);
+
+ // Start monitor app
+ JobMonitorApp app = new JobMonitorApp(provider, provider, JobMonitorApp.APP_NAME);
+ Job monitor = app.submit();
+
+ // Declare and register user apps which need monitoring
+ registerMonitoredApplicationOne(provider);
+ registerMonitoredApplicationTwo(provider);
+
+ // Start monitored apps
+ startMonitoredApplications(provider);
+
+ // Run for a while, assert the monitor app is still running healthy
+ Thread.sleep(5000);
+ assertTrue(
+ Job.State.RUNNING.equals(monitor.getCurrentState()) &&
+ Job.State.RUNNING.equals(monitor.getNextState()) &&
+ Job.Health.HEALTHY.equals(monitor.getHealth()));
+ }
+
+ static void startProvider(DirectProvider provider)
+ throws InterruptedException, ExecutionException {
+
+ provider.getServices().addService(ControlService.class,
+ new JMXControlService("org.apache.edgent.test.apps.runtime", new Hashtable<>()));
+ AppService.createAndRegister(provider, provider);
+ JobRegistry.createAndRegister(provider.getServices());
+ }
+
+ /**
+ * Fails every 2 seconds (20 tuples * 100 millis)
+ */
+ static void registerMonitoredApplicationOne(DirectSubmitter<Topology, Job> submitter) {
+ ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
+ appService.registerTopology(MONITORED_APP_NAME_1, (topology, config) -> {
+
+ Random r = new Random();
+ TStream<Double> d = topology.poll(() -> r.nextGaussian(), 100, TimeUnit.MILLISECONDS);
+
+ final AtomicInteger count = new AtomicInteger(0);
+ d = d.filter(tuple -> {
+ int tupleCount = count.incrementAndGet();
+ if (tupleCount == 20) {
+ throw new IllegalStateException("Injected error");
+ }
+ return true;
+ });
+
+ d.sink(tuple -> System.out.print("."));
+ });
+ }
+
+ /**
+ * Fails every 1.5 seconds (10 tuples * 150 millis)
+ */
+ static void registerMonitoredApplicationTwo(DirectSubmitter<Topology, Job> submitter) {
+ ApplicationService appService = submitter.getServices().getService(ApplicationService.class);
+ appService.registerTopology(MONITORED_APP_NAME_2, (topology, config) -> {
+
+ Random r = new Random();
+ TStream<Double> d = topology.poll(() -> r.nextGaussian(), 150, TimeUnit.MILLISECONDS);
+
+ final AtomicInteger count = new AtomicInteger(0);
+ d = d.filter(tuple -> {
+ int tupleCount = count.incrementAndGet();
+ if (tupleCount == 10) {
+ throw new IllegalStateException("Injected error");
+ }
+ return true;
+ });
+
+ d.sink(tuple -> System.out.print("#"));
+ });
+ }
+
+ static void startMonitoredApplications(DirectSubmitter<Topology, Job> submitter) {
+ ServiceContainer services = submitter.getServices();
+ ApplicationService appService = services.getService(ApplicationService.class);
+ ControlService controlService = services.getService(ControlService.class);
+
+ // Submit all applications registered with the ApplicationService
+ for (String name: appService.getApplicationNames()) {
+ JobMonitorApp.submitApplication(name, controlService);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 694771f..0c383ef 100644
--- a/build.gradle
+++ b/build.gradle
@@ -46,7 +46,6 @@ subprojects {
plugins.apply 'java'
plugins.apply 'jacoco'
- if(["javax.websocket-client", "javax.websocket-server", "edgent.javax.websocket"].contains(project.name)) {
archivesBaseName = "${project.name}"
} else{
archivesBaseName = "${rootProject.name}${project.path.replace(':', '.')}"
@@ -175,14 +174,14 @@ task aggregateJavadoc(type: Javadoc) {
overview "edgent_overview.html"
windowTitle "Edgent v${build_version}"
- group("Edgent API", "edgent.execution", "edgent.function", "edgent.topology", "edgent.topology.json", "edgent.topology.mbeans", "edgent.topology.plumbing", "edgent.topology.services", "edgent.execution.*")
- group("Edgent Providers", "edgent.providers.*")
- group("Edgent Connectors", "edgent.connectors.*")
- group("Edgent Samples", "edgent.samples.*")
- group("Edgent Analytics", "edgent.analytics.*")
- group("Edgent Utilities", "edgent.metrics", "edgent.metrics.*", "edgent.streamscope", "edgent.streamscope.*")
- group("Edgent Low-Level API", "edgent.graph", "edgent.graph.*", "edgent.oplet", "edgent.oplet.*", "edgent.window")
- group("Edgent SPI", "edgent.topology.spi", "edgent.topology.spi.*")
+ group("Edgent API", "org.apache.edgent.execution", "org.apache.edgent.function", "org.apache.edgent.topology", "org.apache.edgent.topology.json", "org.apache.edgent.topology.mbeans", "org.apache.edgent.topology.plumbing", "org.apache.edgent.topology.services", "org.apache.edgent.execution.*")
+ group("Edgent Providers", "org.apache.edgent.providers.*")
+ group("Edgent Connectors", "org.apache.edgent.connectors.*")
+ group("Edgent Samples", "org.apache.edgent.samples.*")
+ group("Edgent Analytics", "org.apache.edgent.analytics.*")
+ group("Edgent Utilities", "org.apache.edgent.metrics", "org.apache.edgent.metrics.*", "org.apache.edgent.streamscope", "org.apache.edgent.streamscope.*")
+ group("Edgent Low-Level API", "org.apache.edgent.graph", "org.apache.edgent.graph.*", "org.apache.edgent.oplet", "org.apache.edgent.oplet.*", "org.apache.edgent.window")
+ group("Edgent SPI", "org.apache.edgent.topology.spi", "org.apache.edgent.topology.spi.*")
}
source subprojects.collect { project -> project.sourceSets.main.allJava }
classpath = files(subprojects.collect
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d98ae50..ae57319 100644
--- a/build.xml
+++ b/build.xml
@@ -300,7 +300,7 @@
additionalparam="-Xdoclint:none"
Overview="edgent_overview.html"
failonwarning="true"
- excludepackagenames="edgent.connectors.runtime,edgent.connectors.*.runtime,edgent.console.*,edgent.samples.scenarios.iotf.range.sensor"
+ excludepackagenames="org.apache.edgent.connectors.runtime,org.apache.edgent.connectors.*.runtime,org.apache.edgent.console.*,org.apache.edgent.samples.scenarios.iotf.range.sensor"
>
<sourcepath>
<dirset dir="${basedir}">
@@ -323,14 +323,14 @@
<doctitle>Apache Edgent (incubating) v${edgent.version}</doctitle>
<footer><![CDATA[<a href="http://edgent.incubator.apache.org">Apache Edgent (incubating)</a>]]></footer>
<bottom>Copyright &copy; 2016 The Apache Software Foundation. All Rights Reserved - ${commithash}-${DSTAMP}-${TSTAMP}</bottom>
- <group title="Edgent API" packages="edgent.execution,edgent.function,edgent.topology,edgent.topology.json,edgent.topology.mbeans,edgent.topology.plumbing,edgent.topology.services,edgent.execution.*"/>
- <group title="Edgent Providers" packages="edgent.providers.*"/>
- <group title="Edgent Connectors" packages="edgent.connectors.*"/>
- <group title="Edgent Samples" packages="edgent.samples.*"/>
- <group title="Edgent Analytics" packages="edgent.analytics.*"/>
- <group title="Edgent Utilities" packages="edgent.metrics,edgent.metrics.*,edgent.streamscope,edgent.streamscope.*"/>
- <group title="Edgent Low-Level API" packages="edgent.graph,edgent.graph.*,edgent.oplet,edgent.oplet.*,edgent.window"/>
- <group title="Edgent SPI" packages="edgent.topology.spi,edgent.topology.spi.*"/>
+ <group title="Edgent API" packages="org.apache.edgent.execution,org.apache.edgent.function,org.apache.edgent.topology,org.apache.edgent.topology.json,org.apache.edgent.topology.mbeans,org.apache.edgent.topology.plumbing,org.apache.edgent.topology.services,org.apache.edgent.execution.*"/>
+ <group title="Edgent Providers" packages="org.apache.edgent.providers.*"/>
+ <group title="Edgent Connectors" packages="org.apache.edgent.connectors.*"/>
+ <group title="Edgent Samples" packages="org.apache.edgent.samples.*"/>
+ <group title="Edgent Analytics" packages="org.apache.edgent.analytics.*"/>
+ <group title="Edgent Utilities" packages="org.apache.edgent.metrics,org.apache.edgent.metrics.*,org.apache.edgent.streamscope,org.apache.edgent.streamscope.*"/>
+ <group title="Edgent Low-Level API" packages="org.apache.edgent.graph,org.apache.edgent.graph.*,org.apache.edgent.oplet,org.apache.edgent.oplet.*,org.apache.edgent.window"/>
+ <group title="Edgent SPI" packages="org.apache.edgent.topology.spi,org.apache.edgent.topology.spi.*"/>
</javadoc>
</target>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/CommandStreams.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/CommandStreams.java b/connectors/command/src/main/java/edgent/connectors/command/CommandStreams.java
deleted file mode 100644
index 817286c..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/CommandStreams.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.StringTokenizer;
-import java.util.concurrent.TimeUnit;
-
-import edgent.connectors.command.runtime.CommandReader;
-import edgent.connectors.command.runtime.CommandWriter;
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-
-/**
- * Connector for creating a TStream from a Command's / OS Process's output
- * and sinking a TStream to a Command's / OS Process's input.
- * <P>
- * e.g., run a network monitor command (like Tiger Shark) and ingest its output.
- */
-public class CommandStreams {
- private CommandStreams() {}
-
- /**
- * Tokenize the specified {@code cmdString} in the exact same manner as
- * done by {@link Runtime#exec(String)}.
- * <P>
- * This function provides a convenience for creating a {@link ProcessBuilder}
- * for use by the other CommandStreams methods.
- * </P>
- * <P>
- * Sample use:
- * <pre>{@code
- * ProcessBuilder cmd = new ProcessBuilder(tokenize("sh someShellCmd.sh and args"));
- * TStream<String> stream = CommandStreams.generate(topology, cmd);
- * }</pre>
- *
- * @param cmdString a command string
- * @return the tokens
- */
- public static List<String> tokenize(String cmdString) {
- List<String> command = new ArrayList<>();
- StringTokenizer tok = new StringTokenizer(cmdString);
- while (tok.hasMoreTokens())
- command.add(tok.nextToken());
- return command;
- }
-
- /**
- * Create an endless {@code TStream<String>} from a long running command's output.
- * <P>
- * The supplied {@code cmd} is used to start the command.
- * A tuple is created for each UTF8 line read from the command's
- * {@link Process#getInputStream() output}.
- * The tuples contain output from stderr if cmd is configured to
- * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
- * The command is restarted if a read from the command's output stream
- * returns EOF or an error.
- * </P>
- * <P>
- * This is a convenience function equivalent to
- * {@code topology.generate(endlessCommandReader(cmd))}.
- * </P>
- * <P>
- * Sample use: create a stream of tuples for the output from a
- * continuously running and restartable command:
- * <pre>{@code
- * ProcessBuilder cmd = new ProcessBuilder("myCommand");
- * TStream<String> cmdOutput = CommandStreams.generate(topology, cmd);
- * cmdOutput.print();
- * }</pre>
- *
- * @param topology the topology to add the source stream to
- * @param cmd the {@link ProcessBuilder} to start the command
- * @return the source {@code TStream<String>}
- *
- * @see #endlessCommandReader(ProcessBuilder)
- * @see #tokenize(String)
- */
- public static TStream<String> generate(Topology topology, ProcessBuilder cmd) {
- return topology.generate(endlessCommandReader(cmd));
- }
-
- /**
- * Create a {@code TStream<String>} from a periodically run command's output.
- * <P>
- * The supplied {@code cmd} is used to start the command
- * at the specified {@code period}.
- * The command's UTF8 {@link Process#getInputStream() output} is read until EOF
- * and a {@code List<String>} tuple is created containing the collected output.
- * The tuples contain output from stderr if the cmd is configured to
- * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
- * </P>
- * <P>
- * This is a convenience function equivalent to
- * {@code topology.poll(commandReaderList(cmd), period, units)}.
- * </P>
- * <P>
- * Sample use: create a stream of tuples containing the output
- * from a periodically run command:
- * <pre>{@code
- * ProcessBuilder cmd = new ProcessBuilder("date");
- * TStream<List<String>> cmdOutput =
- * CommandStreams.periodicSource(topology, cmd, 2, TimeUnit.SECONDS);
- * cmdOutput.print();
- * }</pre>
- *
- * @param topology the topology to add the source stream to
- * @param cmd the {@link ProcessBuilder} to start the command
- * @param period the period to run the command and collect its output
- * @param units TimeUnit for {@code period}
- * @return the source {@code TStream<List<String>>}
- *
- * @see #commandReaderList(ProcessBuilder)
- * @see #tokenize(String)
- */
- public static TStream<List<String>> periodicSource(Topology topology,
- ProcessBuilder cmd, long period, TimeUnit units) {
- return topology.poll(commandReaderList(cmd), period, units);
- }
-
- /**
- * Sink a {@code TStream<String>} to a command's input.
- * <P>
- * The supplied {@code cmd} is used to start the command.
- * Each tuple is written as UTF8 and flushed to the command's {@link Process#getOutputStream() input}.
- * The command is restarted if a write encounters an error.
- * </P>
- * <P>
- * While each write is followed by a flush() that only helps to
- * reduce the time it takes to notice that cmd has failed and restart it.
- * Supposedly "successfully written and flushed" values are not guaranteed to
- * have been received by a cmd across restarts.
- * </P>
- * <P>
- * This is a convenience function equivalent to
- * {@code stream.sink(commandWriter(cmd))}
- * </P>
- * <P>
- * Sample use: write a stream of tuples to the input of a command:
- * <pre>{@code
- * TStream<String> stream = topology.strings("one", "two", "three");
- * ProcessBuilder cmd = new ProcessBuilder("cat").redirectOutput(new File("/dev/stdout"));
- * CommandStreams.sink(stream, cmd);
- * }</pre>
- *
- * @param stream the stream to sink
- * @param cmd the {@link ProcessBuilder} to start the command
- * @return a {@link TSink}
- *
- * @see #commandWriter(ProcessBuilder)
- * @see #tokenize(String)
- */
- public static TSink<String> sink(TStream<String> stream, ProcessBuilder cmd) {
- return stream.sink(commandWriter(cmd));
- }
-
- /**
- * Create an endless {@code Supplier<String>} for ingesting a long running command's output.
- * <P>
- * This method is particularly helpful in creating a sensor or source connector
- * class that hides the fact that it uses a command, enabling it to be used
- * like any other sensor/connector.
- * </P>
- * For example:
- * <pre><code>
- * // ingest the sensor data
- * TStream<MySensorData> stream = topology.generate(new MySensor());
- *
- * // MySensor class
- * class MySensor implements Supplier<MySensorData> {
- * private String[] cmd = new String[] {"mySensorCmd", "arg1"};
- * private Supplier<String> commandReader =
- * CommandStreams.endlessCommandReader(new ProcessBuilder(cmd));
- *
- * // implement Supplier<MySensorData>.get()
- * public MySensorData get() {
- * // get the next line from the cmd and create a MySensorData tuple from it
- * return createMySensorData(commandReader.get());
- * }
- * }
- * </code></pre>
- * <P>
- * The supplied {@code cmd} is used to start the command.
- * A call to {@link Supplier#get()} reads the next UTF8 line from the command's
- * {@link Process#getInputStream() output}.
- * The returned strings contain output from stderr if the cmd is configured to
- * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdput}.
- * The command is restarted if a read from the command's output stream
- * returns EOF or an error.
- * </P>
- *
- * @param cmd the {@link ProcessBuilder} to start the command
- * @return the {@code Supplier<String>}
- *
- * @see #generate(Topology, ProcessBuilder)
- * @see #tokenize(String)
- */
- public static Supplier<String> endlessCommandReader(ProcessBuilder cmd) {
- return new Supplier<String>() {
- private static final long serialVersionUID = 1L;
- Supplier<Iterable<String>> reader = new CommandReader(cmd, true);
- Iterator<String> iter = null;
- @Override
- public String get() {
- if (iter == null) {
- iter = reader.get().iterator();
- }
- if (iter.hasNext()) {
- return iter.next();
- }
- else {
- // presumably a shutdown condition
- return null;
- }
- }
- };
- }
-
- /**
- * Create a {@code Supplier<List<String>>} to ingest a command's output.
- * <P>
- * This method is particularly helpful in creating a sensor or source connector
- * class that hides the fact that it uses a command, enabling it to be used
- * like any other sensor/connector.
- * </P>
- * For example:
- * <pre><code>
- * // ingest the sensor data
- * TStream<MySensorData> stream = topology.periodicSource(new MySensor());
- *
- * // MySensor class
- * class MySensor implements Supplier<MySensorData> {
- * private String[] cmd = new String[] {"mySensorCmd", "arg1"};
- * private Supplier<List<String>> commandReader =
- * CommandStreams.commandReaderList(new ProcessBuilder(cmd));
- *
- * // implement Supplier<MySensorData>.get()
- * public MySensorData get() {
- * // get the cmd output and create a MySensorData tuple from it
- * return createMySensorData(commandReader.get());
- * }
- * }
- * </code></pre>
- * <P>
- * The supplied {@code cmd} is used to start the command.
- * A call to {@link Supplier#get()} reads the command's UTF8
- * {@link Process#getInputStream() input stream} until an EOF or error
- * and returns a {@code List<String>} of the collected input.
- * The tuples contain output from stderr if the cmd is configured to
- * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
- * </P>
- *
- * @param cmd the {@link ProcessBuilder} to start the command
- * @return the {@code Supplier<List<String>>} for the command
- *
- * @see #periodicSource(Topology, ProcessBuilder, long, TimeUnit)
- * @see #tokenize(String)
- */
- public static Supplier<List<String>> commandReaderList(ProcessBuilder cmd) {
- return new Supplier<List<String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public List<String> get() {
- try (CommandReader supplier
- = new CommandReader(cmd, false))
- {
- Iterator<String> iter = supplier.get().iterator();
- List<String> list = new ArrayList<>();
- while (iter.hasNext())
- list.add(iter.next());
- return list;
- }
- }
- };
- }
-
- /**
- * Create a {@code Consumer<String>} to write UTF8 string data to a command's input.
- * <P>
- * This method is particularly helpful in creating a sink connector
- * that hides the fact that it uses a command, enabling it to be used
- * like a native connector.
- * </P>
- * For example:
- * <pre><code>
- * // sink a stream to my connector
- * TStream<MySensorData> stream = ...;
- * stream.sink(new MySinkConnector());
- *
- * // MySinkConnector class
- * class MySinkConnector implements Consumer<MySensorData> {
- * private String[] cmd = new String[] {"mySinkCmd", "arg1"};
- * private Consumer<String> commandWriter =
- * CommandStreams.commandWriter(new ProcessBuilder(cmd));
- *
- * // implement Consumer<MySensorData>.accept()
- * public void accept(MySensorData data) {
- * // convert the data to a string and write it to the cmd
- * commandWriter.accept(convertMySensorData(data));
- * }
- * }
- * </code></pre>
- * <P>
- * The supplied {@link ProcessBuilder cmd} is used to start the command.
- * Each call to {@link Consumer#accept(Object) accept(String)} writes a
- * UTF8 string to the command's {@link Process#getOutputStream() input}.
- * Each write is followed by a flush.
- * The command is restarted if a write encounters an error.
- * </P>
- * <P>
- * While each write is followed by a flush() that only helps to
- * reduce the time it takes to notice that cmd has failed and restart it.
- * Supposedly "successfully written and flushed" values are not guaranteed to
- * have been received by a cmd across restarts.
- * </P>
- *
- * @param cmd the {@link ProcessBuilder} to start the command
- * @return the {@code Consumer<String>} for the command
- *
- * @see #sink(TStream, ProcessBuilder)
- * @see #tokenize(String)
- */
- public static Consumer<String> commandWriter(ProcessBuilder cmd) {
- return new CommandWriter(cmd, true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/package-info.java b/connectors/command/src/main/java/edgent/connectors/command/package-info.java
deleted file mode 100644
index ee5f59a..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Command / OS Process connector.
- */
-package edgent.connectors.command;
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandConnector.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandConnector.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandConnector.java
deleted file mode 100644
index 36ca884..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandConnector.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for source / sink specific command connectors.
- * <P>
- * The lifetime of a CommandConnector is that of its Command's execution lifetime.
- * In the case of a "one shot" command (e.g., a periodicSource's cmd) the
- * lifetime may be brief - {@code restart==false}.
- * </P>
- * <P>
- * Many command connector uses will involve long running commands, sources
- * and sinks that want to be robust in the face of inadvertent command
- * termination/failures - {@code restart==true}.
- * </P>
- */
-abstract class CommandConnector implements AutoCloseable {
-    static final Logger logger = LoggerFactory.getLogger(CommandConnector.class);
-
-    /**
-     * Pause, in milliseconds, inserted before a start that follows the
-     * previous start attempt by less than this same amount of time.
-     */
-    private static final int RESTART_DELAY_MSEC = 1_000;
-
-    private final ProcessBuilder cmd;
-    private final boolean restart;
-    private Process currentProcess;
-    private long numStarts;
-    private long lastStartTimestamp;
-
-    /**
-     * @param cmd the builder used to start (and restart) the command
-     * @param restart true if the command may be started more than once
-     */
-    CommandConnector(ProcessBuilder cmd, boolean restart) {
-        this.cmd = cmd;
-        this.restart = restart;
-    }
-
-    /**
-     * @return true if {@link #start()} may be called: always when restart
-     *         is enabled, otherwise only until the first start attempt.
-     */
-    protected boolean canStart() {
-        return restart || numStarts==0;
-    }
-
-    /**
-     * @return the most recently started process, or null if none is
-     *         active (never started, closed, or the last start failed).
-     */
-    protected Process getCurrentProcess() {
-        return currentProcess;
-    }
-
-    /**
-     * Close any current process and start the command.
-     * <P>
-     * A failure to launch the command is logged, not thrown; in that case
-     * {@link #getCurrentProcess()} returns null afterwards. The attempt
-     * still counts as a start for {@link #canStart()}.
-     * </P>
-     *
-     * @throws IllegalStateException if {@link #canStart()} is false
-     * @throws InterruptedException if interrupted while delaying a restart
-     */
-    protected void start() throws InterruptedException {
-        if (!canStart())
-            throw new IllegalStateException(
-                    "cmd already started and restart is disabled: " + toCmdForMsg());
-        closeProcess();
-        try {
-            numStarts++;
-            // ensure we don't thrash on continuous restarts
-            long now = System.currentTimeMillis();
-            if (now < lastStartTimestamp + RESTART_DELAY_MSEC) {
-                logger.info("Sleeping before restarting cmd {}", toCmdForMsg());
-                Thread.sleep(RESTART_DELAY_MSEC);
-                now = System.currentTimeMillis();
-            }
-            lastStartTimestamp = now;
-
-            currentProcess = cmd.start();
-
-            logger.debug("Started cmd {}", toCmdForMsg());
-        }
-        catch (IOException e) {
-            // currentProcess stays null (cleared by closeProcess() above)
-            logger.error("Unable to start cmd {}", toCmdForMsg(), e);
-        }
-    }
-
-    /**
-     * Destroy the current process, if any, and forget it.
-     */
-    protected void closeProcess() {
-        if (currentProcess != null) {
-            currentProcess.destroy();
-            currentProcess = null;
-        }
-    }
-
-    @Override
-    public void close() {
-        closeProcess();
-    }
-
-    /**
-     * @return the command and its arguments rendered for log messages
-     */
-    String toCmdForMsg() {
-        return cmd.command().toString();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandReader.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandReader.java
deleted file mode 100644
index cb26db9..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandReader.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import edgent.function.Supplier;
-
-/**
- * Create a {@code Supplier<Iterable<String>>} to ingest a command's output.
- * <P>
- * The supplied {@code cmd} is used to start the command
- * and restart it upon process termination/error if so configured.
- * </P>
- * <P>
- * The iterator returned by {@link Iterable#iterator()) returns
- * {@hasNext()==true} until a read from {@link Process#getOutputStream()}
- * returns EOF or an IOError.
- */
-public class CommandReader extends CommandConnector implements Supplier<Iterable<String>>, AutoCloseable {
- private static final long serialVersionUID = 1L;
- private Iterator<String> currentSupplierIterator;
-
- /**
- * Create a supplier of UTF8 strings from a command's output.
- *
- * @param cmd the {@link ProcessBuilder} to use to start the command
- * @param restart when true, restart the command upon termination, EOF, or
- * read error.
- */
- public CommandReader(ProcessBuilder cmd, boolean restart) {
- super(cmd, restart);
- }
-
- protected void start() throws InterruptedException {
- super.start();
- currentSupplierIterator = new ProcessReader(getCurrentProcess()).get().iterator();
- }
-
- protected void closeProcess() {
- currentSupplierIterator = null;
- super.closeProcess();
- }
-
- @Override
- public Iterable<String> get() {
- return new Iterable<String>() {
-
- @Override
- public Iterator<String> iterator() {
- return new Iterator<String>() {
-
- @Override
- public boolean hasNext() {
- try {
- for(;;) {
- if (currentSupplierIterator != null) {
- boolean hasNext = currentSupplierIterator.hasNext();
- if (hasNext) {
- return true;
- }
- else {
- // no more from that process. close and loop/retry.
- closeProcess();
- }
- }
- else if (currentSupplierIterator == null && canStart()) {
- start(); // and loop/retry
- }
- else {
- return false; // no more input
- }
- }
- }
- catch (InterruptedException e) {
- return false;
- }
- }
-
- @Override
- public String next() {
- if (currentSupplierIterator != null)
- return currentSupplierIterator.next();
- else
- throw new NoSuchElementException();
- }
-
- };
- }
-
- };
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandWriter.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandWriter.java
deleted file mode 100644
index 42a9113..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/CommandWriter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import edgent.function.Consumer;
-
-/**
- * A {@code Consumer<String>>} to write data to a command's input.
- * <P>
- * The supplied {@code cmd} is used to start the command
- * and restart it upon process termination/error if so configured.
- * </P>
- */
-public class CommandWriter extends CommandConnector implements Consumer<String>, AutoCloseable {
- private static final long serialVersionUID = 1L;
- private ProcessWriter currentConsumer;
-
- /**
- * Create a consumer to write UTF8 string data to a command's input.
- * <P>
- * Each write is followed by a flush() though that only helps to
- * reduce the time it takes to notice that a cmd has failed.
- * Supposedly "successfully written and flushed" values are not guaranteed to
- * have been received by a cmd even following restart.
- * </P>
- *
- * @param cmd the builder to use to start the process
- * @param restart true to restart the process upon termination or
- * write error.
- */
- public CommandWriter(ProcessBuilder cmd, boolean restart) {
- super(cmd, restart);
- }
-
- protected void start() throws InterruptedException {
- super.start();
- currentConsumer = new ProcessWriter(getCurrentProcess());
- }
-
- protected void closeProcess() {
- currentConsumer = null;
- super.closeProcess();
- }
-
- @Override
- public void accept(String value) {
- for (;;) {
- try {
- if (currentConsumer != null) {
- try {
- currentConsumer.accept(value);
- logger.trace("WROTE: {}", value);
- return;
- }
- catch (RuntimeException e) {
- closeProcess(); // and loop/retry
- }
- }
- else if (currentConsumer == null && canStart()) {
- logger.debug("STARTING for: {}", value);
- start(); // and loop/retry
- }
- else {
- // not restartable. toss it on the floor
- return;
- }
- }
- catch (InterruptedException e) {
- // toss it on the floor
- return;
- }
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessReader.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessReader.java
deleted file mode 100644
index c65a875..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessReader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import edgent.function.Supplier;
-
-/**
- * A {@code Supplier<Iterable<String>>} for ingesting a process's output.
- * <P>
- * The iterator returned by {@link Iterable#iterator()) returns
- * {@hasNext()==true} until a read from {@link Process#getInputStream()}
- * returns EOF or an IOError.
- */
-class ProcessReader implements Supplier<Iterable<String>> {
- private static final long serialVersionUID = 1L;
- private final BufferedReader reader;
-
- /**
- * Create a new supplier of UTF8 strings read from a process's
- * {@link Process#getInputStream() output}.
- *
- * @param process the process to read from.
- */
- ProcessReader(Process process) {
- reader = new BufferedReader(new InputStreamReader(
- process.getInputStream(), StandardCharsets.UTF_8));
- }
-
- @Override
- public Iterable<String> get() {
- return new Iterable<String>() {
-
- @Override
- public Iterator<String> iterator() {
- return new Iterator<String>() {
- private Boolean hasNext = null;
- private String next = null;
-
- @Override
- public boolean hasNext() {
- if (hasNext != null)
- return hasNext;
- next = getNext();
- hasNext = next != null;
- return hasNext;
- }
-
- @Override
- public String next() {
- if (next == null)
- throw new NoSuchElementException();
- hasNext = null;
- return next;
- }
-
- };
- }
-
- };
- }
-
- /**
- * Get the next available line from the process's stdout
- * @return null if no more input (or error)
- */
- private String getNext() {
- try {
- return reader.readLine();
- } catch (IOException e) {
- CommandConnector.logger.error("Unable to readline from cmd", e);
- return null;
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessWriter.java b/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessWriter.java
deleted file mode 100644
index f9176dc..0000000
--- a/connectors/command/src/main/java/edgent/connectors/command/runtime/ProcessWriter.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.connectors.command.runtime;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-
-import edgent.function.Consumer;
-
-/**
- * A {@code Consumer<String>>} to receive data and write it to a process's input.
- * <P>
- * Each write is followed by a flush() though that only helps to
- * reduce the time it takes to notice that a cmd has failed.
- * Supposedly "successfully written and flushed" values are not guaranteed to
- * have been received by a cmd.
- */
-class ProcessWriter implements Consumer<String> {
- private static final long serialVersionUID = 1L;
- private final BufferedWriter writer;
-
- /**
- * Create a new consumer for UTF8 strings to write to a process's
- * {@link Process#getOutputStream() input}
- *
- * @param process to process to write to.
- */
- ProcessWriter(Process process) {
- writer = new BufferedWriter(new OutputStreamWriter(
- process.getOutputStream(), StandardCharsets.UTF_8));
- }
-
- @Override
- public void accept(String value) {
- try {
- // see class doc regarding guarantees.
- writer.write(value);
- writer.newLine();
- writer.flush();
- }
- catch (IOException e) {
- CommandConnector.logger.error("Unable to write to cmd", e);
- // caller (CommandWriter) requires throw to detect failure and recover
- throw new RuntimeException("Unable to write to cmd", e);
- }
- }
-
-}
\ No newline at end of file