You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/13 17:30:52 UTC

[3/4] incubator-quarks git commit: QUARKS-226 add Command connector

QUARKS-226 add Command connector


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/85247c21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/85247c21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/85247c21

Branch: refs/heads/master
Commit: 85247c215bb06e88c67c096e9ddc54c1d2463d7c
Parents: d9b8e67
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue Jul 12 12:58:46 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed Jul 13 11:22:48 2016 -0400

----------------------------------------------------------------------
 build.xml                                       |   2 +
 connectors/.classpath                           |   2 +
 connectors/command/build.gradle                 |  21 ++
 connectors/command/build.xml                    |  50 +++
 .../connectors/command/CommandStreams.java      | 348 +++++++++++++++++++
 .../quarks/connectors/command/package-info.java |  24 ++
 .../command/runtime/CommandConnector.java       | 103 ++++++
 .../command/runtime/CommandReader.java          | 111 ++++++
 .../command/runtime/CommandWriter.java          |  91 +++++
 .../command/runtime/ProcessReader.java          |  98 ++++++
 .../command/runtime/ProcessWriter.java          |  66 ++++
 .../command/CommandStreamsGlobalTest.java       |  35 ++
 .../connectors/command/CommandStreamsTest.java  | 224 ++++++++++++
 connectors/command/src/test/scripts/sinkcmd     |  23 ++
 .../quarks/test/connectors/common/FileUtil.java |  88 +++++
 .../test/connectors/file/FileStreamsTest.java   |  33 +-
 .../file/FileStreamsTextFileWriterTest.java     |  21 +-
 17 files changed, 1294 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index a258ea6..88fd386 100644
--- a/build.xml
+++ b/build.xml
@@ -117,6 +117,7 @@
 
         <ant dir="providers/direct" target="@{target}" useNativeBasedir="true"/>
         <ant dir="connectors/common" target="@{target}" useNativeBasedir="true"/>
+        <ant dir="connectors/command" target="@{target}" useNativeBasedir="true"/>
         <ant dir="connectors/csv" target="@{target}" useNativeBasedir="true"/>
         <ant dir="connectors/iot" target="@{target}" useNativeBasedir="true"/>
         <ant dir="connectors/serial" target="@{target}" useNativeBasedir="true"/>
@@ -229,6 +230,7 @@
       </classfiles>
     <sourcefiles encoding="UTF-8">
         <fileset dir="${basedir}/connectors/common/src/main/java" includes="**/*.java"/>
+        <fileset dir="${basedir}/connectors/command/src/main/java" includes="**/*.java"/>
         <fileset dir="${basedir}/connectors/csv/src/main/java" includes="**/*.java"/>
         <fileset dir="${basedir}/connectors/file/src/main/java" includes="**/*.java"/>
         <fileset dir="${basedir}/connectors/iot/src/main/java" includes="**/*.java"/>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/.classpath
----------------------------------------------------------------------
diff --git a/connectors/.classpath b/connectors/.classpath
index 9eeb7d7..2015cfa 100644
--- a/connectors/.classpath
+++ b/connectors/.classpath
@@ -2,6 +2,8 @@
 <classpath>
 	<classpathentry kind="src" path="common/src/main/java"/>
 	<classpathentry kind="src" path="common/src/test/java"/>
+	<classpathentry kind="src" path="command/src/main/java"/>
+	<classpathentry kind="src" path="command/src/test/java"/>
 	<classpathentry kind="src" path="csv/src/main/java"/>
 	<classpathentry kind="src" path="csv/src/test/java"/>
 	<classpathentry kind="src" path="file/src/main/java"/>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/build.gradle
----------------------------------------------------------------------
diff --git a/connectors/command/build.gradle b/connectors/command/build.gradle
new file mode 100644
index 0000000..94ae884
--- /dev/null
+++ b/connectors/command/build.gradle
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+dependencies {
+  compile project(':api:topology')
+  compile project(':connectors:common')
+  testCompile project(':providers:direct')
+  testRuntime ext_classpath
+}
+
+addCompileTestDependencies ':api:topology', ':providers:direct', ':connectors:common'

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/build.xml
----------------------------------------------------------------------
diff --git a/connectors/command/build.xml b/connectors/command/build.xml
new file mode 100644
index 0000000..146d82e
--- /dev/null
+++ b/connectors/command/build.xml
@@ -0,0 +1,50 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project name="quarks.connectors.command" default="all"
+    xmlns:jacoco="antlib:org.jacoco.ant"
+    >
+    <description>
+        Build Command connectors.
+    </description>
+
+  <property name="component.path" value="connectors/command"/>
+  <import file="../../common-build.xml"/>
+
+  <property name="ext" location="../../ext"/>
+
+  <path id="compile.classpath">
+    <pathelement location="${quarks.lib}/quarks.api.topology.jar" />
+    <pathelement location="${quarks.connectors}/common/lib/quarks.connectors.common.jar" />
+  </path>
+
+  <path id="test.compile.classpath">
+    <pathelement location="${jar}" />
+    <pathelement location="${quarks.lib}/quarks.providers.direct.jar"/>
+    <pathelement location="../../api/topology/test.classes"/>
+    <pathelement location="../../providers/direct/test.classes"/>
+    <pathelement location="../../connectors/common/test.classes"/>
+    <path refid="compile.classpath"/>
+  </path>
+
+  <path id="test.classpath">
+    <pathelement location="${test.classes}" />
+    <path refid="test.compile.classpath"/>
+    <path refid="quarks.ext.classpath" />
+    <path refid="test.common.classpath" />
+  </path>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/CommandStreams.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/CommandStreams.java b/connectors/command/src/main/java/quarks/connectors/command/CommandStreams.java
new file mode 100644
index 0000000..ad49f5f
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/CommandStreams.java
@@ -0,0 +1,348 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+
+import quarks.connectors.command.runtime.CommandReader;
+import quarks.connectors.command.runtime.CommandWriter;
+import quarks.function.Consumer;
+import quarks.function.Supplier;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Connectors for creating a TStream from a Command's / OS Process's output
+ * and sinking a TStream to a Command's / OS Process's input.
+ * <P>
+ * e.g., run a network monitor command (like Tiger Shark) and ingest its output.
+ */
+public class CommandStreams {
+
+  /**
+   * Tokenize the specified {@code cmdString} in the exact same manner as
+   * done by {@link Runtime#exec(String)}.
+   * <P>
+   * This function provides a convenience for creating a {@link ProcessBuilder}
+   * for use by the other CommandStreams methods. 
+   * </P>
+   * <P>
+   * Sample use:
+   * <pre>{@code
+   * ProcessBuilder cmd = new ProcessBuilder(tokenize("sh someShellCmd.sh and args"));
+   * TStream<String> stream = CommandStreams.generate(topology, cmd);
+   * }</pre>
+   * 
+   * @param cmdString a command string
+   * @return the tokens
+   */
+  public static List<String> tokenize(String cmdString) {
+    List<String> command = new ArrayList<>();
+    StringTokenizer tok = new StringTokenizer(cmdString);
+    while (tok.hasMoreTokens()) 
+      command.add(tok.nextToken());
+    return command;
+  }
+  
+  /**
+   * Create an endless {@code TStream<String>} from a long running command's output.
+   * <P>
+   * The supplied {@code cmd} is used to start the command.
+   * A tuple is created for each UTF8 line read from the command's
+   * {@link Process#getInputStream() output}.
+   * The tuples contain output from stderr if cmd is configured to 
+   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+   * The command is restarted if a read from the command's output stream
+   * returns EOF or an error. 
+   * </P>
+   * <P>
+   * This is a convenience function equivalent to
+   * {@code topology.generate(endlessCommandReader(cmd))}.
+   * </P>
+   * <P>
+   * Sample use: create a stream of tuples for the output from a 
+   * continuously running and restartable command:
+   * <pre>{@code
+   * ProcessBuilder cmd = new ProcessBuilder("myCommand");
+   * TStream<String> cmdOutput = CommandStreams.generate(topology, cmd);
+   * cmdOutput.print();
+   * }</pre>
+   * 
+   * @param topology the topology to add the source stream to
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return the source {@code TStream<String>}
+   * 
+   * @see #endlessCommandReader(ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static TStream<String> generate(Topology topology, ProcessBuilder cmd) {
+    return topology.generate(endlessCommandReader(cmd));
+  }
+  
+  /**
+   * Create a {@code TStream<String>} from a periodically run command's output.
+   * <P>
+   * The supplied {@code cmd} is used to start the command
+   * at the specified {@code period}.
+   * The command's UTF8 {@link Process#getInputStream() output} is read until EOF
+   * and a {@code List<String>} tuple is created containing the collected output.
+   * The tuples contain output from stderr if the cmd is configured to 
+   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.  
+   * </P>
+   * <P>
+   * This is a convenience function equivalent to
+   * {@code topology.poll(commandReaderList(cmd), period, units)}.
+   * </P>
+   * <P>
+   * Sample use: create a stream of tuples containing the output 
+   * from a periodically run command:
+   * <pre>{@code
+   * ProcessBuilder cmd = new ProcessBuilder("date");
+   * TStream<List<String>> cmdOutput = 
+   *      CommandStreams.periodicSource(topology, cmd, 2, TimeUnit.SECONDS);
+   * cmdOutput.print();
+   * }</pre>
+   * 
+   * @param topology the topology to add the source stream to
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @param period the period to run the command and collect its output
+   * @param units TimeUnit for {@code period}
+   * @return the source {@code TStream<List<String>>}
+   * 
+   * @see #commandReaderList(ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static TStream<List<String>> periodicSource(Topology topology,
+      ProcessBuilder cmd, long period, TimeUnit units) {
+    return topology.poll(commandReaderList(cmd), period, units);
+  }
+  
+  /**
+   * Sink a {@code TStream<String>} to a command's input.
+   * <P>
+   * The supplied {@code cmd} is used to start the command.
+   * Each tuple is written as UTF8 and flushed to the command's {@link Process#getOutputStream() input}.
+   * The command is restarted if a write encounters an error. 
+   * </P>
+   * <P>
+   * While each write is followed by a flush() that only helps to
+   * reduce the time it takes to notice that cmd has failed and restart it.
+   * Supposedly "successfully written and flushed" values are not guaranteed to
+   * have been received by a cmd across restarts.
+   * </P>
+   * <P>
+   * This is a convenience function equivalent to
+   * {@code stream.sink(commandWriter(cmd))}
+   * </P>
+   * <P>
+   * Sample use: write a stream of tuples to the input of a command:
+   * <pre>{@code
+   * TStream<String> stream = topology.strings("one", "two", "three");
+   * ProcessBuilder cmd = new ProcessBuilder("cat").redirectOutput(new File("/dev/stdout"));
+   * CommandStreams.sink(stream, cmd);
+   * }</pre>
+   * 
+   * @param stream the stream to sink
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return a {@link TSink}
+   * 
+   * @see #commandWriter(ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static TSink<String> sink(TStream<String> stream, ProcessBuilder cmd) {
+    return stream.sink(commandWriter(cmd));
+  }
+    
+  /**
+   * Create an endless {@code Supplier<String>} for ingesting a long running command's output.
+   * <P>
+   * This method is particularly helpful in creating a sensor or source connector
+   * class that hides the fact that it uses a command, enabling it to be used
+   * like any other sensor/connector.
+   * </P>
+   * For example:
+   * <pre><code>
+   * // ingest the sensor data
+   * TStream&lt;MySensorData&gt; stream = topology.generate(new MySensor());
+   *
+   * // MySensor class
+   * class MySensor implements Supplier&lt;MySensorData&gt; {
+   *   private String[] cmd = new String[] {"mySensorCmd", "arg1"};
+   *   private Supplier&lt;String&gt; commandReader = 
+   *     CommandStreams.endlessCommandReader(new ProcessBuilder(cmd));
+   *       
+   *   // implement Supplier&lt;MySensorData&gt;.get()
+   *   public MySensorData get() {
+   *     // get the next line from the cmd and create a MySensorData tuple from it
+   *     return createMySensorData(commandReader.get());
+   *   }
+   * }
+   * </code></pre>
+   * <P>
+   * The supplied {@code cmd} is used to start the command.
+   * A call to {@link Supplier#get()} reads the next UTF8 line from the command's
+   * {@link Process#getInputStream() output}.
+   * The returned strings contain output from stderr if the cmd is configured to 
+   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+   * The command is restarted if a read from the command's output stream
+   * returns EOF or an error.
+   * </P>
+   * 
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return the {@code Supplier<String>}
+   * 
+   * @see #generate(Topology, ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static Supplier<String> endlessCommandReader(ProcessBuilder cmd) {
+    return new Supplier<String>() {
+      private static final long serialVersionUID = 1L;
+      Supplier<Iterable<String>> reader = new CommandReader(cmd, true);
+      Iterator<String> iter = null;
+      @Override
+      public String get() {
+        if (iter == null) {
+          iter = reader.get().iterator();
+        }
+        if (iter.hasNext()) {
+          return iter.next();
+        }
+        else {
+          // presumably a shutdown condition
+          return null;
+        }
+      }
+    };
+  }
+  
+  /**
+   * Create a {@code Supplier<List<String>>} to ingest a command's output.
+   * <P>
+   * This method is particularly helpful in creating a sensor or source connector
+   * class that hides the fact that it uses a command, enabling it to be used
+   * like any other sensor/connector.
+   * </P>
+   * For example:
+   * <pre><code>
+   * // ingest the sensor data
+   * TStream&lt;MySensorData&gt; stream = topology.poll(new MySensor(), 2, TimeUnit.SECONDS);
+   *
+   * // MySensor class
+   * class MySensor implements Supplier&lt;MySensorData&gt; {
+   *   private String[] cmd = new String[] {"mySensorCmd", "arg1"};
+   *   private Supplier&lt;List&lt;String&gt;&gt; commandReader = 
+   *     CommandStreams.commandReaderList(new ProcessBuilder(cmd));
+   *       
+   *   // implement Supplier&lt;MySensorData&gt;.get()
+   *   public MySensorData get() {
+   *     // get the cmd output and create a MySensorData tuple from it
+   *     return createMySensorData(commandReader.get());
+   *   }
+   * }
+   * </code></pre>
+   * <P>
+   * The supplied {@code cmd} is used to start the command.
+   * A call to {@link Supplier#get()} reads the command's UTF8
+   * {@link Process#getInputStream() input stream} until an EOF or error
+   * and returns a {@code List<String>} of the collected input.
+   * The tuples contain output from stderr if the cmd is configured to 
+   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+   * </P>
+   * 
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return the {@code Supplier<List<String>>} for the command
+   * 
+   * @see #periodicSource(Topology, ProcessBuilder, long, TimeUnit)
+   * @see #tokenize(String)
+   */
+  public static Supplier<List<String>> commandReaderList(ProcessBuilder cmd) {
+    return new Supplier<List<String>>() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public List<String> get() {
+        try (CommandReader supplier
+                = new CommandReader(cmd, false))
+        {
+          Iterator<String> iter = supplier.get().iterator();
+          List<String> list = new ArrayList<>();
+          while (iter.hasNext())
+            list.add(iter.next());
+          return list;
+        }
+      }
+    };
+  }
+ 
+  /**
+   * Create a {@code Consumer<String>} to write UTF8 string data to a command's input.
+   * <P>
+   * This method is particularly helpful in creating a sink connector
+   * that hides the fact that it uses a command, enabling it to be used
+   * like a native connector.
+   * </P>
+   * For example:
+   * <pre><code>
+   * // sink a stream to my connector
+   * TStream&lt;MySensorData&gt; stream = ...;
+   * stream.sink(new MySinkConnector());
+   *
+   * // MySinkConnector class
+   * class MySinkConnector implements Consumer&lt;MySensorData&gt; {
+   *   private String[] cmd = new String[] {"mySinkCmd", "arg1"};
+   *   private Consumer&lt;String&gt; commandWriter = 
+   *     CommandStreams.commandWriter(new ProcessBuilder(cmd));
+   *       
+   *   // implement Consumer&lt;MySensorData&gt;.accept()
+   *   public void accept(MySensorData data) {
+   *     // convert the data to a string and write it to the cmd
+   *     commandWriter.accept(convertMySensorData(data));
+   *   }
+   * }
+   * </code></pre>
+   * <P>
+   * The supplied {@link ProcessBuilder cmd} is used to start the command.
+   * Each call to {@link Consumer#accept(Object) accept(String)} writes a 
+   * UTF8 string to the command's {@link Process#getOutputStream() input}.
+   * Each write is followed by a flush.
+   * The command is restarted if a write encounters an error. 
+   * </P>
+   * <P>
+   * While each write is followed by a flush() that only helps to
+   * reduce the time it takes to notice that cmd has failed and restart it.
+   * Supposedly "successfully written and flushed" values are not guaranteed to
+   * have been received by a cmd across restarts.
+   * </P>
+   * 
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return the {@code Consumer<String>} for the command
+   * 
+   * @see #sink(TStream, ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static Consumer<String> commandWriter(ProcessBuilder cmd) {
+    return new CommandWriter(cmd, true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/package-info.java b/connectors/command/src/main/java/quarks/connectors/command/package-info.java
new file mode 100644
index 0000000..981293f
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Connectors to a Command / OS Process.
+ */
+package quarks.connectors.command;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandConnector.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandConnector.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandConnector.java
new file mode 100644
index 0000000..7076638
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandConnector.java
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for source / sink specific command connectors.
+ * <P>
+ * The lifetime of a CommandConnector is that of its Command's execution lifetime.
+ * In the case of a "one shot" command (e.g., a periodicSource's cmd) the
+ * lifetime may be brief - {@code restart==false}.
+ * </P>
+ * <P>
+ * Many command connector uses will involve long running commands, sources
+ * and sinks that want to be robust in the face of inadvertent command
+ * termination/failures - {@code restart==true}.
+ * </P>
+ */
+abstract class CommandConnector implements AutoCloseable {
+  static final Logger logger = LoggerFactory.getLogger(CommandConnector.class);
+
+  private final ProcessBuilder cmd;
+  private final boolean restart;
+  private Process currentProcess;
+  private long numStarts;
+  private long lastStartTimestamp;
+  private final int restartDelayMsec = 1_000;
+  
+  
+  CommandConnector(ProcessBuilder cmd, boolean restart) {
+    this.cmd = cmd;
+    this.restart = restart;
+  }
+  
+  protected boolean canStart() {
+    return restart || numStarts==0;
+  }
+  
+  protected Process getCurrentProcess() {
+    return currentProcess;
+  }
+  
+  protected void start() throws InterruptedException {
+    if (!canStart())
+      throw new IllegalStateException();
+    closeProcess();
+    try {
+      numStarts++;
+      // ensure we don't thrash on continuous restarts
+      long now = System.currentTimeMillis();
+      if (now < lastStartTimestamp + restartDelayMsec) {
+        logger.info("Sleeping before restarting cmd {}", toCmdForMsg());
+        Thread.sleep(restartDelayMsec);
+        now = System.currentTimeMillis();
+      }
+      lastStartTimestamp = now;
+      
+      currentProcess = cmd.start();
+      
+      logger.debug("Started cmd {}", toCmdForMsg());
+    }
+    catch (IOException e) {
+      logger.error("Unable to start cmd {}", toCmdForMsg(), e);
+    }
+  }
+  
+  protected void closeProcess() {
+    if (currentProcess != null) {
+      currentProcess.destroy();
+      currentProcess = null;
+    }
+  }
+
+  @Override
+  public void close() {
+    closeProcess();
+  }
+  
+  String toCmdForMsg() {
+    return cmd.command().toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandReader.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandReader.java
new file mode 100644
index 0000000..0340918
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandReader.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import quarks.function.Supplier;
+
+/**
+ * Create a {@code Supplier<Iterable<String>>} to ingest a command's output.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * and restart it upon process termination/error if so configured.
+ * </P>
+ * <P>
+ * The iterator returned by {@link Iterable#iterator()} returns
+ * {@code hasNext()==true} until a read from {@link Process#getInputStream()}
+ * returns EOF or an I/O error.
+ */
+public class CommandReader extends CommandConnector implements Supplier<Iterable<String>>, AutoCloseable {
+  private static final long serialVersionUID = 1L;
+  private Iterator<String> currentSupplierIterator;
+  
+  /**
+   * Create a supplier of UTF8 strings from a command's output.
+   * 
+   * @param cmd the {@link ProcessBuilder} to use to start the command
+   * @param restart when true, restart the command upon termination, EOF, or
+   * read error.
+   */
+  public CommandReader(ProcessBuilder cmd, boolean restart) {
+    super(cmd, restart);
+  }
+
+  protected void start() throws InterruptedException {
+    super.start();
+    currentSupplierIterator = new ProcessReader(getCurrentProcess()).get().iterator();
+  }
+  
+  protected void closeProcess() {
+    currentSupplierIterator = null;
+    super.closeProcess();
+  }
+  
+  @Override
+  public Iterable<String> get() {
+    return new Iterable<String>() {
+
+      @Override
+      public Iterator<String> iterator() {
+        return new Iterator<String>() {
+
+          @Override
+          public boolean hasNext() {
+            try {
+              for(;;) {
+                if (currentSupplierIterator != null) {
+                  boolean hasNext = currentSupplierIterator.hasNext();
+                  if (hasNext) {
+                    return true;
+                  }
+                  else {
+                    // no more from that process.  close and loop/retry.
+                    closeProcess();
+                  }
+                }
+                else if (currentSupplierIterator == null && canStart()) {
+                  start(); // and loop/retry
+                }
+                else {
+                  return false; // no more input
+                }
+              }
+            }
+            catch (InterruptedException e) {
+              return false;
+            }
+          }
+
+          @Override
+          public String next() {
+            if (currentSupplierIterator != null)
+              return currentSupplierIterator.next();
+            else
+              throw new NoSuchElementException();
+          }
+          
+        };
+      }
+      
+    };
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandWriter.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandWriter.java
new file mode 100644
index 0000000..813fe93
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandWriter.java
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import quarks.function.Consumer;
+
+/**
+ * A {@code Consumer<String>} to write data to a command's input.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * and restart it upon process termination/error if so configured.
+ * </P>
+ */
+public class CommandWriter extends CommandConnector implements Consumer<String>, AutoCloseable {
+  private static final long serialVersionUID = 1L;
+  // writer for the currently running process; null when no process is active
+  private ProcessWriter currentConsumer;
+  
+  /**
+   * Create a consumer to write UTF8 string data to a command's input.
+   * <P>
+   * Each write is followed by a flush() though that only helps to
+   * reduce the time it takes to notice that a cmd has failed.
+   * Supposedly "successfully written and flushed" values are not guaranteed to
+   * have been received by a cmd even following restart.
+   * </P>
+   * 
+   * @param cmd the builder to use to start the process
+   * @param restart true to restart the process upon termination or
+   * write error.
+   */
+  public CommandWriter(ProcessBuilder cmd, boolean restart) {
+    super(cmd, restart);
+  }
+
+  /**
+   * Start the command and attach a writer to the new process's input.
+   *
+   * @throws InterruptedException if interrupted while starting
+   */
+  @Override
+  protected void start() throws InterruptedException {
+    super.start();
+    currentConsumer = new ProcessWriter(getCurrentProcess());
+  }
+  
+  /**
+   * Drop the writer for the current process, then close the process.
+   */
+  @Override
+  protected void closeProcess() {
+    currentConsumer = null;
+    super.closeProcess();
+  }
+
+  /**
+   * Write {@code value} to the command's input.
+   * <P>
+   * If the write fails the process is closed and, when {@code canStart()}
+   * allows it, the command is started again and the write is retried.
+   * The value is silently dropped when the command cannot be started
+   * or the thread is interrupted.
+   * </P>
+   *
+   * @param value the value to write
+   */
+  @Override
+  public void accept(String value) {
+    for (;;) {
+      try {
+        if (currentConsumer != null) {
+          try {
+            currentConsumer.accept(value);
+            logger.trace("WROTE: {}", value);
+            return;
+          } 
+          catch (RuntimeException e) {
+            // ProcessWriter signals a failed write by throwing
+            closeProcess(); // and loop/retry
+          }
+        }
+        else if (canStart()) {
+          logger.debug("STARTING for: {}", value);
+          start();  // and loop/retry
+        }
+        else {
+          // not restartable. toss it on the floor
+          return;
+        }
+      }
+      catch (InterruptedException e) {
+        // toss it on the floor, but keep the interrupt visible to
+        // whoever owns this thread
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessReader.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessReader.java
new file mode 100644
index 0000000..8b2525a
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessReader.java
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import quarks.function.Supplier;
+
+/**
+ * A {@code Supplier<Iterable<String>>} for ingesting a process's output.
+ * <P>
+ * The iterator returned by {@link Iterable#iterator()) returns
+ * {@hasNext()==true} until a read from {@link Process#getInputStream()}
+ * returns EOF or an IOError.
+ */
+class ProcessReader implements Supplier<Iterable<String>> {
+  private static final long serialVersionUID = 1L;
+  private final BufferedReader reader;
+  
+  /**
+   * Create a new supplier of UTF8 strings read from a process's
+   * {@link Process#getInputStream() output}.
+   * 
+   * @param process the process to read from.
+   */
+  ProcessReader(Process process) {
+    reader = new BufferedReader(new InputStreamReader(
+        process.getInputStream(), StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public Iterable<String> get() {
+    return new Iterable<String>() {
+
+      @Override
+      public Iterator<String> iterator() {
+        return new Iterator<String>() {
+          private Boolean hasNext = null;
+          private String next = null;
+          
+          @Override
+          public boolean hasNext() {
+            if (hasNext != null)
+              return hasNext;
+            next = getNext();
+            hasNext = next != null;
+            return hasNext;
+          }
+
+          @Override
+          public String next() {
+            if (next == null)
+              throw new NoSuchElementException();
+            hasNext = null;
+            return next;
+          }
+          
+        };
+      }
+      
+    };
+  }
+
+  /**
+   * Get the next available line from the process's stdout
+   * @return null if no more input (or error)
+   */
+  private String getNext() {
+    try {
+      return reader.readLine();
+    } catch (IOException e) {
+      CommandConnector.logger.error("Unable to readline from cmd", e);
+      return null;
+    }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessWriter.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessWriter.java
new file mode 100644
index 0000000..7fcb003
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessWriter.java
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import quarks.function.Consumer;
+
+/**
+ * A {@code Consumer<String>>} to receive data and write it to a process's input.
+ * <P>
+ * Each write is followed by a flush() though that only helps to
+ * reduce the time it takes to notice that a cmd has failed.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd.
+ */
+class ProcessWriter implements Consumer<String> {
+  private static final long serialVersionUID = 1L;
+  private final BufferedWriter writer;
+  
+  /**
+   * Create a new consumer for UTF8 strings to write to a process's
+   * {@link Process#getOutputStream() input}
+   * 
+   * @param process to process to write to.
+   */
+  ProcessWriter(Process process) {
+    writer = new BufferedWriter(new OutputStreamWriter(
+        process.getOutputStream(), StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public void accept(String value) {
+    try {
+      // see class doc regarding guarantees.
+      writer.write(value);
+      writer.newLine();
+      writer.flush();
+    }
+    catch (IOException e) {
+      CommandConnector.logger.error("Unable to write to cmd", e);
+      // caller (CommandWriter) requires throw to detect failure and recover
+      throw new RuntimeException("Unable to write to cmd", e);
+    }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsGlobalTest.java b/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsGlobalTest.java
new file mode 100644
index 0000000..eaaf83f
--- /dev/null
+++ b/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsGlobalTest.java
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.connectors.command;
+
+/**
+ * CommandStreams connector globalization tests: the tests inherited from
+ * {@link CommandStreamsTest}, run with non-ASCII line content.
+ */
+public class CommandStreamsGlobalTest extends CommandStreamsTest {
+
+    /** Non-ASCII test lines, written as Unicode escapes. */
+    private static final String[] GLOBAL_LINES = {
+            "\u5b78\u800c\u6642\u7fd2\u4e4b",
+            "\u4e0d\u4ea6\u8aaa\u4e4e"
+    };
+
+    /**
+     * @return the non-ASCII lines used in place of the base class's lines
+     */
+    @Override
+    public String[] getLines() {
+        return GLOBAL_LINES;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsTest.java b/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsTest.java
new file mode 100644
index 0000000..3e9cb7c
--- /dev/null
+++ b/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsTest.java
@@ -0,0 +1,224 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.connectors.command;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.lang.ProcessBuilder.Redirect;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+import quarks.connectors.command.CommandStreams;
+import quarks.test.connectors.common.FileUtil;
+import quarks.test.connectors.common.TestRepoPath;
+import quarks.test.providers.direct.DirectTestSetup;
+import quarks.test.topology.TopologyAbstractTest;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.tester.Condition;
+
+/**
+ * Tests for the {@link CommandStreams} connector.
+ * <P>
+ * The tests run external commands ({@code cat}, {@code sh}) and exchange
+ * data with them through temp files.
+ * </P>
+ */
+public class CommandStreamsTest extends TopologyAbstractTest implements DirectTestSetup {
+    
+    private String[] stdLines = new String[] {
+            "Line 1",
+            "Line 2",
+            "Line 3",
+    };
+
+    /**
+     * Get the lines used as test content.  Subclasses override this to
+     * run the same tests with different content.
+     * @return the lines
+     */
+    public String[] getLines() {
+        return stdLines;
+    }
+    
+    /**
+     * Sleep for {@code millis}, returning early if interrupted.
+     * The thread's interrupt status is preserved.
+     */
+    private static void delay(long millis) {
+      try {
+        Thread.sleep(millis);
+      }
+      catch (InterruptedException e) {
+        // stop waiting, but keep the interrupt visible to the thread's owner
+        Thread.currentThread().interrupt();
+      }
+    }
+    
+    /** Tokenizing splits on runs of spaces and tabs. */
+    @Test
+    public void testTokenize() {
+      String cmdString = "myCmd arg1    arg2\targ3";
+      String[] expected = new String[]{"myCmd", "arg1", "arg2", "arg3"};
+      
+      assertArrayEquals(expected, CommandStreams.tokenize(cmdString).toArray(new String[0]));
+    }
+    
+    /** Each poll runs the cmd and yields the file's lines; expect NUM_POLLS copies. */
+    @Test
+    public void testPeriodicSource() throws Exception {
+      Topology t = newTopology("testPeriodicSource");
+      
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+      
+      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+      
+      int NUM_POLLS = 3;
+      List<String> expLines = new ArrayList<>();
+      for (int i = 0; i < NUM_POLLS; i++) {
+        expLines.addAll(Arrays.asList(getLines()));
+      }
+      
+      TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
+      TStream<String> s = ls.flatMap(list -> list); 
+      
+      try {
+        completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
+      }
+      finally {
+          tempFile1.toFile().delete();
+      }
+    }
+    
+    /** The generated stream carries the lines the cmd writes to its output. */
+    @Test
+    public void testGenerate() throws Exception {
+      Topology t = newTopology("testGenerate");
+
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+      
+      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+      
+      // N.B. if looking at trace: QUARKS-224 generate() continues running after job is closed
+      TStream<String> s = CommandStreams.generate(t, cmd);
+      
+      try {
+        completeAndValidate("", t, s, 10, getLines());
+      }
+      finally {
+          tempFile1.toFile().delete();
+      }
+    }
+    
+    /** Expect NUM_RUNS copies of the lines, one copy per run of the cmd. */
+    @Test
+    public void testGenerateRestart() throws Exception {
+      Topology t = newTopology("testGenerateRestart");
+
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+      
+      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+
+      int NUM_RUNS = 3;
+      List<String> expLines = new ArrayList<>();
+      for (int i = 0; i < NUM_RUNS; i++) {
+        expLines.addAll(Arrays.asList(getLines()));
+      }
+      
+      // N.B. if looking at trace: QUARKS-224 generate() continues running after job is closed
+      TStream<String> s = CommandStreams.generate(t, cmd);
+      
+      try {
+        completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
+      }
+      finally {
+          // clean up like the other tests do
+          tempFile1.toFile().delete();
+      }
+    }
+    
+    /** Tuples sent to the sink are written to the cmd's input; cat appends them to the file. */
+    @Test
+    public void testSink() throws Exception {
+      Topology t = newTopology("testSink");
+
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+      
+      ProcessBuilder cmd = new ProcessBuilder("cat")
+          .redirectOutput(Redirect.appendTo(tempFile1.toFile()));
+      
+      TStream<String> s = t.strings(getLines());
+      
+      TSink<String> sink = CommandStreams.sink(s, cmd);
+      
+      assertNotNull(sink);
+      
+      try {
+        // start the job, sleep for a bit (await the timeout) then validate sink output
+        Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
+        t.getTester().complete(getSubmitter(), new JsonObject(), never, 3, TimeUnit.SECONDS);
+
+        FileUtil.validateFile(tempFile1, getLines());
+      }
+      finally {
+          tempFile1.toFile().delete();
+      }
+    }
+
+    /** The sink cmd exits after each batch of lines; all batches must still reach the file. */
+    @Test
+    public void testSinkRestart() throws Exception {
+      Topology t = newTopology("testSinkRestart");
+
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+      int batchSize = getLines().length;
+      
+      // tell cmd to terminate after each batch of lines
+      ProcessBuilder cmd = new ProcessBuilder("sh", getCmdPath("sinkcmd"), ""+batchSize)
+          .redirectOutput(Redirect.appendTo(tempFile1.toFile()))
+          .redirectError(Redirect.to(new File("/dev/stderr")));
+
+      int NUM_RUNS = 3;
+      List<String> expLines = new ArrayList<>();
+      for (int i = 0; i < NUM_RUNS; i++) {
+        expLines.addAll(Arrays.asList(getLines()));
+      }
+      AtomicInteger cnt = new AtomicInteger();
+     
+      TStream<String> s = t.strings(expLines.toArray(new String[0]))
+          .filter(tup -> {
+            // need to slow things down so the sinker has time to notice
+            // the cmd has terminated.  otherwise we'll get ahead,
+            // tuples will get dropped on the floor and validation will fail.
+            if (cnt.incrementAndGet() > batchSize) {
+              // System.out.println("SLEEPING on cnt "+ cnt.get() + " for "+tup);
+              delay(1_000);
+              cnt.set(1);
+            }
+            return true;
+          });
+      
+      TSink<String> sink = CommandStreams.sink(s, cmd);
+      
+      assertNotNull(sink);
+      
+      try {
+        // start the job, sleep for a bit (await the timeout) then validate sink output
+        Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
+        t.getTester().complete(getSubmitter(), new JsonObject(), never,
+            6 + ((NUM_RUNS-1) * 1/*restart delay*/), TimeUnit.SECONDS);
+        
+        FileUtil.validateFile(tempFile1, expLines.toArray(new String[0]));
+      }
+      finally {
+          tempFile1.toFile().delete();
+      }
+    }
+    
+    /** Get the path of a script in this connector's src/test/scripts directory. */
+    private String getCmdPath(String cmd) {
+      return TestRepoPath.getPath("connectors", "command", "src", "test", "scripts", cmd);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/test/scripts/sinkcmd
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/scripts/sinkcmd b/connectors/command/src/test/scripts/sinkcmd
new file mode 100644
index 0000000..93a09a4
--- /dev/null
+++ b/connectors/command/src/test/scripts/sinkcmd
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+# echo stdin to stdout.  terminate after <terminateCount> lines
+USAGE="$0 [<terminateCount>]"
+
+#set -x
+
+TERM_COUNT=0
+if [ $# -gt 0 ]; then
+  TERM_COUNT=$1
+  shift
+fi
+
+COUNTER=0
+while read LINE; do
+  echo $LINE
+  # echo $0 echoing $LINE >/dev/stderr
+  let COUNTER=COUNTER+1
+  if [ $TERM_COUNT -gt 0 -a $COUNTER -ge $TERM_COUNT ]; then
+    echo $0 TERMINATING after $LINE >/dev/stderr
+    exit 1;
+  fi
+done
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/common/src/test/java/quarks/test/connectors/common/FileUtil.java
----------------------------------------------------------------------
diff --git a/connectors/common/src/test/java/quarks/test/connectors/common/FileUtil.java b/connectors/common/src/test/java/quarks/test/connectors/common/FileUtil.java
new file mode 100644
index 0000000..fc2b7e3
--- /dev/null
+++ b/connectors/common/src/test/java/quarks/test/connectors/common/FileUtil.java
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.connectors.common;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * File helpers shared by the connector tests.
+ */
+public class FileUtil {
+
+  /**
+   * Create a UTF-8 temp file holding the given lines, each followed by {@code "\n"}.
+   * <P>
+   * The file is registered for deletion on JVM exit, and is deleted
+   * right away if writing its content fails.
+   * </P>
+   * @param name prefix for the temp file's name
+   * @param extension suffix for the temp file's name
+   * @param lines content for the file
+   * @return {@code Path} to temp file
+   * @throws Exception on failure
+   */
+  public static Path createTempFile(String name, String extension, String[] lines) throws Exception {
+      Path tmpFile = Files.createTempFile(name, extension);
+      tmpFile.toFile().deleteOnExit();
+      
+      try (BufferedWriter out = new BufferedWriter(
+              new OutputStreamWriter(
+                new FileOutputStream(tmpFile.toFile()), StandardCharsets.UTF_8)))
+      {
+        for (String line : lines) {
+            out.write(line);
+            out.write("\n");
+        }
+      }
+      catch (Exception e) {
+        // don't leave a partially written file behind
+        tmpFile.toFile().delete();
+        throw e;
+      }
+      return tmpFile;
+  }
+  
+  /**
+   * Assert that the file's lines, read as UTF-8, equal the expected lines.
+   * @param path file to validate
+   * @param lines the expected content
+   * @throws Exception on failure
+   */
+  public static void validateFile(Path path, String[] lines) throws Exception {
+    List<String> actual = new ArrayList<>();
+    try (BufferedReader in = new BufferedReader(
+            new InputStreamReader(
+              new FileInputStream(path.toFile()), StandardCharsets.UTF_8)))
+    {
+      for (String line = in.readLine(); line != null; line = in.readLine()) {
+        actual.add(line);
+      }
+      assertArrayEquals(lines, actual.toArray(new String[0]));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
index 5a408ae..d826222 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
@@ -20,12 +20,8 @@ package quarks.test.connectors.file;
 
 import static org.junit.Assume.assumeTrue;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
@@ -43,6 +39,7 @@ import org.junit.Test;
 import quarks.connectors.file.FileStreams;
 import quarks.function.BiFunction;
 import quarks.function.Function;
+import quarks.test.connectors.common.FileUtil;
 import quarks.test.providers.direct.DirectTestSetup;
 import quarks.test.topology.TopologyAbstractTest;
 import quarks.topology.TStream;
@@ -202,8 +199,8 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
         String[] allLines = Stream.concat(Stream.of(lines), Stream.of(ucLines))
                 .toArray(String[]::new);
         
-        Path tempFile1 = createTempFile("test1", "txt", lines);
-        Path tempFile2 = createTempFile("test2", "txt", ucLines);
+        Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+        Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
         
         TStream<String> contents = FileStreams.textFileReader(
                 t.strings(tempFile1.toAbsolutePath().toString(),
@@ -229,8 +226,8 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
         String[] allLines = Stream.concat(Stream.of(lines), Stream.of(ucLines))
                 .toArray(String[]::new);
         
-        Path tempFile1 = createTempFile("test1", "txt", lines);
-        Path tempFile2 = createTempFile("test2", "txt", ucLines);
+        Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+        Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
         
         // ensure a problem in one file (tuple) doesn't affect others.
         // The problem files should result in a log entry but otherwise be ignored.
@@ -259,8 +256,8 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
                 .map(line -> line.toUpperCase())
                 .toArray(String[]::new);
         
-        Path tempFile1 = createTempFile("test1", "txt", lines);
-        Path tempFile2 = createTempFile("test2", "txt", ucLines);
+        Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+        Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
         
         // Be insensitive to Windows path separators and "/tmp" location
         boolean isWindows = System.getProperty("os.name").startsWith("Windows");
@@ -311,20 +308,4 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
             tempFile2.toFile().delete();
         }
     }
-
-    public static Path createTempFile(String name, String extension, String[] lines) throws Exception {
-        Path tmpFile = Files.createTempFile(name, extension);
-        
-        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
-                new FileOutputStream(tmpFile.toFile()), StandardCharsets.UTF_8));
-        
-        for (int i = 0; i < lines.length; i++) {
-            bw.write(lines[i]);
-            bw.write("\n");
-        }
-        bw.flush();
-        bw.close();
-        
-        return tmpFile;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
index 8d0506e..c810aed 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
@@ -26,17 +26,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static quarks.test.connectors.common.FileUtil.createTempFile;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -944,20 +941,4 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
           assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e);
         }
     }
-
-    public static Path createTempFile(String name, String extension, String[] lines) throws Exception {
-        Path tmpFile = Files.createTempFile(name, extension);
-        
-        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
-                new FileOutputStream(tmpFile.toFile()), StandardCharsets.UTF_8));
-        
-        for (int i = 0; i < lines.length; i++) {
-            bw.write(lines[i]);
-            bw.write("\n");
-        }
-        bw.flush();
-        bw.close();
-        
-        return tmpFile;
-    }
 }