You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:34 UTC

[33/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/CommandStreams.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/CommandStreams.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/CommandStreams.java
new file mode 100644
index 0000000..ad02cac
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/CommandStreams.java
@@ -0,0 +1,349 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.command.runtime.CommandReader;
+import org.apache.edgent.connectors.command.runtime.CommandWriter;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * Connector for creating a TStream from a Command's / OS Process's output
+ * and sinking a TStream to a Command's / OS Process's input.
+ * <P>
+ * e.g., run a network monitor command (like Tiger Shark) and ingest its output.
+ */
+public class CommandStreams {
+  private CommandStreams() {}
+
+  /**
+   * Tokenize the specified {@code cmdString} in the exact same manner as
+   * done by {@link Runtime#exec(String)}.
+   * <P>
+   * This function provides a convenience for creating a {@link ProcessBuilder}
+   * for use by the other CommandStreams methods. 
+   * </P>
+   * <P>
+   * Sample use:
+   * <pre>{@code
+   * ProcessBuilder cmd = new ProcessBuilder(tokenize("sh someShellCmd.sh and args"));
+   * TStream<String> stream = CommandStreams.generate(topology, cmd);
+   * }</pre>
+   * 
+   * @param cmdString a command string
+   * @return the tokens
+   */
+  public static List<String> tokenize(String cmdString) {
+    List<String> command = new ArrayList<>();
+    StringTokenizer tok = new StringTokenizer(cmdString);
+    while (tok.hasMoreTokens()) 
+      command.add(tok.nextToken());
+    return command;
+  }
+  
+  /**
+   * Create an endless {@code TStream<String>} from a long running command's output.
+   * <P>
+   * The supplied {@code cmd} is used to start the command.
+   * A tuple is created for each UTF8 line read from the command's
+   * {@link Process#getInputStream() output}.
+   * The tuples contain output from stderr if cmd is configured to 
+   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+   * The command is restarted if a read from the command's output stream
+   * returns EOF or an error. 
+   * </P>
+   * <P>
+   * This is a convenience function equivalent to
+   * {@code topology.generate(endlessCommandReader(cmd))}.
+   * </P>
+   * <P>
+   * Sample use: create a stream of tuples for the output from a 
+   * continuously running and restartable command:
+   * <pre>{@code
+   * ProcessBuilder cmd = new ProcessBuilder("myCommand");
+   * TStream<String> cmdOutput = CommandStreams.generate(topology, cmd);
+   * cmdOutput.print();
+   * }</pre>
+   * 
+   * @param topology the topology to add the source stream to
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return the source {@code TStream<String>}
+   * 
+   * @see #endlessCommandReader(ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static TStream<String> generate(Topology topology, ProcessBuilder cmd) {
+    return topology.generate(endlessCommandReader(cmd));
+  }
+  
+  /**
+   * Create a {@code TStream<String>} from a periodically run command's output.
+   * <P>
+   * The supplied {@code cmd} is used to start the command
+   * at the specified {@code period}.
+   * The command's UTF8 {@link Process#getInputStream() output} is read until EOF
+   * and a {@code List<String>} tuple is created containing the collected output.
+   * The tuples contain output from stderr if the cmd is configured to 
+   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.  
+   * </P>
+   * <P>
+   * This is a convenience function equivalent to
+   * {@code topology.poll(commandReaderList(cmd), period, units)}.
+   * </P>
+   * <P>
+   * Sample use: create a stream of tuples containing the output 
+   * from a periodically run command:
+   * <pre>{@code
+   * ProcessBuilder cmd = new ProcessBuilder("date");
+   * TStream<List<String>> cmdOutput = 
+   *      CommandStreams.periodicSource(topology, cmd, 2, TimeUnit.SECONDS);
+   * cmdOutput.print();
+   * }</pre>
+   * 
+   * @param topology the topology to add the source stream to
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @param period the period to run the command and collect its output
+   * @param units TimeUnit for {@code period}
+   * @return the source {@code TStream<List<String>>}
+   * 
+   * @see #commandReaderList(ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static TStream<List<String>> periodicSource(Topology topology,
+      ProcessBuilder cmd, long period, TimeUnit units) {
+    return topology.poll(commandReaderList(cmd), period, units);
+  }
+  
+  /**
+   * Sink a {@code TStream<String>} to a command's input.
+   * <P>
+   * The supplied {@code cmd} is used to start the command.
+   * Each tuple is written as UTF8 and flushed to the command's {@link Process#getOutputStream() input}.
+   * The command is restarted if a write encounters an error. 
+   * </P>
+   * <P>
+   * While each write is followed by a flush() that only helps to
+   * reduce the time it takes to notice that cmd has failed and restart it.
+   * Supposedly "successfully written and flushed" values are not guaranteed to
+   * have been received by a cmd across restarts.
+   * </P>
+   * <P>
+   * This is a convenience function equivalent to
+   * {@code stream.sink(commandWriter(cmd))}
+   * </P>
+   * <P>
+   * Sample use: write a stream of tuples to the input of a command:
+   * <pre>{@code
+   * TStream<String> stream = topology.strings("one", "two", "three");
+   * ProcessBuilder cmd = new ProcessBuilder("cat").redirectOutput(new File("/dev/stdout"));
+   * CommandStreams.sink(stream, cmd);
+   * }</pre>
+   * 
+   * @param stream the stream to sink
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return a {@link TSink}
+   * 
+   * @see #commandWriter(ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static TSink<String> sink(TStream<String> stream, ProcessBuilder cmd) {
+    return stream.sink(commandWriter(cmd));
+  }
+    
+  /**
+   * Create an endless {@code Supplier<String>} for ingesting a long running command's output.
+   * <P>
+   * This method is particularly helpful in creating a sensor or source connector
+   * class that hides the fact that it uses a command, enabling it to be used
+   * like any other sensor/connector.
+   * </P>
+   * For example:
+   * <pre><code>
+   * // ingest the sensor data
+   * TStream&lt;MySensorData&gt; stream = topology.generate(new MySensor());
+   *
+   * // MySensor class
+   * class MySensor implements Supplier&lt;MySensorData&gt; {
+   *   private String[] cmd = new String[] {"mySensorCmd", "arg1"};
+   *   private Supplier&lt;String&gt; commandReader = 
+   *     CommandStreams.endlessCommandReader(new ProcessBuilder(cmd));
+   *       
+   *   // implement Supplier&lt;MySensorData&gt;.get()
+   *   public MySensorData get() {
+   *     // get the next line from the cmd and create a MySensorData tuple from it
+   *     return createMySensorData(commandReader.get());
+   *   }
+   * }
+   * </code></pre>
+   * <P>
+   * The supplied {@code cmd} is used to start the command.
+   * A call to {@link Supplier#get()} reads the next UTF8 line from the command's
+   * {@link Process#getInputStream() output}.
+   * The returned strings contain output from stderr if the cmd is configured to 
+   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+   * The command is restarted if a read from the command's output stream
+   * returns EOF or an error.
+   * </P>
+   * 
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return the {@code Supplier<String>}
+   * 
+   * @see #generate(Topology, ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static Supplier<String> endlessCommandReader(ProcessBuilder cmd) {
+    return new Supplier<String>() {
+      private static final long serialVersionUID = 1L;
+      Supplier<Iterable<String>> reader = new CommandReader(cmd, true);
+      Iterator<String> iter = null;
+      @Override
+      public String get() {
+        if (iter == null) {
+          iter = reader.get().iterator();
+        }
+        if (iter.hasNext()) {
+          return iter.next();
+        }
+        else {
+          // presumably a shutdown condition
+          return null;
+        }
+      }
+    };
+  }
+  
+  /**
+   * Create a {@code Supplier<List<String>>} to ingest a command's output.
+   * <P>
+   * This method is particularly helpful in creating a sensor or source connector
+   * class that hides the fact that it uses a command, enabling it to be used
+   * like any other sensor/connector.
+   * </P>
+   * For example:
+   * <pre><code>
+   * // ingest the sensor data
+   * TStream&lt;MySensorData&gt; stream = topology.poll(new MySensor(), 1, TimeUnit.SECONDS);
+   *
+   * // MySensor class
+   * class MySensor implements Supplier&lt;MySensorData&gt; {
+   *   private String[] cmd = new String[] {"mySensorCmd", "arg1"};
+   *   private Supplier&lt;List&lt;String&gt;&gt; commandReader = 
+   *     CommandStreams.commandReaderList(new ProcessBuilder(cmd));
+   *       
+   *   // implement Supplier&lt;MySensorData&gt;.get()
+   *   public MySensorData get() {
+   *     // get the cmd output and create a MySensorData tuple from it
+   *     return createMySensorData(commandReader.get());
+   *   }
+   * }
+   * </code></pre>
+   * <P>
+   * The supplied {@code cmd} is used to start the command.
+   * A call to {@link Supplier#get()} reads the command's UTF8
+   * {@link Process#getInputStream() input stream} until an EOF or error
+   * and returns a {@code List<String>} of the collected input.
+   * The tuples contain output from stderr if the cmd is configured to 
+   * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+   * </P>
+   * 
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return the {@code Supplier<List<String>>} for the command
+   * 
+   * @see #periodicSource(Topology, ProcessBuilder, long, TimeUnit)
+   * @see #tokenize(String)
+   */
+  public static Supplier<List<String>> commandReaderList(ProcessBuilder cmd) {
+    return new Supplier<List<String>>() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public List<String> get() {
+        try (CommandReader supplier
+                = new CommandReader(cmd, false))
+        {
+          Iterator<String> iter = supplier.get().iterator();
+          List<String> list = new ArrayList<>();
+          while (iter.hasNext())
+            list.add(iter.next());
+          return list;
+        }
+      }
+    };
+  }
+ 
+  /**
+   * Create a {@code Consumer<String>} to write UTF8 string data to a command's input.
+   * <P>
+   * This method is particularly helpful in creating a sink connector
+   * that hides the fact that it uses a command, enabling it to be used
+   * like a native connector.
+   * </P>
+   * For example:
+   * <pre><code>
+   * // sink a stream to my connector
+   * TStream&lt;MySensorData&gt; stream = ...;
+   * stream.sink(new MySinkConnector());
+   *
+   * // MySinkConnector class
+   * class MySinkConnector implements Consumer&lt;MySensorData&gt; {
+   *   private String[] cmd = new String[] {"mySinkCmd", "arg1"};
+   *   private Consumer&lt;String&gt; commandWriter = 
+   *     CommandStreams.commandWriter(new ProcessBuilder(cmd));
+   *       
+   *   // implement Consumer&lt;MySensorData&gt;.accept()
+   *   public void accept(MySensorData data) {
+   *     // convert the data to a string and write it to the cmd
+   *     commandWriter.accept(convertMySensorData(data));
+   *   }
+   * }
+   * </code></pre>
+   * <P>
+   * The supplied {@link ProcessBuilder cmd} is used to start the command.
+   * Each call to {@link Consumer#accept(Object) accept(String)} writes a 
+   * UTF8 string to the command's {@link Process#getOutputStream() input}.
+   * Each write is followed by a flush.
+   * The command is restarted if a write encounters an error. 
+   * </P>
+   * <P>
+   * While each write is followed by a flush() that only helps to
+   * reduce the time it takes to notice that cmd has failed and restart it.
+   * Supposedly "successfully written and flushed" values are not guaranteed to
+   * have been received by a cmd across restarts.
+   * </P>
+   * 
+   * @param cmd the {@link ProcessBuilder} to start the command
+   * @return the {@code Consumer<String>} for the command
+   * 
+   * @see #sink(TStream, ProcessBuilder)
+   * @see #tokenize(String)
+   */
+  public static Consumer<String> commandWriter(ProcessBuilder cmd) {
+    return new CommandWriter(cmd, true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/package-info.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/package-info.java
new file mode 100644
index 0000000..97213c9
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Command / OS Process connector.
+ */
+package org.apache.edgent.connectors.command;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandConnector.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandConnector.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandConnector.java
new file mode 100644
index 0000000..fca3d4c
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandConnector.java
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for source / sink specific command connectors.
+ * <P>
+ * The lifetime of a CommandConnector is that of its Command's execution lifetime.
+ * In the case of a "one shot" command (e.g., a periodicSource's cmd) the
+ * lifetime may be brief - {@code restart==false}.
+ * </P>
+ * <P>
+ * Many command connector uses will involve long running commands, sources
+ * and sinks that want to be robust in the face of inadvertent command
+ * termination/failures - {@code restart==true}.
+ * </P>
+ */
+abstract class CommandConnector implements AutoCloseable {
+  static final Logger logger = LoggerFactory.getLogger(CommandConnector.class);
+
+  private final ProcessBuilder cmd;
+  private final boolean restart;
+  private Process currentProcess;
+  private long numStarts;
+  private long lastStartTimestamp;
+  private final int restartDelayMsec = 1_000;
+  
+  
+  CommandConnector(ProcessBuilder cmd, boolean restart) {
+    this.cmd = cmd;
+    this.restart = restart;
+  }
+  
+  protected boolean canStart() {
+    return restart || numStarts==0;
+  }
+  
+  protected Process getCurrentProcess() {
+    return currentProcess;
+  }
+  
+  protected void start() throws InterruptedException {
+    if (!canStart())
+      throw new IllegalStateException();
+    closeProcess();
+    try {
+      numStarts++;
+      // ensure we don't thrash on continuous restarts
+      long now = System.currentTimeMillis();
+      if (now < lastStartTimestamp + restartDelayMsec) {
+        logger.info("Sleeping before restarting cmd {}", toCmdForMsg());
+        Thread.sleep(restartDelayMsec);
+        now = System.currentTimeMillis();
+      }
+      lastStartTimestamp = now;
+      
+      currentProcess = cmd.start();
+      
+      logger.debug("Started cmd {}", toCmdForMsg());
+    }
+    catch (IOException e) {
+      logger.error("Unable to start cmd {}", toCmdForMsg(), e);
+    }
+  }
+  
+  protected void closeProcess() {
+    if (currentProcess != null) {
+      currentProcess.destroy();
+      currentProcess = null;
+    }
+  }
+
+  @Override
+  public void close() {
+    closeProcess();
+  }
+  
+  String toCmdForMsg() {
+    return cmd.command().toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandReader.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandReader.java
new file mode 100644
index 0000000..44b2004
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandReader.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Create a {@code Supplier<Iterable<String>>} to ingest a command's output.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * and restart it upon process termination/error if so configured.
+ * </P>
+ * <P>
+ * The iterator returned by {@link Iterable#iterator()} returns
+ * {@code hasNext()==true} until a read from {@link Process#getInputStream()}
+ * returns EOF or an IOError.
+ */
+public class CommandReader extends CommandConnector implements Supplier<Iterable<String>>, AutoCloseable {
+  private static final long serialVersionUID = 1L;
+  private Iterator<String> currentSupplierIterator;
+  
+  /**
+   * Create a supplier of UTF8 strings from a command's output.
+   * 
+   * @param cmd the {@link ProcessBuilder} to use to start the command
+   * @param restart when true, restart the command upon termination, EOF, or
+   * read error.
+   */
+  public CommandReader(ProcessBuilder cmd, boolean restart) {
+    super(cmd, restart);
+  }
+
+  protected void start() throws InterruptedException {
+    super.start();  // on IOException this only logs, so getCurrentProcess() may still be null here
+    currentSupplierIterator = getCurrentProcess() == null ? null : new ProcessReader(getCurrentProcess()).get().iterator();
+  }  // a null iterator means "not started": hasNext() then retries (restart==true) or ends, instead of an NPE
+  
+  protected void closeProcess() {
+    currentSupplierIterator = null;
+    super.closeProcess();
+  }
+  
+  @Override
+  public Iterable<String> get() {
+    return new Iterable<String>() {
+
+      @Override
+      public Iterator<String> iterator() {
+        return new Iterator<String>() {
+
+          @Override
+          public boolean hasNext() {
+            try {
+              for(;;) {
+                if (currentSupplierIterator != null) {
+                  boolean hasNext = currentSupplierIterator.hasNext();
+                  if (hasNext) {
+                    return true;
+                  }
+                  else {
+                    // no more from that process.  close and loop/retry.
+                    closeProcess();
+                  }
+                }
+                else if (currentSupplierIterator == null && canStart()) {
+                  start(); // and loop/retry
+                }
+                else {
+                  return false; // no more input
+                }
+              }
+            }
+            catch (InterruptedException e) {
+              return false;
+            }
+          }
+
+          @Override
+          public String next() {
+            if (currentSupplierIterator != null)
+              return currentSupplierIterator.next();
+            else
+              throw new NoSuchElementException();
+          }
+          
+        };
+      }
+      
+    };
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandWriter.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandWriter.java
new file mode 100644
index 0000000..e3308dc
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/CommandWriter.java
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * A {@code Consumer<String>} to write data to a command's input.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * and restart it upon process termination/error if so configured.
+ * </P>
+ */
+public class CommandWriter extends CommandConnector implements Consumer<String>, AutoCloseable {
+  private static final long serialVersionUID = 1L;
+  private ProcessWriter currentConsumer;
+  
+  /**
+   * Create a consumer to write UTF8 string data to a command's input.
+   * <P>
+   * Each write is followed by a flush() though that only helps to
+   * reduce the time it takes to notice that a cmd has failed.
+   * Supposedly "successfully written and flushed" values are not guaranteed to
+   * have been received by a cmd even following restart.
+   * </P>
+   * 
+   * @param cmd the builder to use to start the process
+   * @param restart true to restart the process upon termination or
+   * write error.
+   */
+  public CommandWriter(ProcessBuilder cmd, boolean restart) {
+    super(cmd, restart);
+  }
+
+  protected void start() throws InterruptedException {
+    super.start();  // on IOException this only logs, so getCurrentProcess() may still be null here
+    currentConsumer = getCurrentProcess() == null ? null : new ProcessWriter(getCurrentProcess());
+  }  // a null consumer means "not started": accept() then retries (restart==true) or drops the value, instead of an NPE
+  
+  protected void closeProcess() {
+    currentConsumer = null;
+    super.closeProcess();
+  }
+
+  @Override
+  public void accept(String value) {
+    for (;;) {
+      try {
+        if (currentConsumer != null) {
+          try {
+            currentConsumer.accept(value);
+            logger.trace("WROTE: {}", value);
+            return;
+          } 
+          catch (RuntimeException e) {
+            closeProcess(); // and loop/retry
+          }
+        }
+        else if (currentConsumer == null && canStart()) {
+          logger.debug("STARTING for: {}", value);
+          start();  // and loop/retry
+        }
+        else {
+          // not restartable. toss it on the floor
+          return;
+        }
+      }
+      catch (InterruptedException e) {
+        // toss it on the floor
+        return;
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessReader.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessReader.java
new file mode 100644
index 0000000..47a2c59
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessReader.java
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * A {@code Supplier<Iterable<String>>} for ingesting a process's output.
+ * <P>
+ * The iterator returned by {@link Iterable#iterator()} returns
+ * {@code hasNext()==true} until a read from {@link Process#getInputStream()}
+ * returns EOF or an IOError.
+ */
+class ProcessReader implements Supplier<Iterable<String>> {
+  private static final long serialVersionUID = 1L;
+  private final BufferedReader reader;
+  
+  /**
+   * Create a new supplier of UTF8 strings read from a process's
+   * {@link Process#getInputStream() output}.
+   * 
+   * @param process the process to read from.
+   */
+  ProcessReader(Process process) {
+    reader = new BufferedReader(new InputStreamReader(
+        process.getInputStream(), StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public Iterable<String> get() {
+    return new Iterable<String>() {
+
+      @Override
+      public Iterator<String> iterator() {
+        return new Iterator<String>() {
+          private Boolean hasNext = null;
+          private String next = null;
+          
+          @Override
+          public boolean hasNext() {
+            if (hasNext != null)
+              return hasNext;
+            next = getNext();
+            hasNext = next != null;
+            return hasNext;
+          }
+
+          @Override
+          public String next() {
+            if (!hasNext())  // fetches a line if none is pending, so next() works without a prior hasNext()
+              throw new NoSuchElementException();
+            hasNext = null;  // mark the pending line consumed; the following hasNext()/next() reads a fresh one
+            return next;
+          }
+          
+        };
+      }
+      
+    };
+  }
+
+  /**
+   * Get the next available line from the process's stdout
+   * @return null if no more input (or error)
+   */
+  private String getNext() {
+    try {
+      return reader.readLine();
+    } catch (IOException e) {
+      CommandConnector.logger.error("Unable to readline from cmd", e);
+      return null;
+    }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessWriter.java b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessWriter.java
new file mode 100644
index 0000000..92c17d9
--- /dev/null
+++ b/connectors/command/src/main/java/org/apache/edgent/connectors/command/runtime/ProcessWriter.java
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.connectors.command.runtime;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * A {@code Consumer<String>} to receive data and write it to a process's input.
+ * <P>
+ * Each write is followed by a flush() though that only helps to
+ * reduce the time it takes to notice that a cmd has failed.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd.
+ */
+class ProcessWriter implements Consumer<String> {
+  private static final long serialVersionUID = 1L;
+  private final BufferedWriter writer;
+  
+  /**
+   * Create a new consumer for UTF-8 strings to write to a process's
+   * {@link Process#getOutputStream() input}.
+   * 
+   * @param process the process to write to.
+   */
+  ProcessWriter(Process process) {
+    writer = new BufferedWriter(new OutputStreamWriter(
+        process.getOutputStream(), StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public void accept(String value) {
+    try {
+      // see class doc regarding guarantees.
+      writer.write(value);
+      writer.newLine();
+      writer.flush();
+    }
+    catch (IOException e) {
+      CommandConnector.logger.error("Unable to write to cmd", e);
+      // caller (CommandWriter) requires throw to detect failure and recover
+      throw new RuntimeException("Unable to write to cmd", e);
+    }
+  }
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsGlobalTest.java b/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsGlobalTest.java
deleted file mode 100644
index 1697099..0000000
--- a/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsGlobalTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.command;
-
-/**
- * CommandStreams connector globalization tests.
- */
-public class CommandStreamsGlobalTest extends CommandStreamsTest {
-
-    private static final String[] globalLines = new String[] {
-            "\u5b78\u800c\u6642\u7fd2\u4e4b",
-            "\u4e0d\u4ea6\u8aaa\u4e4e"
-    };
-
-    public String[] getLines() {
-        return globalLines;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsTest.java b/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsTest.java
deleted file mode 100644
index 99f40e8..0000000
--- a/connectors/command/src/test/java/edgent/test/connectors/command/CommandStreamsTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.connectors.command;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.io.File;
-import java.lang.ProcessBuilder.Redirect;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
-import edgent.connectors.command.CommandStreams;
-import edgent.test.connectors.common.FileUtil;
-import edgent.test.connectors.common.TestRepoPath;
-import edgent.test.providers.direct.DirectTestSetup;
-import edgent.test.topology.TopologyAbstractTest;
-import edgent.topology.TSink;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-public class CommandStreamsTest extends TopologyAbstractTest implements DirectTestSetup {
-    
-    private String[] stdLines = new String[] {
-            "Line 1",
-            "Line 2",
-            "Line 3",
-    };
-
-    public String[] getLines() {
-        return stdLines;
-    }
-    
-    private static void delay(long millis) {
-      try {
-        Thread.sleep(millis);
-      }
-      catch (InterruptedException e) { }
-    }
-    
-    @Test
-    public void testTokenize() {
-      String cmdString = "myCmd arg1    arg2\targ3";
-      String[] expected = new String[]{"myCmd", "arg1", "arg2", "arg3"};
-      
-      assertArrayEquals(expected, CommandStreams.tokenize(cmdString).toArray(new String[0]));
-    }
-    
-    @Test
-    public void testPeriodicSource() throws Exception {
-      Topology t = newTopology("testPeriodicSource");
-      
-      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
-      System.out.println("Test: "+t.getName()+" "+tempFile1);
-      
-      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
-      
-      int NUM_POLLS = 3;
-      List<String> expLines = new ArrayList<>();
-      for (int i = 0; i < NUM_POLLS; i++) {
-        expLines.addAll(Arrays.asList(getLines()));
-      }
-      
-      TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
-      TStream<String> s = ls.flatMap(list -> list); 
-      
-      try {
-        completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
-      }
-      finally {
-          tempFile1.toFile().delete();
-      }
-    }
-    
-    @Test
-    public void testGenerate() throws Exception {
-      Topology t = newTopology("testGenerate");
-
-      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
-      System.out.println("Test: "+t.getName()+" "+tempFile1);
-      
-      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
-      
-      // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
-      TStream<String> s = CommandStreams.generate(t, cmd);
-      
-      try {
-        completeAndValidate("", t, s, 10, getLines());
-      }
-      finally {
-          tempFile1.toFile().delete();
-      }
-    }
-    
-    @Test
-    public void testGenerateRestart() throws Exception {
-      Topology t = newTopology("testGenerateRestart");
-
-      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
-      System.out.println("Test: "+t.getName()+" "+tempFile1);
-      
-      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
-
-      int NUM_RUNS = 3;
-      List<String> expLines = new ArrayList<>();
-      for (int i = 0; i < NUM_RUNS; i++) {
-        expLines.addAll(Arrays.asList(getLines()));
-      }
-      
-      // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
-      TStream<String> s = CommandStreams.generate(t, cmd);
-      
-      completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
-    }
-    
-    @Test
-    public void testSink() throws Exception {
-      Topology t = newTopology("testSink");
-
-      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
-      System.out.println("Test: "+t.getName()+" "+tempFile1);
-      
-      ProcessBuilder cmd = new ProcessBuilder("cat")
-          .redirectOutput(Redirect.appendTo(tempFile1.toFile()));
-      
-      TStream<String> s = t.strings(getLines());
-      
-      TSink<String> sink = CommandStreams.sink(s, cmd);
-      
-      assertNotNull(sink);
-      
-      try {
-        // start the job, sleep for a bit (await the timeout) then validate sink output
-        Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
-        t.getTester().complete(getSubmitter(), new JsonObject(), never, 3, TimeUnit.SECONDS);
-
-        FileUtil.validateFile(tempFile1, getLines());
-      }
-      finally {
-          tempFile1.toFile().delete();
-      }
-    }
-
-    @Test
-    public void testSinkRestart() throws Exception {
-      Topology t = newTopology("testSinkRestart");
-
-      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
-      System.out.println("Test: "+t.getName()+" "+tempFile1);
-
-      int batchSize = getLines().length;
-      
-      // tell cmd to terminate after each batch of lines
-      ProcessBuilder cmd = new ProcessBuilder("sh", getCmdPath("sinkcmd"), ""+batchSize)
-          .redirectOutput(Redirect.appendTo(tempFile1.toFile()))
-          .redirectError(Redirect.to(new File("/dev/stderr")));
-
-      int NUM_RUNS = 3;
-      List<String> expLines = new ArrayList<>();
-      for (int i = 0; i < NUM_RUNS; i++) {
-        expLines.addAll(Arrays.asList(getLines()));
-      }
-      AtomicInteger cnt = new AtomicInteger();
-     
-      TStream<String> s = t.strings(expLines.toArray(new String[0]))
-          .filter(tup -> {
-            // need to slow things down so the sinker has time to notice
-            // the cmd has terminated.  otherwise we'll get ahead,
-            // tuples will get dropped on the floor and validation will fail.
-            if (cnt.incrementAndGet() > batchSize) {
-              // System.out.println("SLEEPING on cnt "+ cnt.get() + " for "+tup);
-              delay(1_000);
-              cnt.set(1);
-            }
-            return true;
-          });
-      
-      TSink<String> sink = CommandStreams.sink(s, cmd);
-      
-      assertNotNull(sink);
-      
-      try {
-        // start the job, sleep for a bit (await the timeout) then validate sink output
-        Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
-        t.getTester().complete(getSubmitter(), new JsonObject(), never,
-            6 + ((NUM_RUNS-1) * 1/*restart delay*/), TimeUnit.SECONDS);
-        
-        FileUtil.validateFile(tempFile1, expLines.toArray(new String[0]));
-      }
-      finally {
-          tempFile1.toFile().delete();
-      }
-    }
-    
-    private String getCmdPath(String cmd) {
-      return TestRepoPath.getPath("connectors", "command", "src", "test", "scripts", cmd);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsGlobalTest.java b/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsGlobalTest.java
new file mode 100644
index 0000000..bc6d268
--- /dev/null
+++ b/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsGlobalTest.java
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.command;
+
+/**
+ * CommandStreams connector globalization tests: inherits the {@link CommandStreamsTest} cases and supplies non-ASCII lines via {@link #getLines()}.
+ */
+public class CommandStreamsGlobalTest extends CommandStreamsTest {
+
+    private static final String[] globalLines = new String[] {
+            "\u5b78\u800c\u6642\u7fd2\u4e4b",
+            "\u4e0d\u4ea6\u8aaa\u4e4e"
+    };
+
+    public String[] getLines() {
+        return globalLines;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsTest.java b/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsTest.java
new file mode 100644
index 0000000..a24bec8
--- /dev/null
+++ b/connectors/command/src/test/java/org/apache/edgent/test/connectors/command/CommandStreamsTest.java
@@ -0,0 +1,223 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.connectors.command;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.lang.ProcessBuilder.Redirect;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.connectors.command.CommandStreams;
+import org.apache.edgent.test.connectors.common.FileUtil;
+import org.apache.edgent.test.connectors.common.TestRepoPath;
+import org.apache.edgent.test.providers.direct.DirectTestSetup;
+import org.apache.edgent.test.topology.TopologyAbstractTest;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+public class CommandStreamsTest extends TopologyAbstractTest implements DirectTestSetup {
+    
+    private String[] stdLines = new String[] {
+            "Line 1",
+            "Line 2",
+            "Line 3",
+    };
+
+    public String[] getLines() {
+        return stdLines;
+    }
+    
+    private static void delay(long millis) {
+      try {
+        Thread.sleep(millis);
+      }
+      catch (InterruptedException e) { }
+    }
+    
+    @Test
+    public void testTokenize() {
+      String cmdString = "myCmd arg1    arg2\targ3";
+      String[] expected = new String[]{"myCmd", "arg1", "arg2", "arg3"};
+      
+      assertArrayEquals(expected, CommandStreams.tokenize(cmdString).toArray(new String[0]));
+    }
+    
+    @Test
+    public void testPeriodicSource() throws Exception {
+      Topology t = newTopology("testPeriodicSource");
+      
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+      
+      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+      
+      int NUM_POLLS = 3;
+      List<String> expLines = new ArrayList<>();
+      for (int i = 0; i < NUM_POLLS; i++) {
+        expLines.addAll(Arrays.asList(getLines()));
+      }
+      
+      TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
+      TStream<String> s = ls.flatMap(list -> list); 
+      
+      try {
+        completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
+      }
+      finally {
+          tempFile1.toFile().delete();
+      }
+    }
+    
+    @Test
+    public void testGenerate() throws Exception {
+      Topology t = newTopology("testGenerate");
+
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+      
+      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+      
+      // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
+      TStream<String> s = CommandStreams.generate(t, cmd);
+      
+      try {
+        completeAndValidate("", t, s, 10, getLines());
+      }
+      finally {
+          tempFile1.toFile().delete();
+      }
+    }
+    
+    @Test
+    public void testGenerateRestart() throws Exception {
+      Topology t = newTopology("testGenerateRestart");
+
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+      
+      ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+
+      int NUM_RUNS = 3;
+      List<String> expLines = new ArrayList<>();
+      for (int i = 0; i < NUM_RUNS; i++) {
+        expLines.addAll(Arrays.asList(getLines()));
+      }
+      
+      // N.B. if looking at trace: EDGENT-224 generate() continues running after job is closed
+      TStream<String> s = CommandStreams.generate(t, cmd);
+      
+      completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
+    }
+    
+    @Test
+    public void testSink() throws Exception {
+      Topology t = newTopology("testSink");
+
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+      
+      ProcessBuilder cmd = new ProcessBuilder("cat")
+          .redirectOutput(Redirect.appendTo(tempFile1.toFile()));
+      
+      TStream<String> s = t.strings(getLines());
+      
+      TSink<String> sink = CommandStreams.sink(s, cmd);
+      
+      assertNotNull(sink);
+      
+      try {
+        // start the job, sleep for a bit (await the timeout) then validate sink output
+        Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
+        t.getTester().complete(getSubmitter(), new JsonObject(), never, 3, TimeUnit.SECONDS);
+
+        FileUtil.validateFile(tempFile1, getLines());
+      }
+      finally {
+          tempFile1.toFile().delete();
+      }
+    }
+
+    @Test
+    public void testSinkRestart() throws Exception {
+      Topology t = newTopology("testSinkRestart");
+
+      Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
+      System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+      int batchSize = getLines().length;
+      
+      // tell cmd to terminate after each batch of lines
+      ProcessBuilder cmd = new ProcessBuilder("sh", getCmdPath("sinkcmd"), ""+batchSize)
+          .redirectOutput(Redirect.appendTo(tempFile1.toFile()))
+          .redirectError(Redirect.to(new File("/dev/stderr")));
+
+      int NUM_RUNS = 3;
+      List<String> expLines = new ArrayList<>();
+      for (int i = 0; i < NUM_RUNS; i++) {
+        expLines.addAll(Arrays.asList(getLines()));
+      }
+      AtomicInteger cnt = new AtomicInteger();
+     
+      TStream<String> s = t.strings(expLines.toArray(new String[0]))
+          .filter(tup -> {
+            // need to slow things down so the sinker has time to notice
+            // the cmd has terminated.  otherwise we'll get ahead,
+            // tuples will get dropped on the floor and validation will fail.
+            if (cnt.incrementAndGet() > batchSize) {
+              // System.out.println("SLEEPING on cnt "+ cnt.get() + " for "+tup);
+              delay(1_000);
+              cnt.set(1);
+            }
+            return true;
+          });
+      
+      TSink<String> sink = CommandStreams.sink(s, cmd);
+      
+      assertNotNull(sink);
+      
+      try {
+        // start the job, sleep for a bit (await the timeout) then validate sink output
+        Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
+        t.getTester().complete(getSubmitter(), new JsonObject(), never,
+            6 + ((NUM_RUNS-1) * 1/*restart delay*/), TimeUnit.SECONDS);
+        
+        FileUtil.validateFile(tempFile1, expLines.toArray(new String[0]));
+      }
+      finally {
+          tempFile1.toFile().delete();
+      }
+    }
+    
+    private String getCmdPath(String cmd) {
+      return TestRepoPath.getPath("connectors", "command", "src", "test", "scripts", cmd);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/common/src/main/java/edgent/connectors/runtime/Connector.java
----------------------------------------------------------------------
diff --git a/connectors/common/src/main/java/edgent/connectors/runtime/Connector.java b/connectors/common/src/main/java/edgent/connectors/runtime/Connector.java
deleted file mode 100644
index 6f0bdf9..0000000
--- a/connectors/common/src/main/java/edgent/connectors/runtime/Connector.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.runtime;
-
-import java.io.Serializable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-
-/**
- * An abstract class for general connector connection management.
- * <p>
- * The basic (default) model is to maintain a connection across
- * inadvertent disconnects.  
- * <p>
- * A connected connector is returned
- * whenever {@link #client()} is called.  If the connection is lost,
- * actions are taken to reestablish it.  Overall tracking and
- * control of connection state is performed and
- * general logging is performed.
- * <p>
- * Sub-classes are able to further control when initial connection
- * occurs and generally expose ways to explicitly disconnect or connect.
- * <p>
- * Optionally, clients can request automatic disconnection after
- * a period of inactivity - see {@link #setIdleTimeout(long, TimeUnit)}.
- * Support for this feature requires connector implementations
- * to call {@link #notIdle()}.
- * <p>
- * TODO - dynamic control:
- * <ul>
- * <li>[abstract] ConnectorControl { getState(), connect(), disconnect(), ...}.</li>
- * <li>MqttConnectorControl extends ConnectorControl.  Adds subscribe(), unsubscribe().</li>
- * <li>Add Consumer&lt;MqttConnectorControl&gt; to MqttConfig. Runtime calls it supplying a connector control object.
- * By doing a disconnect()/connect(), the Supplier&lt;MqttConfig&gt; will be called to as part of reconnect -- hence using updated values if any.</li>
- * </ul>
- * <p>
- * Sub-classes are responsible for implementing a small number
- * of operations: {@link #doConnect(Object)}, {@link #doDisconnect(Object)},
- * {@link #doClose(Object)}, and {@link #id()}.  Sub-classes are also responsible for
- * calling {@link #connectionLost(Throwable)} and {@link #notIdle()}.
- * 
- * @param <T> type of the underlying managed connector (e.g., MqttClient)
- */
-public abstract class Connector<T> implements AutoCloseable, Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private static final long BASE_RETRY_WAIT_MSEC = 2*1000;
-    private static final long MAX_RETRY_WAIT_MSEC = 60*1000;
-    private final IdleManager idleManager;
-    private State state = State.DISCONNECTED;
-    private T client; // must be non-null when state==CONNECTED
-    private Future<?> connectFuture;
-    
-    // TODO proper "operator-acquired" thread/scheduler w/thread factory, etc
-    // hmm... today a single [Mqtt]Connector supports multiple publish operators
-    // and a subscribe operator, hence the threads should come from... ?
-    private final ExecutorService connectExecutor = Executors.newSingleThreadExecutor();
-    private final ScheduledExecutorService schedExecutor = Executors.newScheduledThreadPool(0);
-    
-    public enum State {
-        DISCONNECTED,
-        CONNECTING,
-        CONNECTED,
-        DISCONNECTING,
-        CLOSING,
-        CLOSED
-    }
-
-    protected Connector() {
-        idleManager = new IdleManager(this, schedExecutor);
-    }
-
-    public abstract Logger getLogger();
-    
-    /**
-     * Must be called by the connector when an action is performed that
-     * qualifies as a "not idle" condition.
-     */
-    public void notIdle() {
-        idleManager.notIdle();
-    }
-
-    /**
-     * Disconnect the connector after the specified period of inactivity.
-     * @param idleTimeout idleTimeout(long)
-     * @param unit TimeUnit
-     */
-    public void setIdleTimeout(long idleTimeout, TimeUnit unit) {
-        idleManager.setIdleTimeoutMsec(unit.toMillis(idleTimeout));
-    }
-
-    public void setIdleReconnectInterval(int intervalSec) {
-        idleManager.setIdleReconnectInterval(intervalSec);
-    }
-
-    /**
-     * Get a connected client connector.
-     * @return the client connector
-     * @throws IllegalStateException if state is inappropriate to return connected client connector 
-     * @throws Exception if unable to connect
-     */
-    public T client() throws Exception {
-        return connectInternal();
-    }
-
-    // Expose with client driven dynamic disconnect/reconnect
-//    /**
-//     * Connect the client connector if necessary.
-//     * Blocks until connected or it's no longer valid to request a connect().
-//     * @throws IllegalStateException if a connect() is (no longer) appropriate. 
-//     */
-//    public void connect() throws Exception {
-//        connectInternal();
-//    }
-
-    /**
-     * Connect the client connector if necessary.
-     * Blocks until connected or it's no longer valid to request a connect().
-     * @return connected client connector
-     * @throws IllegalStateException if a connect() is (no longer) appropriate. 
-     */
-    private T connectInternal() throws Exception {
-        // N.B. have to deal with multiple concurrent connect requests
-        // e.g., multiple publishers and a single subscriber for a connector.
-        Future<?> f = null;
-        synchronized(this) {
-            if (state == State.CONNECTED)
-                return client;
-            else if (state == State.CONNECTING)
-                f = connectFuture;
-            else if (state == State.DISCONNECTED) {
-                startAsyncConnect();
-                f = connectFuture;
-            }
-            else
-                throw wrongState(state, "connectInternal()");
-        }
-        awaitDone(f); // throws if task not successful
-        return connectedClient();
-    }
-    
-    private IllegalStateException wrongState(State s, String op) {
-        String msg = String.format("%s %s wrong state %s", id(), op, s);
-        getLogger().error(msg);
-        return new IllegalStateException(msg);
-    }
-    
-    /**
-     * Get the connected client; throw if not connected. 
-     * @return connected client
-     * @throws IllegalStateException if not connected
-     */
-    private T connectedClient() {
-        synchronized(this) {
-            if (state != State.CONNECTED)
-                throw wrongState(state, "connectedClient()");
-            return client;
-        }
-    }
-    
-    /**
-     * Schedule an async connect task.
-     * <p>
-     * Updates connectFuture if successfully initiated.
-     * Updates state and client once successfully completed.
-     * @throws IllegalStateException if inappropriate state for async connect request
-     */
-    private void startAsyncConnect() {
-        synchronized(this) {
-            if (state != State.DISCONNECTED)
-                throw wrongState(state, "startAsyncConnect()");
-            getLogger().trace("{} submitting async connect task", id());
-            setStateUnsafe(State.CONNECTING);
-            connectFuture = connectExecutor.submit(
-                    () -> { connectTask(); return null; });
-        }
-    }
-    
-    /**
-     * Wait till task is done.
-     * @throws Exception if task wasn't successful
-     */
-    private void awaitDone(Future<?> f) throws Exception {
-        try {
-            getLogger().trace("{} awaiting done", id());
-            f.get();
-        } 
-        catch (InterruptedException e) {
-            // assume that if we're cancelled we want to cancel the future task
-            getLogger().trace("{} awaitDone() interrupted, cancelling task", id());
-            f.cancel(true);
-            throw e;
-        } 
-        catch (CancellationException e) {
-            String msg = String.format("%s awaitDone() task was cancelled", id());
-            getLogger().trace(msg);
-            throw new IllegalStateException(msg);
-        } 
-        catch (ExecutionException e) {
-            String msg = String.format("%s awaitDone() task failed", id());
-            getLogger().error(msg);
-            throw new IllegalStateException(msg, e.getCause());
-        }
-    }
-    
-    /**
-     * Do a blocking connect with retries.
-     * <p>
-     * Updates state and client when successful.
-     * @throws IllegalStateException if state no longer appropriate for connect.
-     * @throws Exception some other problem.
-     */
-    private void connectTask() throws Exception {
-        int retryCnt = 0;
-        while(true) {
-            try {
-                oneConnect(retryCnt++);
-                return;
-            }
-            catch (IllegalStateException e) {
-                throw e;  // already logged
-            }
-            catch (Exception e) {
-                // already logged
-                long msec = getConnectRetryDelayMsec(retryCnt);
-                getLogger().info("{} connectTask() waiting {}msec to retry", id(), msec);
-                Thread.sleep(msec);
-            }
-        }
-    }
-    
-    /**
-     * Do a blocking single connect attempt.
-     * <p>
-     * Updates state and client and idle task when successful.
-     * @throws IllegalStateException if state no longer appropriate for connect.
-     * @throws Exception if connect failed.
-     */
-    private void oneConnect(int tryCnt) throws Exception {
-        synchronized(this) {
-            if (state != State.CONNECTING)
-                throw wrongState(state, "oneConnect()");
-        }
-        getLogger().trace("{} doing one connect", id());
-
-        T result;
-        try {
-            result = doConnect(client);
-        }
-        catch (Exception e) {
-            getLogger().error("{} doConnect() failed", id(), e);
-            throw e;
-        }
-        
-        getLogger().trace("{} connected", id());
-        synchronized(this) {
-            if (state != State.CONNECTING) {
-                // need to disconnect/close result?
-                throw wrongState(state, "oneConnect()");
-            }
-            setStateUnsafe(State.CONNECTED);
-            client = result;
-            idleManager.connected();
-        }
-    }
-    
-    /**
-     * Get the next connect retry delay.
-     * @param retryCnt the retry attempt number
-     * @return the delay interval in msec
-     */
-    private long getConnectRetryDelayMsec(int retryCnt) {
-        int factor = retryCnt <= 1 ? 1 : 2 << Math.min(retryCnt - 2, 8);
-        return Math.min(BASE_RETRY_WAIT_MSEC * factor, MAX_RETRY_WAIT_MSEC);
-    }
-
-    // Expose with client driven dynamic disconnect/reconnect
-//    /**
-//     * Disconnect the client connector.
-//     * <p>
-//     * Subsequently, a connect() may performed to reconnect.
-//     * @throws IllegalStateException inappropriate state to request disconnect
-//     */
-//    public void disconnect() {
-//        disconnect(false);
-//    }
-    
-    /**
-     * Disconnect the client connector.
-     * <p>
-     * Subsequently, a connect() may performed to reconnect.
-     * @param wasIdle true if being disconnected due to an idle connection
-     * @throws IllegalStateException inappropriate state to request disconnect
-     */
-    void disconnect(boolean wasIdle) {
-        synchronized(this) {
-            if (!(state == State.CONNECTED || state == State.CONNECTING))
-                throw wrongState(state, "disconnect("+wasIdle+")");
-            try {
-                getLogger().trace("Connection {} disconnecting wasIdle:{}", id(), wasIdle);
-                setStateUnsafe(State.DISCONNECTING);
-                cancelConnectTaskUnsafe();
-                doDisconnect(client);
-            }
-            catch (Exception e) {
-                getLogger().error("{} disconnnect() failed", id(), e);
-            }
-            finally {
-                setStateUnsafe(State.DISCONNECTED);
-                idleManager.disconnected(wasIdle);
-            }
-        }
-    }
-    
-    /**
-     * Permanently close the client connector.
-     * <p>
-     * Expect any subsequent operations to fail.
-     * @throws Exception shouldn't happen
-     */
-    @Override
-    public void close() throws Exception {
-        synchronized(this) {
-            if (state == State.CLOSED) {
-                getLogger().trace("{} close() state already {}", id(), state);
-                return;
-            }
-            try {
-                getLogger().info("Connection {} closing", id());
-                setStateUnsafe(State.CLOSING);
-                idleManager.close();
-                cancelConnectTaskUnsafe();
-                if (client != null)
-                    doClose(client);
-            }
-            catch (Exception e) {
-                getLogger().error("{} close() failed", id(), e);
-            }
-            finally {
-                setStateUnsafe(State.CLOSED);
-                connectFuture = null;
-                client = null;
-                connectExecutor.shutdownNow();
-                schedExecutor.shutdownNow();
-            }
-        }
-    }
-
-    /**
-     * Cancel a pending connect task if any.
-     */
-    private void cancelConnectTaskUnsafe() {
-        if (connectFuture != null && !connectFuture.isDone()) {
-            getLogger().trace("{} cancelConnect()", id());
-            connectFuture.cancel(true);
-        }
-    }
-
-    /**
-     * To be called by the connector when it detects a connection lost condition.
-     * <p>
-     * An asynchronous reconnect is initiated if appropriate.
-     * @param t the cause.  may be null.
-     */
-    protected void connectionLost(Throwable t) {
-        synchronized(this) {
-            getLogger().info("Connection {} connectionLost()", id(), t);
-            if (state == State.CONNECTED) {
-                setStateUnsafe(State.DISCONNECTED);
-                // don't allow unwind
-                try {
-                    startAsyncConnect();
-                }
-                catch (Exception e) {
-                    getLogger().error("{} startAsyncConnect() failed", id(), e);
-                }
-            }
-            else {
-                getLogger().trace("{} connectionLost() state already {}", id(), state);
-            }
-        }
-    }
-    
-    private void setStateUnsafe(State state) {
-        State prev = this.state;
-        this.state = state;
-        getLogger().info("{} state {} (was {})", id(), state, prev);
-    } 
-    
-    State getState() {
-        synchronized(this) {
-            return state;
-        }
-    }
-
-    /**
-     * A one-shot request to connect the client to its server.
-     * @param client the connector's client object.  may be null.
-     * @return a connected client.  not necessarily the same as {@code client}.
-     * @throws Exception if unable to connect
-     */
-    protected abstract T doConnect(T client) throws Exception;
-
-    /**
-     * A one-shot request to disconnect the client.
-     * @param client the connector's client object.
-     * @throws Exception if unable to disconnect
-     */
-    protected abstract void doDisconnect(T client) throws Exception;
-    
-    /**
-     * A one-shot request to permanently close the client.
-     * @param client the connector's client object.
-     * @throws Exception if unable to close
-     */
-    protected abstract void doClose(T client) throws Exception;
-    
-    /** 
-     * Get a connector id to use in log and exception msgs
-     * @return the id 
-     */
-    protected abstract String id();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/connectors/common/src/main/java/edgent/connectors/runtime/IdleManager.java
----------------------------------------------------------------------
diff --git a/connectors/common/src/main/java/edgent/connectors/runtime/IdleManager.java b/connectors/common/src/main/java/edgent/connectors/runtime/IdleManager.java
deleted file mode 100644
index bc99957..0000000
--- a/connectors/common/src/main/java/edgent/connectors/runtime/IdleManager.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package edgent.connectors.runtime;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-
-import edgent.connectors.runtime.Connector.State;
-
-/**
- * Manager to auto-disconnect a connector when idle and
- * subsequently auto-reconnect it.
- */
-public class IdleManager {
-    private final Connector<?> connector;
-    private final AtomicLong idleTimeoutMsec = new AtomicLong();
-    private final ScheduledExecutorService schedExecutor;
-    private long idleReconnectIntervalMsec;
-    private final AtomicLong lastActionMsec = new AtomicLong();
-    private Future<?> idleFuture;
-    private Future<?> idleReconnectFuture;
-    
-    /**
-     * Create a new idle manager for the connector.
-     * <p>
-     * By default idle (disconnect) timeouts and subsequent auto-reconnect
-     * is disabled. 
-     * @param connector
-     * @param schedExecitor
-     */
-    IdleManager(Connector<?> connector, ScheduledExecutorService schedExecitor) {
-        this.connector = connector;
-        this.schedExecutor = schedExecitor;
-    }
-    
-    protected Logger getLogger() {
-        return connector.getLogger();
-    }
-    
-    /**
-     * A "not idle" event has occurred.
-     * <p>
-     * If idle timeouts have been enabled, this must be called by the
-     * connector when an event occur that qualifies as a "not idle" condition.
-     */
-    public void notIdle() {
-        if (idleTimeoutMsec.get() > 0)
-            lastActionMsec.set(System.currentTimeMillis());
-    }
-
-    /**
-     * Disconnect the connector after the specified period of inactivity.
-     * @param idleTimeoutMsec 0 to disable idle timeouts
-     */
-    public void setIdleTimeoutMsec(long idleTimeoutMsec) {
-        getLogger().trace("{} setIdleTimeout({}msec)", id(), idleTimeoutMsec);
-        this.idleTimeoutMsec.set(idleTimeoutMsec);
-    }
-
-    /**
-     * Reconnect the connector after disconnect due to idleness.
-     * @param intervalSec delay following disconnect until reconnect. 0 to disable.
-     */
-    public void setIdleReconnectInterval(int intervalSec) {
-        getLogger().trace("{} setIdleReconnectInterval({}sec)", id(), intervalSec);
-        idleReconnectIntervalMsec = intervalSec * 1000;
-    }
-    
-    /**
-     * To be called when the connector is being permanently closed.
-     */
-    public void close() {
-        synchronized(this) {
-            if (idleFuture != null)
-                idleFuture.cancel(true);
-            if (idleReconnectFuture != null)
-                idleReconnectFuture.cancel(true);
-        }
-    }
-    
-    /**
-     * To be called when the connector has become connected.
-     */
-    public void connected() {
-        synchronized(this) {
-            if (idleReconnectFuture != null)
-                idleReconnectFuture.cancel(false);
-            scheduleIdleTask(idleTimeoutMsec.get(), false);
-        }
-    }
-
-    /**
-     * To be called when the connector has become disconnected.
-     * @param wasIdle true if the disconnect was due to an idle condition.
-     */
-    public void disconnected(boolean wasIdle) {
-        synchronized(this) {
-            if (idleFuture != null)
-                idleFuture.cancel(false);
-            if (wasIdle)
-                scheduleIdleReconnectTask(idleReconnectIntervalMsec);
-        }
-    }
-    
-    private void scheduleIdleTask(long delayMsec, boolean isResched) {
-        synchronized(this) {
-            if (idleFuture != null)
-                idleFuture.cancel(true);
-            if (delayMsec > 0) {
-                if (isResched)
-                    getLogger().trace("{} scheduleIdleTask({}msec)", id(), delayMsec);
-                else
-                    getLogger().info("{} scheduleIdleTask({}msec)", id(), delayMsec);
-                idleFuture = schedExecutor.schedule(
-                        () -> idleTimeoutTask(), delayMsec, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-    
-    private void idleTimeoutTask() {
-        boolean doDisconnect = false;
-        try {
-            synchronized(this) {
-                long tmo = idleTimeoutMsec.get();
-                if (tmo == 0)
-                    return;
-                State s = connector.getState();
-                if (s != State.CONNECTED) {
-                    getLogger().info("{} idleTimeoutTask() no longer connected ({})", id(), s);
-                    return;
-                }
-                long last = lastActionMsec.get();
-                long now = System.currentTimeMillis();
-                if (now > last + tmo) {
-                    getLogger().info("{} idleTimeoutTask() disconnecting", id());
-                    doDisconnect = true;
-                }
-                else {
-                    long adj = now - last;
-                    if (adj >= tmo)
-                        adj = 0;
-                    long delayMsec = tmo - adj;
-                    getLogger().trace("{} scheduleIdleTask({}msec)", id(), delayMsec);
-                    scheduleIdleTask(delayMsec, true);
-                }
-            }
-            if (doDisconnect)
-                connector.disconnect(true);
-        }
-        catch (RuntimeException e) {
-            getLogger().trace("{} idleTimeoutTask() disconnect failed", id(), e);
-        }
-    }
-    
-    private void scheduleIdleReconnectTask(long delayMsec) {
-        synchronized(this) {
-            getLogger().info("{} scheduleIdleReconnectTask({}msec)", id(), delayMsec);
-            if (idleReconnectFuture != null)
-                idleReconnectFuture.cancel(true);
-            if (delayMsec > 0) {
-                idleReconnectFuture = schedExecutor.schedule(
-                        () -> idleReconnectTask(),
-                        delayMsec, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-    
-    private void idleReconnectTask() {
-        try {
-            getLogger().info("{} idleReconnectTask() reconnecting", id());
-            connector.client(); // induce reconnect
-        }
-        catch (Exception e) {
-            getLogger().error("{} idleReconnectTask() failed", id(), e);
-        }
-    }
-
-    private String id() {
-        return connector.id();
-    }
-}