You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:34 UTC
[33/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/CommandStreams.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/CommandStreams.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/CommandStreams.java
new file mode 100644
index 0000000..ad02cac
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/CommandStreams.java
@@ -0,0 +1,349 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.command.runtime.CommandReader;
+import org.apache.edgent.connectors.command.runtime.CommandWriter;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * Connector for creating a TStream from a Command's / OS Process's output
+ * and sinking a TStream to a Command's / OS Process's input.
+ * <P>
+ * e.g., run a network monitor command (like Tiger Shark) and ingest its output.
+ */
+public class CommandStreams {
+ private CommandStreams() {}
+
+ /**
+ * Tokenize the specified {@code cmdString} in the exact same manner as
+ * done by {@link Runtime#exec(String)}.
+ * <P>
+ * This function provides a convenience for creating a {@link ProcessBuilder}
+ * for use by the other CommandStreams methods.
+ * </P>
+ * <P>
+ * Sample use:
+ * <pre>{@code
+ * ProcessBuilder cmd = new ProcessBuilder(tokenize("sh someShellCmd.sh and args"));
+ * TStream<String> stream = CommandStreams.generate(topology, cmd);
+ * }</pre>
+ *
+ * @param cmdString a command string
+ * @return the tokens
+ */
+ public static List<String> tokenize(String cmdString) {
+ List<String> command = new ArrayList<>();
+ StringTokenizer tok = new StringTokenizer(cmdString);
+ while (tok.hasMoreTokens())
+ command.add(tok.nextToken());
+ return command;
+ }
+
+ /**
+ * Create an endless {@code TStream<String>} from a long running command's output.
+ * <P>
+ * The supplied {@code cmd} is used to start the command.
+ * A tuple is created for each UTF8 line read from the command's
+ * {@link Process#getInputStream() output}.
+ * The tuples contain output from stderr if cmd is configured to
+ * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+ * The command is restarted if a read from the command's output stream
+ * returns EOF or an error.
+ * </P>
+ * <P>
+ * This is a convenience function equivalent to
+ * {@code topology.generate(endlessCommandReader(cmd))}.
+ * </P>
+ * <P>
+ * Sample use: create a stream of tuples for the output from a
+ * continuously running and restartable command:
+ * <pre>{@code
+ * ProcessBuilder cmd = new ProcessBuilder("myCommand");
+ * TStream<String> cmdOutput = CommandStreams.generate(topology, cmd);
+ * cmdOutput.print();
+ * }</pre>
+ *
+ * @param topology the topology to add the source stream to
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return the source {@code TStream<String>}
+ *
+ * @see #endlessCommandReader(ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static TStream<String> generate(Topology topology, ProcessBuilder cmd) {
+ return topology.generate(endlessCommandReader(cmd));
+ }
+
+ /**
+ * Create a {@code TStream<String>} from a periodically run command's output.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * at the specified {@code period}.
+ * The command's UTF8 {@link Process#getInputStream() output} is read until EOF
+ * and a {@code List<String>} tuple is created containing the collected output.
+ * The tuples contain output from stderr if the cmd is configured to
+ * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+ * </P>
+ * <P>
+ * This is a convenience function equivalent to
+ * {@code topology.poll(commandReaderList(cmd), period, units)}.
+ * </P>
+ * <P>
+ * Sample use: create a stream of tuples containing the output
+ * from a periodically run command:
+ * <pre>{@code
+ * ProcessBuilder cmd = new ProcessBuilder("date");
+ * TStream<List<String>> cmdOutput =
+ * CommandStreams.periodicSource(topology, cmd, 2, TimeUnit.SECONDS);
+ * cmdOutput.print();
+ * }</pre>
+ *
+ * @param topology the topology to add the source stream to
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @param period the period to run the command and collect its output
+ * @param units TimeUnit for {@code period}
+ * @return the source {@code TStream<List<String>>}
+ *
+ * @see #commandReaderList(ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static TStream<List<String>> periodicSource(Topology topology,
+ ProcessBuilder cmd, long period, TimeUnit units) {
+ return topology.poll(commandReaderList(cmd), period, units);
+ }
+
+ /**
+ * Sink a {@code TStream<String>} to a command's input.
+ * <P>
+ * The supplied {@code cmd} is used to start the command.
+ * Each tuple is written as UTF8 and flushed to the command's {@link Process#getOutputStream() input}.
+ * The command is restarted if a write encounters an error.
+ * </P>
+ * <P>
+ * While each write is followed by a flush(), that only helps to
+ * reduce the time it takes to notice that cmd has failed and restart it.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd across restarts.
+ * </P>
+ * <P>
+ * This is a convenience function equivalent to
+ * {@code stream.sink(commandWriter(cmd))}
+ * </P>
+ * <P>
+ * Sample use: write a stream of tuples to the input of a command:
+ * <pre>{@code
+ * TStream<String> stream = topology.strings("one", "two", "three");
+ * ProcessBuilder cmd = new ProcessBuilder("cat").redirectOutput(new File("/dev/stdout"));
+ * CommandStreams.sink(stream, cmd);
+ * }</pre>
+ *
+ * @param stream the stream to sink
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return a {@link TSink}
+ *
+ * @see #commandWriter(ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static TSink<String> sink(TStream<String> stream, ProcessBuilder cmd) {
+ return stream.sink(commandWriter(cmd));
+ }
+
+ /**
+ * Create an endless {@code Supplier<String>} for ingesting a long running command's output.
+ * <P>
+ * This method is particularly helpful in creating a sensor or source connector
+ * class that hides the fact that it uses a command, enabling it to be used
+ * like any other sensor/connector.
+ * </P>
+ * For example:
+ * <pre><code>
+ * // ingest the sensor data
+ * TStream<MySensorData> stream = topology.generate(new MySensor());
+ *
+ * // MySensor class
+ * class MySensor implements Supplier<MySensorData> {
+ * private String[] cmd = new String[] {"mySensorCmd", "arg1"};
+ * private Supplier<String> commandReader =
+ * CommandStreams.endlessCommandReader(new ProcessBuilder(cmd));
+ *
+ * // implement Supplier<MySensorData>.get()
+ * public MySensorData get() {
+ * // get the next line from the cmd and create a MySensorData tuple from it
+ * return createMySensorData(commandReader.get());
+ * }
+ * }
+ * </code></pre>
+ * <P>
+ * The supplied {@code cmd} is used to start the command.
+ * A call to {@link Supplier#get()} reads the next UTF8 line from the command's
+ * {@link Process#getInputStream() output}.
+ * The returned strings contain output from stderr if the cmd is configured to
+ * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+ * The command is restarted if a read from the command's output stream
+ * returns EOF or an error.
+ * </P>
+ *
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return the {@code Supplier<String>}
+ *
+ * @see #generate(Topology, ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static Supplier<String> endlessCommandReader(ProcessBuilder cmd) {
+ return new Supplier<String>() {
+ private static final long serialVersionUID = 1L;
+ Supplier<Iterable<String>> reader = new CommandReader(cmd, true);
+ Iterator<String> iter = null;
+ @Override
+ public String get() {
+ if (iter == null) {
+ iter = reader.get().iterator();
+ }
+ if (iter.hasNext()) {
+ return iter.next();
+ }
+ else {
+ // presumably a shutdown condition
+ return null;
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a {@code Supplier<List<String>>} to ingest a command's output.
+ * <P>
+ * This method is particularly helpful in creating a sensor or source connector
+ * class that hides the fact that it uses a command, enabling it to be used
+ * like any other sensor/connector.
+ * </P>
+ * For example:
+ * <pre><code>
+ * // ingest the sensor data
+ * TStream<MySensorData> stream = topology.poll(new MySensor(), 2, TimeUnit.SECONDS);
+ *
+ * // MySensor class
+ * class MySensor implements Supplier<MySensorData> {
+ * private String[] cmd = new String[] {"mySensorCmd", "arg1"};
+ * private Supplier<List<String>> commandReader =
+ * CommandStreams.commandReaderList(new ProcessBuilder(cmd));
+ *
+ * // implement Supplier<MySensorData>.get()
+ * public MySensorData get() {
+ * // get the cmd output and create a MySensorData tuple from it
+ * return createMySensorData(commandReader.get());
+ * }
+ * }
+ * </code></pre>
+ * <P>
+ * The supplied {@code cmd} is used to start the command.
+ * A call to {@link Supplier#get()} reads the command's UTF8
+ * {@link Process#getInputStream() input stream} until an EOF or error
+ * and returns a {@code List<String>} of the collected input.
+ * The tuples contain output from stderr if the cmd is configured to
+ * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+ * </P>
+ *
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return the {@code Supplier<List<String>>} for the command
+ *
+ * @see #periodicSource(Topology, ProcessBuilder, long, TimeUnit)
+ * @see #tokenize(String)
+ */
+ public static Supplier<List<String>> commandReaderList(ProcessBuilder cmd) {
+ return new Supplier<List<String>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public List<String> get() {
+ try (CommandReader supplier
+ = new CommandReader(cmd, false))
+ {
+ Iterator<String> iter = supplier.get().iterator();
+ List<String> list = new ArrayList<>();
+ while (iter.hasNext())
+ list.add(iter.next());
+ return list;
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a {@code Consumer<String>} to write UTF8 string data to a command's input.
+ * <P>
+ * This method is particularly helpful in creating a sink connector
+ * that hides the fact that it uses a command, enabling it to be used
+ * like a native connector.
+ * </P>
+ * For example:
+ * <pre><code>
+ * // sink a stream to my connector
+ * TStream<MySensorData> stream = ...;
+ * stream.sink(new MySinkConnector());
+ *
+ * // MySinkConnector class
+ * class MySinkConnector implements Consumer<MySensorData> {
+ * private String[] cmd = new String[] {"mySinkCmd", "arg1"};
+ * private Consumer<String> commandWriter =
+ * CommandStreams.commandWriter(new ProcessBuilder(cmd));
+ *
+ * // implement Consumer<MySensorData>.accept()
+ * public void accept(MySensorData data) {
+ * // convert the data to a string and write it to the cmd
+ * commandWriter.accept(convertMySensorData(data));
+ * }
+ * }
+ * </code></pre>
+ * <P>
+ * The supplied {@link ProcessBuilder cmd} is used to start the command.
+ * Each call to {@link Consumer#accept(Object) accept(String)} writes a
+ * UTF8 string to the command's {@link Process#getOutputStream() input}.
+ * Each write is followed by a flush.
+ * The command is restarted if a write encounters an error.
+ * </P>
+ * <P>
+ * While each write is followed by a flush(), that only helps to
+ * reduce the time it takes to notice that cmd has failed and restart it.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd across restarts.
+ * </P>
+ *
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return the {@code Consumer<String>} for the command
+ *
+ * @see #sink(TStream, ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static Consumer<String> commandWriter(ProcessBuilder cmd) {
+ return new CommandWriter(cmd, true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/package-info.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/package-info.java
new file mode 100644
index 0000000..97213c9
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Command / OS Process connector.
+ */
+package org.apache.edgent.connectors.command;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandConnector.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandConnector.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandConnector.java
new file mode 100644
index 0000000..fca3d4c
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandConnector.java
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for source / sink specific command connectors.
+ * <P>
+ * The lifetime of a CommandConnector is that of its Command's execution lifetime.
+ * In the case of a "one shot" command (e.g., a periodicSource's cmd) the
+ * lifetime may be brief - {@code restart==false}.
+ * </P>
+ * <P>
+ * Many command connector uses will involve long running commands, sources
+ * and sinks that want to be robust in the face of inadvertent command
+ * termination/failures - {@code restart==true}.
+ * </P>
+ */
+abstract class CommandConnector implements AutoCloseable {
+ static final Logger logger = LoggerFactory.getLogger(CommandConnector.class);
+
+ private final ProcessBuilder cmd;
+ private final boolean restart;
+ private Process currentProcess;
+ private long numStarts;
+ private long lastStartTimestamp;
+ private final int restartDelayMsec = 1_000;
+
+
+ CommandConnector(ProcessBuilder cmd, boolean restart) {
+ this.cmd = cmd;
+ this.restart = restart;
+ }
+
+ protected boolean canStart() {
+ return restart || numStarts==0;
+ }
+
+ protected Process getCurrentProcess() {
+ return currentProcess;
+ }
+
+ protected void start() throws InterruptedException {
+ if (!canStart())
+ throw new IllegalStateException();
+ closeProcess();
+ try {
+ numStarts++;
+ // ensure we don't thrash on continuous restarts
+ long now = System.currentTimeMillis();
+ if (now < lastStartTimestamp + restartDelayMsec) {
+ logger.info("Sleeping before restarting cmd {}", toCmdForMsg());
+ Thread.sleep(restartDelayMsec);
+ now = System.currentTimeMillis();
+ }
+ lastStartTimestamp = now;
+
+ currentProcess = cmd.start();
+
+ logger.debug("Started cmd {}", toCmdForMsg());
+ }
+ catch (IOException e) {
+ logger.error("Unable to start cmd {}", toCmdForMsg(), e);
+ }
+ }
+
+ protected void closeProcess() {
+ if (currentProcess != null) {
+ currentProcess.destroy();
+ currentProcess = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ closeProcess();
+ }
+
+ String toCmdForMsg() {
+ return cmd.command().toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandReader.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandReader.java
new file mode 100644
index 0000000..44b2004
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandReader.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Create a {@code Supplier<Iterable<String>>} to ingest a command's output.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * and restart it upon process termination/error if so configured.
+ * </P>
+ * <P>
+ * The iterator returned by {@link Iterable#iterator()} returns
+ * {@code hasNext()==true} until a read from {@link Process#getInputStream()}
+ * returns EOF or an I/O error.
+ */
+public class CommandReader extends CommandConnector implements Supplier<Iterable<String>>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ private Iterator<String> currentSupplierIterator;
+
+ /**
+ * Create a supplier of UTF8 strings from a command's output.
+ *
+ * @param cmd the {@link ProcessBuilder} to use to start the command
+ * @param restart when true, restart the command upon termination, EOF, or
+ * read error.
+ */
+ public CommandReader(ProcessBuilder cmd, boolean restart) {
+ super(cmd, restart);
+ }
+
+ protected void start() throws InterruptedException {
+ super.start();
+ currentSupplierIterator = new ProcessReader(getCurrentProcess()).get().iterator();
+ }
+
+ protected void closeProcess() {
+ currentSupplierIterator = null;
+ super.closeProcess();
+ }
+
+ @Override
+ public Iterable<String> get() {
+ return new Iterable<String>() {
+
+ @Override
+ public Iterator<String> iterator() {
+ return new Iterator<String>() {
+
+ @Override
+ public boolean hasNext() {
+ try {
+ for(;;) {
+ if (currentSupplierIterator != null) {
+ boolean hasNext = currentSupplierIterator.hasNext();
+ if (hasNext) {
+ return true;
+ }
+ else {
+ // no more from that process. close and loop/retry.
+ closeProcess();
+ }
+ }
+ else if (currentSupplierIterator == null && canStart()) {
+ start(); // and loop/retry
+ }
+ else {
+ return false; // no more input
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public String next() {
+ if (currentSupplierIterator != null)
+ return currentSupplierIterator.next();
+ else
+ throw new NoSuchElementException();
+ }
+
+ };
+ }
+
+ };
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandWriter.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandWriter.java
new file mode 100644
index 0000000..e3308dc
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandWriter.java
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * A {@code Consumer<String>} to write data to a command's input.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * and restart it upon process termination/error if so configured.
+ * </P>
+ */
+public class CommandWriter extends CommandConnector implements Consumer<String>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ private ProcessWriter currentConsumer;
+
+ /**
+ * Create a consumer to write UTF8 string data to a command's input.
+ * <P>
+ * Each write is followed by a flush() though that only helps to
+ * reduce the time it takes to notice that a cmd has failed.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd even following restart.
+ * </P>
+ *
+ * @param cmd the builder to use to start the process
+ * @param restart true to restart the process upon termination or
+ * write error.
+ */
+ public CommandWriter(ProcessBuilder cmd, boolean restart) {
+ super(cmd, restart);
+ }
+
+ protected void start() throws InterruptedException {
+ super.start();
+ currentConsumer = new ProcessWriter(getCurrentProcess());
+ }
+
+ protected void closeProcess() {
+ currentConsumer = null;
+ super.closeProcess();
+ }
+
+ @Override
+ public void accept(String value) {
+ for (;;) {
+ try {
+ if (currentConsumer != null) {
+ try {
+ currentConsumer.accept(value);
+ logger.trace("WROTE: {}", value);
+ return;
+ }
+ catch (RuntimeException e) {
+ closeProcess(); // and loop/retry
+ }
+ }
+ else if (currentConsumer == null && canStart()) {
+ logger.debug("STARTING for: {}", value);
+ start(); // and loop/retry
+ }
+ else {
+ // not restartable. toss it on the floor
+ return;
+ }
+ }
+ catch (InterruptedException e) {
+ // toss it on the floor
+ return;
+ }
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessReader.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessReader.java
new file mode 100644
index 0000000..47a2c59
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessReader.java
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * A {@code Supplier<Iterable<String>>} for ingesting a process's output.
+ * <P>
+ * The iterator returned by {@link Iterable#iterator()} returns
+ * {@code hasNext()==true} until a read from {@link Process#getInputStream()}
+ * returns EOF or an I/O error.
+ */
+class ProcessReader implements Supplier<Iterable<String>> {
+ private static final long serialVersionUID = 1L;
+ private final BufferedReader reader;
+
+ /**
+ * Create a new supplier of UTF8 strings read from a process's
+ * {@link Process#getInputStream() output}.
+ *
+ * @param process the process to read from.
+ */
+ ProcessReader(Process process) {
+ reader = new BufferedReader(new InputStreamReader(
+ process.getInputStream(), StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public Iterable<String> get() {
+ return new Iterable<String>() {
+
+ @Override
+ public Iterator<String> iterator() {
+ return new Iterator<String>() {
+ private Boolean hasNext = null;
+ private String next = null;
+
+ @Override
+ public boolean hasNext() {
+ if (hasNext != null)
+ return hasNext;
+ next = getNext();
+ hasNext = next != null;
+ return hasNext;
+ }
+
+ @Override
+ public String next() {
+ if (next == null)
+ throw new NoSuchElementException();
+ hasNext = null;
+ return next;
+ }
+
+ };
+ }
+
+ };
+ }
+
+ /**
+ * Get the next available line from the process's stdout
+ * @return null if no more input (or error)
+ */
+ private String getNext() {
+ try {
+ return reader.readLine();
+ } catch (IOException e) {
+ CommandConnector.logger.error("Unable to readline from cmd", e);
+ return null;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessWriter.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessWriter.java
new file mode 100644
index 0000000..92c17d9
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessWriter.java
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * A {@code Consumer<String>} to receive data and write it to a process's input.
+ * <P>
+ * Each write is followed by a flush() though that only helps to
+ * reduce the time it takes to notice that a cmd has failed.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd.
+ */
+class ProcessWriter implements Consumer<String> {
+ private static final long serialVersionUID = 1L;
+ private final BufferedWriter writer;
+
+ /**
+ * Create a new consumer for UTF8 strings to write to a process's
+ * {@link Process#getOutputStream() input}
+ *
+ * @param process the process to write to.
+ */
+ ProcessWriter(Process process) {
+ writer = new BufferedWriter(new OutputStreamWriter(
+ process.getOutputStream(), StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void accept(String value) {
+ try {
+ // see class doc regarding guarantees.
+ writer.write(value);
+ writer.newLine();
+ writer.flush();
+ }
+ catch (IOException e) {
+ CommandConnector.logger.error("Unable to write to cmd", e);
+ // caller (CommandWriter) requires throw to detect failure and recover
+ throw new RuntimeException("Unable to write to cmd", e);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsGlobalTest.java b/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsGlobalTest.java
deleted file mode 100644
index 1697099..0000000
--- a/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsGlobalTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.command;
-
-/**
- * CommandStreams connector globalization tests.
- */
-public class CommandStreamsGlobalTest extends CommandStreamsTest {
-
- private static final String[] globalLines = new String[] {
- "\u5b78\u800c\u6642\u7fd2\u4e4b",
- "\u4e0d\u4ea6\u8aaa\u4e4e"
- };
-
- public String[] getLines() {
- return globalLines;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsTest.java b/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsTest.java
deleted file mode 100644
index 99f40e8..0000000
--- a/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.command;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.io.File;
-import java.lang.ProcessBuilder.Redirect;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.command.CommandStreams;
-import edgent.test.connectors.common.FileUtil;
-import edgent.test.connectors.common.TestRepoPath;
-import edgent.test.providers.direct.DirectTestSetup;
-import edgent.test.topology.TopologyAbstractTest;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-public class CommandStreamsTest extends TopologyAbstractTest implements DirectTestSetup {
-
- private String[] stdLines = new String[] {
- "Line 1",
- "Line 2",
- "Line 3",
- };
-
- public String[] getLines() {
- return stdLines;
- }
-
- private static void delay(long millis) {
- try {
- Thread.sleep(millis);
- }
- catch (InterruptedException e) { }
- }
-
- @Test
- public void testTokenize() {
- String cmdString = "myCmd arg1 arg2\targ3";
- String[] expected = new String[]{"myCmd", "arg1", "arg2", "arg3"};
-
- assertArrayEquals(expected, CommandStreams.tokenize(cmdString).toArray(new String[0]));
- }
-
- @Test
- public void testPeriodicSource() throws Exception {
- Topology t = newTopology("testPeriodicSource");
-
- Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
- System.out.println("Test: "+t.getName()+" "+tempFile1);
-
- ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
-
- int NUM_POLLS = 3;
- List<String> expLines = new ArrayList<>();
- for (int i = 0; i < NUM_POLLS; i++) {
- expLines.addAll(Arrays.asList(getLines()));
- }
-
- TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
- TStream<String> s = ls.flatMap(list -> list);
-
- try {
- completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
- }
- finally {
- tempFile1.toFile().delete();
- }
- }
-
- @Test
- public void testGenerate() throws Exception {
- Topology t = newTopology("testGenerate");
-
- Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
- System.out.println("Test: "+t.getName()+" "+tempFile1);
-
- ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
-
- // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
- TStream<String> s = CommandStreams.generate(t, cmd);
-
- try {
- completeAndValidate("", t, s, 10, getLines());
- }
- finally {
- tempFile1.toFile().delete();
- }
- }
-
- @Test
- public void testGenerateRestart() throws Exception {
- Topology t = newTopology("testGenerateRestart");
-
- Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
- System.out.println("Test: "+t.getName()+" "+tempFile1);
-
- ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
-
- int NUM_RUNS = 3;
- List<String> expLines = new ArrayList<>();
- for (int i = 0; i < NUM_RUNS; i++) {
- expLines.addAll(Arrays.asList(getLines()));
- }
-
- // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
- TStream<String> s = CommandStreams.generate(t, cmd);
-
- completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
- }
-
- @Test
- public void testSink() throws Exception {
- Topology t = newTopology("testSink");
-
- Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
- System.out.println("Test: "+t.getName()+" "+tempFile1);
-
- ProcessBuilder cmd = new ProcessBuilder("cat")
- .redirectOutput(Redirect.appendTo(tempFile1.toFile()));
-
- TStream<String> s = t.strings(getLines());
-
- TSink<String> sink = CommandStreams.sink(s, cmd);
-
- assertNotNull(sink);
-
- try {
- // start the job, sleep for a bit (await the timeout) then validate sink output
- Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
- t.getTester().complete(getSubmitter(), new JsonObject(), never, 3, TimeUnit.SECONDS);
-
- FileUtil.validateFile(tempFile1, getLines());
- }
- finally {
- tempFile1.toFile().delete();
- }
- }
-
- @Test
- public void testSinkRestart() throws Exception {
- Topology t = newTopology("testSinkRestart");
-
- Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
- System.out.println("Test: "+t.getName()+" "+tempFile1);
-
- int batchSize = getLines().length;
-
- // tell cmd to terminate after each batch of lines
- ProcessBuilder cmd = new ProcessBuilder("sh", getCmdPath("sinkcmd"), ""+batchSize)
- .redirectOutput(Redirect.appendTo(tempFile1.toFile()))
- .redirectError(Redirect.to(new File("/dev/stderr")));
-
- int NUM_RUNS = 3;
- List<String> expLines = new ArrayList<>();
- for (int i = 0; i < NUM_RUNS; i++) {
- expLines.addAll(Arrays.asList(getLines()));
- }
- AtomicInteger cnt = new AtomicInteger();
-
- TStream<String> s = t.strings(expLines.toArray(new String[0]))
- .filter(tup -> {
- // need to slow things down so the sinker has time to notice
- // the cmd has terminated. otherwise we'll get ahead,
- // tuples will get dropped on the floor and validation will fail.
- if (cnt.incrementAndGet() > batchSize) {
- // System.out.println("SLEEPING on cnt "+ cnt.get() + " for "+tup);
- delay(1_000);
- cnt.set(1);
- }
- return true;
- });
-
- TSink<String> sink = CommandStreams.sink(s, cmd);
-
- assertNotNull(sink);
-
- try {
- // start the job, sleep for a bit (await the timeout) then validate sink output
- Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
- t.getTester().complete(getSubmitter(), new JsonObject(), never,
- 6 + ((NUM_RUNS-1) * 1/*restart delay*/), TimeUnit.SECONDS);
-
- FileUtil.validateFile(tempFile1, expLines.toArray(new String[0]));
- }
- finally {
- tempFile1.toFile().delete();
- }
- }
-
- private String getCmdPath(String cmd) {
- return TestRepoPath.getPath("connectors", "command", "src", "test", "scripts", cmd);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsGlobalTest.java b/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsGlobalTest.java
new file mode 100644
index 0000000..bc6d268
--- /dev/null
+++ b/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsGlobalTest.java
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.command;
+
+/**
+ * CommandStreams connector globalization tests: overrides {@link #getLines()} to supply non-ASCII lines in place of the CommandStreamsTest defaults.
+ */
+public class CommandStreamsGlobalTest extends CommandStreamsTest {
+
+ private static final String[] globalLines = new String[] {
+ "\u5b78\u800c\u6642\u7fd2\u4e4b",
+ "\u4e0d\u4ea6\u8aaa\u4e4e"
+ };
+
+ public String[] getLines() {
+ return globalLines;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsTest.java b/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsTest.java
new file mode 100644
index 0000000..a24bec8
--- /dev/null
+++ b/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsTest.java
@@ -0,0 +1,223 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.command;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.lang.ProcessBuilder.Redirect;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.connectors.command.CommandStreams;
+import org.apache.edgent.test.connectors.common.FileUtil;
+import org.apache.edgent.test.connectors.common.TestRepoPath;
+import org.apache.edgent.test.providers.direct.DirectTestSetup;
+import org.apache.edgent.test.topology.TopologyAbstractTest;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+public class CommandStreamsTest extends TopologyAbstractTest implements DirectTestSetup {
+
+ private String[] stdLines = new String[] {
+ "Line 1",
+ "Line 2",
+ "Line 3",
+ };
+
+ public String[] getLines() {
+ return stdLines;
+ }
+
+ private static void delay(long millis) {
+ try {
+ Thread.sleep(millis);
+ }
+ catch (InterruptedException e) { } // ignored: an interrupt just ends the delay early. NOTE(review): interrupt status is not restored
+ }
+
+ @Test
+ public void testTokenize() {
+ String cmdString = "myCmd arg1 arg2\targ3";
+ String[] expected = new String[]{"myCmd", "arg1", "arg2", "arg3"};
+
+ assertArrayEquals(expected, CommandStreams.tokenize(cmdString).toArray(new String[0]));
+ }
+
+ @Test
+ public void testPeriodicSource() throws Exception {
+ Topology t = newTopology("testPeriodicSource");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+
+ int NUM_POLLS = 3; // expected output is getLines() repeated NUM_POLLS times
+ List<String> expLines = new ArrayList<>();
+ for (int i = 0; i < NUM_POLLS; i++) {
+ expLines.addAll(Arrays.asList(getLines()));
+ }
+
+ TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
+ TStream<String> s = ls.flatMap(list -> list);
+
+ try {
+ completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
+ }
+ finally {
+ tempFile1.toFile().delete();
+ }
+ }
+
+ @Test
+ public void testGenerate() throws Exception {
+ Topology t = newTopology("testGenerate");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+
+ // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
+ TStream<String> s = CommandStreams.generate(t, cmd);
+
+ try {
+ completeAndValidate("", t, s, 10, getLines());
+ }
+ finally {
+ tempFile1.toFile().delete();
+ }
+ }
+
+ @Test
+ public void testGenerateRestart() throws Exception {
+ Topology t = newTopology("testGenerateRestart");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines()); // NOTE(review): unlike the other tests, this temp file is never deleted
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+
+ int NUM_RUNS = 3; // expected output is getLines() repeated NUM_RUNS times
+ List<String> expLines = new ArrayList<>();
+ for (int i = 0; i < NUM_RUNS; i++) {
+ expLines.addAll(Arrays.asList(getLines()));
+ }
+
+ // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
+ TStream<String> s = CommandStreams.generate(t, cmd);
+
+ completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
+ }
+
+ @Test
+ public void testSink() throws Exception {
+ Topology t = newTopology("testSink");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ ProcessBuilder cmd = new ProcessBuilder("cat")
+ .redirectOutput(Redirect.appendTo(tempFile1.toFile()));
+
+ TStream<String> s = t.strings(getLines());
+
+ TSink<String> sink = CommandStreams.sink(s, cmd);
+
+ assertNotNull(sink);
+
+ try {
+ // start the job, sleep for a bit (await the timeout) then validate sink output
+ Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
+ t.getTester().complete(getSubmitter(), new JsonObject(), never, 3, TimeUnit.SECONDS);
+
+ FileUtil.validateFile(tempFile1, getLines());
+ }
+ finally {
+ tempFile1.toFile().delete();
+ }
+ }
+
+ @Test
+ public void testSinkRestart() throws Exception {
+ Topology t = newTopology("testSinkRestart");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ int batchSize = getLines().length;
+
+ // tell cmd to terminate after each batch of lines
+ ProcessBuilder cmd = new ProcessBuilder("sh", getCmdPath("sinkcmd"), ""+batchSize)
+ .redirectOutput(Redirect.appendTo(tempFile1.toFile()))
+ .redirectError(Redirect.to(new File("/dev/stderr"))); // NOTE(review): "sh" and /dev/stderr assume a POSIX-like environment
+
+ int NUM_RUNS = 3;
+ List<String> expLines = new ArrayList<>();
+ for (int i = 0; i < NUM_RUNS; i++) {
+ expLines.addAll(Arrays.asList(getLines()));
+ }
+ AtomicInteger cnt = new AtomicInteger();
+
+ TStream<String> s = t.strings(expLines.toArray(new String[0]))
+ .filter(tup -> {
+ // need to slow things down so the sinker has time to notice
+ // the cmd has terminated. otherwise we'll get ahead,
+ // tuples will get dropped on the floor and validation will fail.
+ if (cnt.incrementAndGet() > batchSize) {
+ // System.out.println("SLEEPING on cnt "+ cnt.get() + " for "+tup);
+ delay(1_000);
+ cnt.set(1); // the current tuple counts as the first of the next batch
+ }
+ return true;
+ });
+
+ TSink<String> sink = CommandStreams.sink(s, cmd);
+
+ assertNotNull(sink);
+
+ try {
+ // start the job, sleep for a bit (await the timeout) then validate sink output
+ Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
+ t.getTester().complete(getSubmitter(), new JsonObject(), never,
+ 6 + ((NUM_RUNS-1) * 1/*restart delay*/), TimeUnit.SECONDS);
+
+ FileUtil.validateFile(tempFile1, expLines.toArray(new String[0]));
+ }
+ finally {
+ tempFile1.toFile().delete();
+ }
+ }
+
+ private String getCmdPath(String cmd) {
+ return TestRepoPath.getPath("connectors", "command", "src", "test", "scripts", cmd);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/common/src/main/java/edgent/connectors/runtime/Connector.java
----------------------------------------------------------------------
diff --git a/connectors/common/src/main/java/edgent/connectors/runtime/Connector.java b/connectors/common/src/main/java/edgent/connectors/runtime/Connector.java
deleted file mode 100644
index 6f0bdf9..0000000
--- a/connectors/common/src/main/java/edgent/connectors/runtime/Connector.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.runtime;
-
-import java.io.Serializable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-
-/**
- * An abstract class for general connector connection management.
- * <p>
- * The basic (default) model is to maintain a connection across
- * inadvertent disconnects.
- * <p>
- * A connected connector is returned
- * whenever {@link #client()} is called. If the connection is lost,
- * actions are taken to reestablish it. Overall tracking and
- * control of connection state is performed and
- * general logging is performed.
- * <p>
- * Sub-classes are able to further control when initial connection
- * occurs and generally expose ways to explicitly disconnect or connect.
- * <p>
- * Optionally, clients can request automatic disconnection after
- * a period of inactivity - see {@link #setIdleTimeout(long, TimeUnit)}.
- * Support for this feature requires connector implementations
- * to call {@link #notIdle()}.
- * <p>
- * TODO - dynamic control:
- * <ul>
- * <li>[abstract] ConnectorControl { getState(), connect(), disconnect(), ...}.</li>
- * <li>MqttConnectorControl extends ConnectorControl. Adds subscribe(), unsubscribe().</li>
- * <li>Add Consumer<MqttConnectorControl> to MqttConfig. Runtime calls it supplying a connector control object.
- * By doing a disconnect()/connect(), the Supplier<MqttConfig> will be called to as part of reconnect -- hence using updated values if any.</li>
- * </ul>
- * <p>
- * Sub-classes are responsible for implementing a small number
- * of operations: {@link #doConnect(Object)}, {@link #doDisconnect(Object)},
- * {@link #doClose(Object)}, and {@link #id()}. Sub-classes are also responsible for
- * calling {@link #connectionLost(Throwable)} and {@link #notIdle()}.
- *
- * @param <T> type of the underlying managed connector (e.g., MqttClient)
- */
-public abstract class Connector<T> implements AutoCloseable, Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final long BASE_RETRY_WAIT_MSEC = 2*1000;
- private static final long MAX_RETRY_WAIT_MSEC = 60*1000;
- private final IdleManager idleManager;
- private State state = State.DISCONNECTED;
- private T client; // must be non-null when state==CONNECTED
- private Future<?> connectFuture;
-
- // TODO proper "operator-acquired" thread/scheduler w/thread factory, etc
- // hmm... today a single [Mqtt]Connector supports multiple publish operators
- // and a subscribe operator, hence the threads should come from... ?
- private final ExecutorService connectExecutor = Executors.newSingleThreadExecutor();
- private final ScheduledExecutorService schedExecutor = Executors.newScheduledThreadPool(0);
-
- public enum State {
- DISCONNECTED,
- CONNECTING,
- CONNECTED,
- DISCONNECTING,
- CLOSING,
- CLOSED
- }
-
- protected Connector() {
- idleManager = new IdleManager(this, schedExecutor);
- }
-
- public abstract Logger getLogger();
-
- /**
- * Must be called by the connector when an action is performed that
- * qualifies as a "not idle" condition.
- */
- public void notIdle() {
- idleManager.notIdle();
- }
-
- /**
- * Disconnect the connector after the specified period of inactivity.
- * @param idleTimeout idleTimeout(long)
- * @param unit TimeUnit
- */
- public void setIdleTimeout(long idleTimeout, TimeUnit unit) {
- idleManager.setIdleTimeoutMsec(unit.toMillis(idleTimeout));
- }
-
- public void setIdleReconnectInterval(int intervalSec) {
- idleManager.setIdleReconnectInterval(intervalSec);
- }
-
- /**
- * Get a connected client connector.
- * @return the client connector
- * @throws IllegalStateException if state is inappropriate to return connected client connector
- * @throws Exception if unable to connect
- */
- public T client() throws Exception {
- return connectInternal();
- }
-
- // Expose with client driven dynamic disconnect/reconnect
-// /**
-// * Connect the client connector if necessary.
-// * Blocks until connected or it's no longer valid to request a connect().
-// * @throws IllegalStateException if a connect() is (no longer) appropriate.
-// */
-// public void connect() throws Exception {
-// connectInternal();
-// }
-
- /**
- * Connect the client connector if necessary.
- * Blocks until connected or it's no longer valid to request a connect().
- * @return connected client connector
- * @throws IllegalStateException if a connect() is (no longer) appropriate.
- */
- private T connectInternal() throws Exception {
- // N.B. have to deal with multiple concurrent connect requests
- // e.g., multiple publishers and a single subscriber for a connector.
- Future<?> f = null;
- synchronized(this) {
- if (state == State.CONNECTED)
- return client;
- else if (state == State.CONNECTING)
- f = connectFuture;
- else if (state == State.DISCONNECTED) {
- startAsyncConnect();
- f = connectFuture;
- }
- else
- throw wrongState(state, "connectInternal()");
- }
- awaitDone(f); // throws if task not successful
- return connectedClient();
- }
-
- private IllegalStateException wrongState(State s, String op) {
- String msg = String.format("%s %s wrong state %s", id(), op, s);
- getLogger().error(msg);
- return new IllegalStateException(msg);
- }
-
- /**
- * Get the connected client; throw if not connected.
- * @return connected client
- * @throws IllegalStateException if not connected
- */
- private T connectedClient() {
- synchronized(this) {
- if (state != State.CONNECTED)
- throw wrongState(state, "connectedClient()");
- return client;
- }
- }
-
- /**
- * Schedule an async connect task.
- * <p>
- * Updates connectFuture if successfully initiated.
- * Updates state and client once successfully completed.
- * @throws IllegalStateException if inappropriate state for async connect request
- */
- private void startAsyncConnect() {
- synchronized(this) {
- if (state != State.DISCONNECTED)
- throw wrongState(state, "startAsyncConnect()");
- getLogger().trace("{} submitting async connect task", id());
- setStateUnsafe(State.CONNECTING);
- connectFuture = connectExecutor.submit(
- () -> { connectTask(); return null; });
- }
- }
-
- /**
- * Wait till task is done.
- * @throws Exception if task wasn't successful
- */
- private void awaitDone(Future<?> f) throws Exception {
- try {
- getLogger().trace("{} awaiting done", id());
- f.get();
- }
- catch (InterruptedException e) {
- // assume that if we're cancelled we want to cancel the future task
- getLogger().trace("{} awaitDone() interrupted, cancelling task", id());
- f.cancel(true);
- throw e;
- }
- catch (CancellationException e) {
- String msg = String.format("%s awaitDone() task was cancelled", id());
- getLogger().trace(msg);
- throw new IllegalStateException(msg);
- }
- catch (ExecutionException e) {
- String msg = String.format("%s awaitDone() task failed", id());
- getLogger().error(msg);
- throw new IllegalStateException(msg, e.getCause());
- }
- }
-
- /**
- * Do a blocking connect with retries.
- * <p>
- * Updates state and client when successful.
- * @throws IllegalStateException if state no longer appropriate for connect.
- * @throws Exception some other problem.
- */
- private void connectTask() throws Exception {
- int retryCnt = 0;
- while(true) {
- try {
- oneConnect(retryCnt++);
- return;
- }
- catch (IllegalStateException e) {
- throw e; // already logged
- }
- catch (Exception e) {
- // already logged
- long msec = getConnectRetryDelayMsec(retryCnt);
- getLogger().info("{} connectTask() waiting {}msec to retry", id(), msec);
- Thread.sleep(msec);
- }
- }
- }
-
- /**
- * Do a blocking single connect attempt.
- * <p>
- * Updates state and client and idle task when successful.
- * @throws IllegalStateException if state no longer appropriate for connect.
- * @throws Exception if connect failed.
- */
- private void oneConnect(int tryCnt) throws Exception {
- synchronized(this) {
- if (state != State.CONNECTING)
- throw wrongState(state, "oneConnect()");
- }
- getLogger().trace("{} doing one connect", id());
-
- T result;
- try {
- result = doConnect(client);
- }
- catch (Exception e) {
- getLogger().error("{} doConnect() failed", id(), e);
- throw e;
- }
-
- getLogger().trace("{} connected", id());
- synchronized(this) {
- if (state != State.CONNECTING) {
- // need to disconnect/close result?
- throw wrongState(state, "oneConnect()");
- }
- setStateUnsafe(State.CONNECTED);
- client = result;
- idleManager.connected();
- }
- }
-
- /**
- * Get the next connect retry delay.
- * @param retryCnt the retry attempt number
- * @return the delay interval in msec
- */
- private long getConnectRetryDelayMsec(int retryCnt) {
- int factor = retryCnt <= 1 ? 1 : 2 << Math.min(retryCnt - 2, 8);
- return Math.min(BASE_RETRY_WAIT_MSEC * factor, MAX_RETRY_WAIT_MSEC);
- }
-
- // Expose with client driven dynamic disconnect/reconnect
-// /**
-// * Disconnect the client connector.
-// * <p>
-// * Subsequently, a connect() may performed to reconnect.
-// * @throws IllegalStateException inappropriate state to request disconnect
-// */
-// public void disconnect() {
-// disconnect(false);
-// }
-
- /**
- * Disconnect the client connector.
- * <p>
- * Subsequently, a connect() may performed to reconnect.
- * @param wasIdle true if being disconnected due to an idle connection
- * @throws IllegalStateException inappropriate state to request disconnect
- */
- void disconnect(boolean wasIdle) {
- synchronized(this) {
- if (!(state == State.CONNECTED || state == State.CONNECTING))
- throw wrongState(state, "disconnect("+wasIdle+")");
- try {
- getLogger().trace("Connection {} disconnecting wasIdle:{}", id(), wasIdle);
- setStateUnsafe(State.DISCONNECTING);
- cancelConnectTaskUnsafe();
- doDisconnect(client);
- }
- catch (Exception e) {
- getLogger().error("{} disconnnect() failed", id(), e);
- }
- finally {
- setStateUnsafe(State.DISCONNECTED);
- idleManager.disconnected(wasIdle);
- }
- }
- }
-
- /**
- * Permanently close the client connector.
- * <p>
- * Expect any subsequent operations to fail.
- * @throws Exception shouldn't happen
- */
- @Override
- public void close() throws Exception {
- synchronized(this) {
- if (state == State.CLOSED) {
- getLogger().trace("{} close() state already {}", id(), state);
- return;
- }
- try {
- getLogger().info("Connection {} closing", id());
- setStateUnsafe(State.CLOSING);
- idleManager.close();
- cancelConnectTaskUnsafe();
- if (client != null)
- doClose(client);
- }
- catch (Exception e) {
- getLogger().error("{} close() failed", id(), e);
- }
- finally {
- setStateUnsafe(State.CLOSED);
- connectFuture = null;
- client = null;
- connectExecutor.shutdownNow();
- schedExecutor.shutdownNow();
- }
- }
- }
-
- /**
- * Cancel a pending connect task if any.
- */
- private void cancelConnectTaskUnsafe() {
- if (connectFuture != null && !connectFuture.isDone()) {
- getLogger().trace("{} cancelConnect()", id());
- connectFuture.cancel(true);
- }
- }
-
- /**
- * To be called by the connector when it detects a connection lost condition.
- * <p>
- * An asynchronous reconnect is initiated if appropriate.
- * @param t the cause. may be null.
- */
- protected void connectionLost(Throwable t) {
- synchronized(this) {
- getLogger().info("Connection {} connectionLost()", id(), t);
- if (state == State.CONNECTED) {
- setStateUnsafe(State.DISCONNECTED);
- // don't allow unwind
- try {
- startAsyncConnect();
- }
- catch (Exception e) {
- getLogger().error("{} startAsyncConnect() failed", id(), e);
- }
- }
- else {
- getLogger().trace("{} connectionLost() state already {}", id(), state);
- }
- }
- }
-
- private void setStateUnsafe(State state) {
- State prev = this.state;
- this.state = state;
- getLogger().info("{} state {} (was {})", id(), state, prev);
- }
-
- State getState() {
- synchronized(this) {
- return state;
- }
- }
-
- /**
- * A one-shot request to connect the client to its server.
- * @param client the connector's client object. may be null.
- * @return a connected client. not necessarily the same as {@code client}.
- * @throws Exception if unable to connect
- */
- protected abstract T doConnect(T client) throws Exception;
-
- /**
- * A one-shot request to disconnect the client.
- * @param client the connector's client object.
- * @throws Exception if unable to disconnect
- */
- protected abstract void doDisconnect(T client) throws Exception;
-
- /**
- * A one-shot request to permanently close the client.
- * @param client the connector's client object.
- * @throws Exception if unable to close
- */
- protected abstract void doClose(T client) throws Exception;
-
- /**
- * Get a connector id to use in log and exception msgs
- * @return the id
- */
- protected abstract String id();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/common/src/main/java/edgent/connectors/runtime/IdleManager.java
----------------------------------------------------------------------
diff --git a/connectors/common/src/main/java/edgent/connectors/runtime/IdleManager.java b/connectors/common/src/main/java/edgent/connectors/runtime/IdleManager.java
deleted file mode 100644
index bc99957..0000000
--- a/connectors/common/src/main/java/edgent/connectors/runtime/IdleManager.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.runtime;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-
-import edgent.connectors.runtime.Connector.State;
-
-/**
- * Manager to auto-disconnect a connector when idle and
- * subsequently auto-reconnect it.
- * <p>
- * Both behaviors are disabled until configured via
- * {@link #setIdleTimeoutMsec(long)} and {@link #setIdleReconnectInterval(int)}.
- * <p>
- * The scheduled-task handles are only accessed while holding this object's
- * monitor; the configuration values and the last-activity timestamp are kept
- * in atomic or volatile fields.
- */
-public class IdleManager {
-    private final Connector<?> connector;
-    private final AtomicLong idleTimeoutMsec = new AtomicLong();
-    private final ScheduledExecutorService schedExecutor;
-    // volatile: written by setIdleReconnectInterval() without holding the lock,
-    // read by disconnected() while holding it.
-    private volatile long idleReconnectIntervalMsec;
-    private final AtomicLong lastActionMsec = new AtomicLong();
-    private Future<?> idleFuture;
-    private Future<?> idleReconnectFuture;
-
-    /**
-     * Create a new idle manager for the connector.
-     * <p>
-     * By default idle (disconnect) timeouts and subsequent auto-reconnect
-     * are disabled.
-     * @param connector the connector to manage
-     * @param schedExecutor executor used to schedule the idle and reconnect tasks
-     */
-    IdleManager(Connector<?> connector, ScheduledExecutorService schedExecutor) {
-        this.connector = connector;
-        this.schedExecutor = schedExecutor;
-    }
-
-    /**
-     * Get the logger to use; this is the managed connector's logger.
-     * @return the logger
-     */
-    protected Logger getLogger() {
-        return connector.getLogger();
-    }
-
-    /**
-     * A "not idle" event has occurred.
-     * <p>
-     * If idle timeouts have been enabled, this must be called by the
-     * connector when an event occurs that qualifies as a "not idle" condition.
-     */
-    public void notIdle() {
-        // Only pay for the clock read when idle timeouts are enabled.
-        if (idleTimeoutMsec.get() > 0) {
-            lastActionMsec.set(System.currentTimeMillis());
-        }
-    }
-
-    /**
-     * Disconnect the connector after the specified period of inactivity.
-     * @param idleTimeoutMsec 0 to disable idle timeouts
-     */
-    public void setIdleTimeoutMsec(long idleTimeoutMsec) {
-        getLogger().trace("{} setIdleTimeout({}msec)", id(), idleTimeoutMsec);
-        this.idleTimeoutMsec.set(idleTimeoutMsec);
-    }
-
-    /**
-     * Reconnect the connector after disconnect due to idleness.
-     * @param intervalSec delay following disconnect until reconnect. 0 to disable.
-     */
-    public void setIdleReconnectInterval(int intervalSec) {
-        getLogger().trace("{} setIdleReconnectInterval({}sec)", id(), intervalSec);
-        // Convert in long arithmetic; "intervalSec * 1000" overflows int for
-        // large intervals before being widened.
-        idleReconnectIntervalMsec = TimeUnit.SECONDS.toMillis(intervalSec);
-    }
-
-    /**
-     * To be called when the connector is being permanently closed.
-     * <p>
-     * Cancels any pending idle and idle-reconnect tasks.
-     */
-    public void close() {
-        synchronized (this) {
-            if (idleFuture != null) {
-                idleFuture.cancel(true);
-            }
-            if (idleReconnectFuture != null) {
-                idleReconnectFuture.cancel(true);
-            }
-        }
-    }
-
-    /**
-     * To be called when the connector has become connected.
-     * <p>
-     * Cancels any pending idle-reconnect task and (re)starts the idle timer.
-     */
-    public void connected() {
-        synchronized (this) {
-            if (idleReconnectFuture != null) {
-                idleReconnectFuture.cancel(false);
-            }
-            scheduleIdleTask(idleTimeoutMsec.get(), false);
-        }
-    }
-
-    /**
-     * To be called when the connector has become disconnected.
-     * @param wasIdle true if the disconnect was due to an idle condition.
-     */
-    public void disconnected(boolean wasIdle) {
-        synchronized (this) {
-            if (idleFuture != null) {
-                idleFuture.cancel(false);
-            }
-            if (wasIdle) {
-                scheduleIdleReconnectTask(idleReconnectIntervalMsec);
-            }
-        }
-    }
-
-    /**
-     * Replace any pending idle task with one that fires after {@code delayMsec}.
-     * @param delayMsec delay until the idle check; no task is scheduled if &lt;= 0
-     * @param isResched true when called from the idle task itself to re-arm the timer
-     */
-    private void scheduleIdleTask(long delayMsec, boolean isResched) {
-        synchronized (this) {
-            if (idleFuture != null) {
-                // A reschedule is requested from within the running idle task;
-                // cancelling with interruption there would interrupt the
-                // current thread, so only interrupt for external requests.
-                idleFuture.cancel(!isResched);
-            }
-            if (delayMsec > 0) {
-                if (isResched) {
-                    getLogger().trace("{} scheduleIdleTask({}msec)", id(), delayMsec);
-                } else {
-                    getLogger().info("{} scheduleIdleTask({}msec)", id(), delayMsec);
-                }
-                idleFuture = schedExecutor.schedule(
-                        () -> idleTimeoutTask(), delayMsec, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    /**
-     * Scheduled idle check: disconnect the connector if it has been idle for
-     * longer than the timeout, otherwise re-arm the timer for the remainder.
-     */
-    private void idleTimeoutTask() {
-        boolean doDisconnect = false;
-        try {
-            synchronized (this) {
-                long tmo = idleTimeoutMsec.get();
-                if (tmo <= 0) {
-                    return; // idle timeouts were disabled after this task was scheduled
-                }
-                State s = connector.getState();
-                if (s != State.CONNECTED) {
-                    getLogger().info("{} idleTimeoutTask() no longer connected ({})", id(), s);
-                    return;
-                }
-                long idleMsec = System.currentTimeMillis() - lastActionMsec.get();
-                if (idleMsec > tmo) {
-                    getLogger().info("{} idleTimeoutTask() disconnecting", id());
-                    doDisconnect = true;
-                } else {
-                    // Wait out the rest of the timeout. Use a full period when
-                    // nothing remains or the wall clock stepped backwards.
-                    long elapsed = (idleMsec < 0 || idleMsec >= tmo) ? 0 : idleMsec;
-                    // scheduleIdleTask() logs the reschedule at trace level.
-                    scheduleIdleTask(tmo - elapsed, true);
-                }
-            }
-            // Disconnect only after this manager's lock has been released.
-            if (doDisconnect) {
-                connector.disconnect(true);
-            }
-        } catch (RuntimeException e) {
-            // Runs on an executor thread: report the failure rather than
-            // letting it vanish at trace level.
-            getLogger().error("{} idleTimeoutTask() disconnect failed", id(), e);
-        }
-    }
-
-    /**
-     * Replace any pending idle-reconnect task with one that fires after
-     * {@code delayMsec}.
-     * @param delayMsec delay until the reconnect attempt; no task is scheduled if &lt;= 0
-     */
-    private void scheduleIdleReconnectTask(long delayMsec) {
-        synchronized (this) {
-            getLogger().info("{} scheduleIdleReconnectTask({}msec)", id(), delayMsec);
-            if (idleReconnectFuture != null) {
-                idleReconnectFuture.cancel(true);
-            }
-            if (delayMsec > 0) {
-                idleReconnectFuture = schedExecutor.schedule(
-                        () -> idleReconnectTask(),
-                        delayMsec, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    /**
-     * Scheduled reconnect: ask the connector for its client, which induces a
-     * reconnect. Failures are logged and not propagated.
-     */
-    private void idleReconnectTask() {
-        try {
-            getLogger().info("{} idleReconnectTask() reconnecting", id());
-            connector.client(); // induce reconnect
-        } catch (Exception e) {
-            getLogger().error("{} idleReconnectTask() failed", id(), e);
-        }
-    }
-
-    /**
-     * Get the managed connector's id for use in log messages.
-     * @return the id
-     */
-    private String id() {
-        return connector.id();
-    }
-}