You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/13 17:30:52 UTC
[3/4] incubator-quarks git commit: QUARKS-226 add Command connector
QUARKS-226 add Command connector
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/85247c21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/85247c21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/85247c21
Branch: refs/heads/master
Commit: 85247c215bb06e88c67c096e9ddc54c1d2463d7c
Parents: d9b8e67
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue Jul 12 12:58:46 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed Jul 13 11:22:48 2016 -0400
----------------------------------------------------------------------
build.xml | 2 +
connectors/.classpath | 2 +
connectors/command/build.gradle | 21 ++
connectors/command/build.xml | 50 +++
.../connectors/command/CommandStreams.java | 348 +++++++++++++++++++
.../quarks/connectors/command/package-info.java | 24 ++
.../command/runtime/CommandConnector.java | 103 ++++++
.../command/runtime/CommandReader.java | 111 ++++++
.../command/runtime/CommandWriter.java | 91 +++++
.../command/runtime/ProcessReader.java | 98 ++++++
.../command/runtime/ProcessWriter.java | 66 ++++
.../command/CommandStreamsGlobalTest.java | 35 ++
.../connectors/command/CommandStreamsTest.java | 224 ++++++++++++
connectors/command/src/test/scripts/sinkcmd | 23 ++
.../quarks/test/connectors/common/FileUtil.java | 88 +++++
.../test/connectors/file/FileStreamsTest.java | 33 +-
.../file/FileStreamsTextFileWriterTest.java | 21 +-
17 files changed, 1294 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index a258ea6..88fd386 100644
--- a/build.xml
+++ b/build.xml
@@ -117,6 +117,7 @@
<ant dir="providers/direct" target="@{target}" useNativeBasedir="true"/>
<ant dir="connectors/common" target="@{target}" useNativeBasedir="true"/>
+ <ant dir="connectors/command" target="@{target}" useNativeBasedir="true"/>
<ant dir="connectors/csv" target="@{target}" useNativeBasedir="true"/>
<ant dir="connectors/iot" target="@{target}" useNativeBasedir="true"/>
<ant dir="connectors/serial" target="@{target}" useNativeBasedir="true"/>
@@ -229,6 +230,7 @@
</classfiles>
<sourcefiles encoding="UTF-8">
<fileset dir="${basedir}/connectors/common/src/main/java" includes="**/*.java"/>
+ <fileset dir="${basedir}/connectors/command/src/main/java" includes="**/*.java"/>
<fileset dir="${basedir}/connectors/csv/src/main/java" includes="**/*.java"/>
<fileset dir="${basedir}/connectors/file/src/main/java" includes="**/*.java"/>
<fileset dir="${basedir}/connectors/iot/src/main/java" includes="**/*.java"/>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/.classpath
----------------------------------------------------------------------
diff --git a/connectors/.classpath b/connectors/.classpath
index 9eeb7d7..2015cfa 100644
--- a/connectors/.classpath
+++ b/connectors/.classpath
@@ -2,6 +2,8 @@
<classpath>
<classpathentry kind="src" path="common/src/main/java"/>
<classpathentry kind="src" path="common/src/test/java"/>
+ <classpathentry kind="src" path="command/src/main/java"/>
+ <classpathentry kind="src" path="command/src/test/java"/>
<classpathentry kind="src" path="csv/src/main/java"/>
<classpathentry kind="src" path="csv/src/test/java"/>
<classpathentry kind="src" path="file/src/main/java"/>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/build.gradle
----------------------------------------------------------------------
diff --git a/connectors/command/build.gradle b/connectors/command/build.gradle
new file mode 100644
index 0000000..94ae884
--- /dev/null
+++ b/connectors/command/build.gradle
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+dependencies {
+ compile project(':api:topology')
+ compile project(':connectors:common')
+ testCompile project(':providers:direct')
+ testRuntime ext_classpath
+}
+
+addCompileTestDependencies ':api:topology', ':providers:direct', ':connectors:common'
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/build.xml
----------------------------------------------------------------------
diff --git a/connectors/command/build.xml b/connectors/command/build.xml
new file mode 100644
index 0000000..146d82e
--- /dev/null
+++ b/connectors/command/build.xml
@@ -0,0 +1,50 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project name="quarks.connectors.command" default="all"
+ xmlns:jacoco="antlib:org.jacoco.ant"
+ >
+ <description>
+ Build Command connectors.
+ </description>
+
+ <property name="component.path" value="connectors/command"/>
+ <import file="../../common-build.xml"/>
+
+ <property name="ext" location="../../ext"/>
+
+ <path id="compile.classpath">
+ <pathelement location="${quarks.lib}/quarks.api.topology.jar" />
+ <pathelement location="${quarks.connectors}/common/lib/quarks.connectors.common.jar" />
+ </path>
+
+ <path id="test.compile.classpath">
+ <pathelement location="${jar}" />
+ <pathelement location="${quarks.lib}/quarks.providers.direct.jar"/>
+ <pathelement location="../../api/topology/test.classes"/>
+ <pathelement location="../../providers/direct/test.classes"/>
+ <pathelement location="../../connectors/common/test.classes"/>
+ <path refid="compile.classpath"/>
+ </path>
+
+ <path id="test.classpath">
+ <pathelement location="${test.classes}" />
+ <path refid="test.compile.classpath"/>
+ <path refid="quarks.ext.classpath" />
+ <path refid="test.common.classpath" />
+ </path>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/CommandStreams.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/CommandStreams.java b/connectors/command/src/main/java/quarks/connectors/command/CommandStreams.java
new file mode 100644
index 0000000..ad49f5f
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/CommandStreams.java
@@ -0,0 +1,348 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+
+import quarks.connectors.command.runtime.CommandReader;
+import quarks.connectors.command.runtime.CommandWriter;
+import quarks.function.Consumer;
+import quarks.function.Supplier;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * Connectors for creating a TStream from a Command's / OS Process's output
+ * and sinking a TStream to a Command's / OS Process's input.
+ * <P>
+ * e.g., run a network monitor command (like Tiger Shark) and ingest its output.
+ */
+public class CommandStreams {
+
+ /**
+ * Tokenize the specified {@code cmdString} in the exact same manner as
+ * done by {@link Runtime#exec(String)}.
+ * <P>
+ * This function provides a convenience for creating a {@link ProcessBuilder}
+ * for use by the other CommandStreams methods.
+ * </P>
+ * <P>
+ * Sample use:
+ * <pre>{@code
+ * ProcessBuilder cmd = new ProcessBuilder(tokenize("sh someShellCmd.sh and args"));
+ * TStream<String> stream = CommandStreams.generate(topology, cmd);
+ * }</pre>
+ *
+ * @param cmdString a command string
+ * @return the tokens
+ */
+ public static List<String> tokenize(String cmdString) {
+ List<String> command = new ArrayList<>();
+ StringTokenizer tok = new StringTokenizer(cmdString);
+ while (tok.hasMoreTokens())
+ command.add(tok.nextToken());
+ return command;
+ }
+
+ /**
+ * Create an endless {@code TStream<String>} from a long running command's output.
+ * <P>
+ * The supplied {@code cmd} is used to start the command.
+ * A tuple is created for each UTF8 line read from the command's
+ * {@link Process#getInputStream() output}.
+ * The tuples contain output from stderr if cmd is configured to
+ * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+ * The command is restarted if a read from the command's output stream
+ * returns EOF or an error.
+ * </P>
+ * <P>
+ * This is a convenience function equivalent to
+ * {@code topology.generate(endlessCommandReader(cmd))}.
+ * </P>
+ * <P>
+ * Sample use: create a stream of tuples for the output from a
+ * continuously running and restartable command:
+ * <pre>{@code
+ * ProcessBuilder cmd = new ProcessBuilder("myCommand");
+ * TStream<String> cmdOutput = CommandStreams.generate(topology, cmd);
+ * cmdOutput.print();
+ * }</pre>
+ *
+ * @param topology the topology to add the source stream to
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return the source {@code TStream<String>}
+ *
+ * @see #endlessCommandReader(ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static TStream<String> generate(Topology topology, ProcessBuilder cmd) {
+ return topology.generate(endlessCommandReader(cmd));
+ }
+
+ /**
+ * Create a {@code TStream<String>} from a periodically run command's output.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * at the specified {@code period}.
+ * The command's UTF8 {@link Process#getInputStream() output} is read until EOF
+ * and a {@code List<String>} tuple is created containing the collected output.
+ * The tuples contain output from stderr if the cmd is configured to
+ * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+ * </P>
+ * <P>
+ * This is a convenience function equivalent to
+ * {@code topology.poll(commandReaderList(cmd), period, units)}.
+ * </P>
+ * <P>
+ * Sample use: create a stream of tuples containing the output
+ * from a periodically run command:
+ * <pre>{@code
+ * ProcessBuilder cmd = new ProcessBuilder("date");
+ * TStream<List<String>> cmdOutput =
+ * CommandStreams.periodicSource(topology, cmd, 2, TimeUnit.SECONDS);
+ * cmdOutput.print();
+ * }</pre>
+ *
+ * @param topology the topology to add the source stream to
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @param period the period to run the command and collect its output
+ * @param units TimeUnit for {@code period}
+ * @return the source {@code TStream<List<String>>}
+ *
+ * @see #commandReaderList(ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static TStream<List<String>> periodicSource(Topology topology,
+ ProcessBuilder cmd, long period, TimeUnit units) {
+ return topology.poll(commandReaderList(cmd), period, units);
+ }
+
+ /**
+ * Sink a {@code TStream<String>} to a command's input.
+ * <P>
+ * The supplied {@code cmd} is used to start the command.
+ * Each tuple is written as UTF8 and flushed to the command's {@link Process#getOutputStream() input}.
+ * The command is restarted if a write encounters an error.
+ * </P>
+ * <P>
+ * While each write is followed by a flush() that only helps to
+ * reduce the time it takes to notice that cmd has failed and restart it.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd across restarts.
+ * </P>
+ * <P>
+ * This is a convenience function equivalent to
+ * {@code stream.sink(commandWriter(cmd))}
+ * </P>
+ * <P>
+ * Sample use: write a stream of tuples to the input of a command:
+ * <pre>{@code
+ * TStream<String> stream = topology.strings("one", "two", "three");
+ * ProcessBuilder cmd = new ProcessBuilder("cat").redirectOutput(new File("/dev/stdout"));
+ * CommandStreams.sink(stream, cmd);
+ * }</pre>
+ *
+ * @param stream the stream to sink
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return a {@link TSink}
+ *
+ * @see #commandWriter(ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static TSink<String> sink(TStream<String> stream, ProcessBuilder cmd) {
+ return stream.sink(commandWriter(cmd));
+ }
+
+ /**
+ * Create an endless {@code Supplier<String>} for ingesting a long running command's output.
+ * <P>
+ * This method is particularly helpful in creating a sensor or source connector
+ * class that hides the fact that it uses a command, enabling it to be used
+ * like any other sensor/connector.
+ * </P>
+ * For example:
+ * <pre><code>
+ * // ingest the sensor data
+ * TStream<MySensorData> stream = topology.generate(new MySensor());
+ *
+ * // MySensor class
+ * class MySensor implements Supplier<MySensorData> {
+ * private String[] cmd = new String[] {"mySensorCmd", "arg1"};
+ * private Supplier<String> commandReader =
+ * CommandStreams.endlessCommandReader(new ProcessBuilder(cmd));
+ *
+ * // implement Supplier<MySensorData>.get()
+ * public MySensorData get() {
+ * // get the next line from the cmd and create a MySensorData tuple from it
+ * return createMySensorData(commandReader.get());
+ * }
+ * }
+ * </code></pre>
+ * <P>
+ * The supplied {@code cmd} is used to start the command.
+ * A call to {@link Supplier#get()} reads the next UTF8 line from the command's
+ * {@link Process#getInputStream() output}.
+ * The returned strings contain output from stderr if the cmd is configured to
+ * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdput}.
+ * The command is restarted if a read from the command's output stream
+ * returns EOF or an error.
+ * </P>
+ *
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return the {@code Supplier<String>}
+ *
+ * @see #generate(Topology, ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static Supplier<String> endlessCommandReader(ProcessBuilder cmd) {
+ return new Supplier<String>() {
+ private static final long serialVersionUID = 1L;
+ Supplier<Iterable<String>> reader = new CommandReader(cmd, true);
+ Iterator<String> iter = null;
+ @Override
+ public String get() {
+ if (iter == null) {
+ iter = reader.get().iterator();
+ }
+ if (iter.hasNext()) {
+ return iter.next();
+ }
+ else {
+ // presumably a shutdown condition
+ return null;
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a {@code Supplier<List<String>>} to ingest a command's output.
+ * <P>
+ * This method is particularly helpful in creating a sensor or source connector
+ * class that hides the fact that it uses a command, enabling it to be used
+ * like any other sensor/connector.
+ * </P>
+ * For example:
+ * <pre><code>
+ * // ingest the sensor data
+ * TStream<MySensorData> stream = topology.periodicSource(new MySensor());
+ *
+ * // MySensor class
+ * class MySensor implements Supplier<MySensorData> {
+ * private String[] cmd = new String[] {"mySensorCmd", "arg1"};
+ * private Supplier<List<String>> commandReader =
+ * CommandStreams.commandReaderList(new ProcessBuilder(cmd));
+ *
+ * // implement Supplier<MySensorData>.get()
+ * public MySensorData get() {
+ * // get the cmd output and create a MySensorData tuple from it
+ * return createMySensorData(commandReader.get());
+ * }
+ * }
+ * </code></pre>
+ * <P>
+ * The supplied {@code cmd} is used to start the command.
+ * A call to {@link Supplier#get()} reads the command's UTF8
+ * {@link Process#getInputStream() input stream} until an EOF or error
+ * and returns a {@code List<String>} of the collected input.
+ * The tuples contain output from stderr if the cmd is configured to
+ * {@link ProcessBuilder#redirectErrorStream(boolean) redirect stderr to stdout}.
+ * </P>
+ *
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return the {@code Supplier<List<String>>} for the command
+ *
+ * @see #periodicSource(Topology, ProcessBuilder, long, TimeUnit)
+ * @see #tokenize(String)
+ */
+ public static Supplier<List<String>> commandReaderList(ProcessBuilder cmd) {
+ return new Supplier<List<String>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public List<String> get() {
+ try (CommandReader supplier
+ = new CommandReader(cmd, false))
+ {
+ Iterator<String> iter = supplier.get().iterator();
+ List<String> list = new ArrayList<>();
+ while (iter.hasNext())
+ list.add(iter.next());
+ return list;
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a {@code Consumer<String>} to write UTF8 string data to a command's input.
+ * <P>
+ * This method is particularly helpful in creating a sink connector
+ * that hides the fact that it uses a command, enabling it to be used
+ * like a native connector.
+ * </P>
+ * For example:
+ * <pre><code>
+ * // sink a stream to my connector
+ * TStream<MySensorData> stream = ...;
+ * stream.sink(new MySinkConnector());
+ *
+ * // MySinkConnector class
+ * class MySinkConnector implements Consumer<MySensorData> {
+ * private String[] cmd = new String[] {"mySinkCmd", "arg1"};
+ * private Consumer<String> commandWriter =
+ * CommandStreams.commandWriter(new ProcessBuilder(cmd));
+ *
+ * // implement Consumer<MySensorData>.accept()
+ * public void accept(MySensorData data) {
+ * // convert the data to a string and write it to the cmd
+ * commandWriter.accept(convertMySensorData(data));
+ * }
+ * }
+ * </code></pre>
+ * <P>
+ * The supplied {@link ProcessBuilder cmd} is used to start the command.
+ * Each call to {@link Consumer#accept(Object) accept(String)} writes a
+ * UTF8 string to the command's {@link Process#getOutputStream() input}.
+ * Each write is followed by a flush.
+ * The command is restarted if a write encounters an error.
+ * </P>
+ * <P>
+ * While each write is followed by a flush() that only helps to
+ * reduce the time it takes to notice that cmd has failed and restart it.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd across restarts.
+ * </P>
+ *
+ * @param cmd the {@link ProcessBuilder} to start the command
+ * @return the {@code Consumer<String>} for the command
+ *
+ * @see #sink(TStream, ProcessBuilder)
+ * @see #tokenize(String)
+ */
+ public static Consumer<String> commandWriter(ProcessBuilder cmd) {
+ return new CommandWriter(cmd, true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/package-info.java b/connectors/command/src/main/java/quarks/connectors/command/package-info.java
new file mode 100644
index 0000000..981293f
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Connectors to a Command / OS Process.
+ */
+package quarks.connectors.command;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandConnector.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandConnector.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandConnector.java
new file mode 100644
index 0000000..7076638
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandConnector.java
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for source / sink specific command connectors.
+ * <P>
+ * The lifetime of a CommandConnector is that of its Command's execution lifetime.
+ * In the case of a "one shot" command (e.g., a periodicSource's cmd) the
+ * lifetime may be brief - {@code restart==false}.
+ * </P>
+ * <P>
+ * Many command connector uses will involve long running commands, sources
+ * and sinks that want to be robust in the face of inadvertent command
+ * termination/failures - (@code restart==true}.
+ * </P>
+ */
+abstract class CommandConnector implements AutoCloseable {
+ static final Logger logger = LoggerFactory.getLogger(CommandConnector.class);
+
+ private final ProcessBuilder cmd;
+ private final boolean restart;
+ private Process currentProcess;
+ private long numStarts;
+ private long lastStartTimestamp;
+ private final int restartDelayMsec = 1_000;
+
+
+ CommandConnector(ProcessBuilder cmd, boolean restart) {
+ this.cmd = cmd;
+ this.restart = restart;
+ }
+
+ protected boolean canStart() {
+ return restart || numStarts==0;
+ }
+
+ protected Process getCurrentProcess() {
+ return currentProcess;
+ }
+
+ protected void start() throws InterruptedException {
+ if (!canStart())
+ throw new IllegalStateException();
+ closeProcess();
+ try {
+ numStarts++;
+ // ensure we don't thrash on continuous restarts
+ long now = System.currentTimeMillis();
+ if (now < lastStartTimestamp + restartDelayMsec) {
+ logger.info("Sleeping before restarting cmd {}", toCmdForMsg());
+ Thread.sleep(restartDelayMsec);
+ now = System.currentTimeMillis();
+ }
+ lastStartTimestamp = now;
+
+ currentProcess = cmd.start();
+
+ logger.debug("Started cmd {}", toCmdForMsg());
+ }
+ catch (IOException e) {
+ logger.error("Unable to start cmd {}", toCmdForMsg(), e);
+ }
+ }
+
+ protected void closeProcess() {
+ if (currentProcess != null) {
+ currentProcess.destroy();
+ currentProcess = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ closeProcess();
+ }
+
+ String toCmdForMsg() {
+ return cmd.command().toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandReader.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandReader.java
new file mode 100644
index 0000000..0340918
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandReader.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import quarks.function.Supplier;
+
+/**
+ * Create a {@code Supplier<Iterable<String>>} to ingest a command's output.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * and restart it upon process termination/error if so configured.
+ * </P>
+ * <P>
+ * The iterator returned by {@link Iterable#iterator()) returns
+ * {@hasNext()==true} until a read from {@link Process#getOutputStream()}
+ * returns EOF or an IOError.
+ */
+public class CommandReader extends CommandConnector implements Supplier<Iterable<String>>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ private Iterator<String> currentSupplierIterator;
+
+ /**
+ * Create a supplier of UTF8 strings from a command's output.
+ *
+ * @param cmd the {@link ProcessBuilder} to use to start the command
+ * @param restart when true, restart the command upon termination, EOF, or
+ * read error.
+ */
+ public CommandReader(ProcessBuilder cmd, boolean restart) {
+ super(cmd, restart);
+ }
+
+ protected void start() throws InterruptedException {
+ super.start();
+ currentSupplierIterator = new ProcessReader(getCurrentProcess()).get().iterator();
+ }
+
+ protected void closeProcess() {
+ currentSupplierIterator = null;
+ super.closeProcess();
+ }
+
+ @Override
+ public Iterable<String> get() {
+ return new Iterable<String>() {
+
+ @Override
+ public Iterator<String> iterator() {
+ return new Iterator<String>() {
+
+ @Override
+ public boolean hasNext() {
+ try {
+ for(;;) {
+ if (currentSupplierIterator != null) {
+ boolean hasNext = currentSupplierIterator.hasNext();
+ if (hasNext) {
+ return true;
+ }
+ else {
+ // no more from that process. close and loop/retry.
+ closeProcess();
+ }
+ }
+ else if (currentSupplierIterator == null && canStart()) {
+ start(); // and loop/retry
+ }
+ else {
+ return false; // no more input
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public String next() {
+ if (currentSupplierIterator != null)
+ return currentSupplierIterator.next();
+ else
+ throw new NoSuchElementException();
+ }
+
+ };
+ }
+
+ };
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandWriter.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandWriter.java
new file mode 100644
index 0000000..813fe93
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/CommandWriter.java
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import quarks.function.Consumer;
+
+/**
+ * A {@code Consumer<String>>} to write data to a command's input.
+ * <P>
+ * The supplied {@code cmd} is used to start the command
+ * and restart it upon process termination/error if so configured.
+ * </P>
+ */
+public class CommandWriter extends CommandConnector implements Consumer<String>, AutoCloseable {
+ private static final long serialVersionUID = 1L;
+ private ProcessWriter currentConsumer;
+
+ /**
+ * Create a consumer to write UTF8 string data to a command's input.
+ * <P>
+ * Each write is followed by a flush() though that only helps to
+ * reduce the time it takes to notice that a cmd has failed.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd even following restart.
+ * </P>
+ *
+ * @param cmd the builder to use to start the process
+ * @param restart true to restart the process upon termination or
+ * write error.
+ */
+ public CommandWriter(ProcessBuilder cmd, boolean restart) {
+ super(cmd, restart);
+ }
+
+ protected void start() throws InterruptedException {
+ super.start();
+ currentConsumer = new ProcessWriter(getCurrentProcess());
+ }
+
+ protected void closeProcess() {
+ currentConsumer = null;
+ super.closeProcess();
+ }
+
+ @Override
+ public void accept(String value) {
+ for (;;) {
+ try {
+ if (currentConsumer != null) {
+ try {
+ currentConsumer.accept(value);
+ logger.trace("WROTE: {}", value);
+ return;
+ }
+ catch (RuntimeException e) {
+ closeProcess(); // and loop/retry
+ }
+ }
+ else if (currentConsumer == null && canStart()) {
+ logger.debug("STARTING for: {}", value);
+ start(); // and loop/retry
+ }
+ else {
+ // not restartable. toss it on the floor
+ return;
+ }
+ }
+ catch (InterruptedException e) {
+ // toss it on the floor
+ return;
+ }
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessReader.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessReader.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessReader.java
new file mode 100644
index 0000000..8b2525a
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessReader.java
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import quarks.function.Supplier;
+
+/**
+ * A {@code Supplier<Iterable<String>>} for ingesting a process's output.
+ * <P>
+ * The iterator returned by {@link Iterable#iterator()) returns
+ * {@hasNext()==true} until a read from {@link Process#getInputStream()}
+ * returns EOF or an IOError.
+ */
+class ProcessReader implements Supplier<Iterable<String>> {
+ private static final long serialVersionUID = 1L;
+ private final BufferedReader reader;
+
+ /**
+ * Create a new supplier of UTF8 strings read from a process's
+ * {@link Process#getInputStream() output}.
+ *
+ * @param process the process to read from.
+ */
+ ProcessReader(Process process) {
+ reader = new BufferedReader(new InputStreamReader(
+ process.getInputStream(), StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public Iterable<String> get() {
+ return new Iterable<String>() {
+
+ @Override
+ public Iterator<String> iterator() {
+ return new Iterator<String>() {
+ private Boolean hasNext = null;
+ private String next = null;
+
+ @Override
+ public boolean hasNext() {
+ if (hasNext != null)
+ return hasNext;
+ next = getNext();
+ hasNext = next != null;
+ return hasNext;
+ }
+
+ @Override
+ public String next() {
+ if (next == null)
+ throw new NoSuchElementException();
+ hasNext = null;
+ return next;
+ }
+
+ };
+ }
+
+ };
+ }
+
+ /**
+ * Get the next available line from the process's stdout
+ * @return null if no more input (or error)
+ */
+ private String getNext() {
+ try {
+ return reader.readLine();
+ } catch (IOException e) {
+ CommandConnector.logger.error("Unable to readline from cmd", e);
+ return null;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessWriter.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessWriter.java b/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessWriter.java
new file mode 100644
index 0000000..7fcb003
--- /dev/null
+++ b/connectors/command/src/main/java/quarks/connectors/command/runtime/ProcessWriter.java
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.connectors.command.runtime;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import quarks.function.Consumer;
+
+/**
+ * A {@code Consumer<String>>} to receive data and write it to a process's input.
+ * <P>
+ * Each write is followed by a flush() though that only helps to
+ * reduce the time it takes to notice that a cmd has failed.
+ * Supposedly "successfully written and flushed" values are not guaranteed to
+ * have been received by a cmd.
+ */
+class ProcessWriter implements Consumer<String> {
+ private static final long serialVersionUID = 1L;
+ private final BufferedWriter writer;
+
+ /**
+ * Create a new consumer for UTF8 strings to write to a process's
+ * {@link Process#getOutputStream() input}
+ *
+ * @param process to process to write to.
+ */
+ ProcessWriter(Process process) {
+ writer = new BufferedWriter(new OutputStreamWriter(
+ process.getOutputStream(), StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void accept(String value) {
+ try {
+ // see class doc regarding guarantees.
+ writer.write(value);
+ writer.newLine();
+ writer.flush();
+ }
+ catch (IOException e) {
+ CommandConnector.logger.error("Unable to write to cmd", e);
+ // caller (CommandWriter) requires throw to detect failure and recover
+ throw new RuntimeException("Unable to write to cmd", e);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsGlobalTest.java b/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsGlobalTest.java
new file mode 100644
index 0000000..eaaf83f
--- /dev/null
+++ b/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsGlobalTest.java
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.connectors.command;
+
+/**
+ * CommandStreams connector globalization tests.
+ */
+public class CommandStreamsGlobalTest extends CommandStreamsTest {
+
+ private static final String[] globalLines = new String[] {
+ "\u5b78\u800c\u6642\u7fd2\u4e4b",
+ "\u4e0d\u4ea6\u8aaa\u4e4e"
+ };
+
+ public String[] getLines() {
+ return globalLines;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsTest.java b/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsTest.java
new file mode 100644
index 0000000..3e9cb7c
--- /dev/null
+++ b/connectors/command/src/test/java/quarks/test/connectors/command/CommandStreamsTest.java
@@ -0,0 +1,224 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.connectors.command;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.lang.ProcessBuilder.Redirect;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+import quarks.connectors.command.CommandStreams;
+import quarks.test.connectors.common.FileUtil;
+import quarks.test.connectors.common.TestRepoPath;
+import quarks.test.providers.direct.DirectTestSetup;
+import quarks.test.topology.TopologyAbstractTest;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.tester.Condition;
+
+public class CommandStreamsTest extends TopologyAbstractTest implements DirectTestSetup {
+
+ private String[] stdLines = new String[] {
+ "Line 1",
+ "Line 2",
+ "Line 3",
+ };
+
+ public String[] getLines() {
+ return stdLines;
+ }
+
+ private static void delay(long millis) {
+ try {
+ Thread.sleep(millis);
+ }
+ catch (InterruptedException e) { }
+ }
+
+ @Test
+ public void testTokenize() {
+ String cmdString = "myCmd arg1 arg2\targ3";
+ String[] expected = new String[]{"myCmd", "arg1", "arg2", "arg3"};
+
+ assertArrayEquals(expected, CommandStreams.tokenize(cmdString).toArray(new String[0]));
+ }
+
+ @Test
+ public void testPeriodicSource() throws Exception {
+ Topology t = newTopology("testPeriodicSource");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+
+ int NUM_POLLS = 3;
+ List<String> expLines = new ArrayList<>();
+ for (int i = 0; i < NUM_POLLS; i++) {
+ expLines.addAll(Arrays.asList(getLines()));
+ }
+
+ TStream<List<String>> ls = CommandStreams.periodicSource(t, cmd, 1, TimeUnit.SECONDS);
+ TStream<String> s = ls.flatMap(list -> list);
+
+ try {
+ completeAndValidate("", t, s, 10, expLines.toArray(new String[0]));
+ }
+ finally {
+ tempFile1.toFile().delete();
+ }
+ }
+
+ @Test
+ public void testGenerate() throws Exception {
+ Topology t = newTopology("testGenerate");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+
+ // N.B. if looking at trace: QUARKS-224 generate() continues running after job is closed
+ TStream<String> s = CommandStreams.generate(t, cmd);
+
+ try {
+ completeAndValidate("", t, s, 10, getLines());
+ }
+ finally {
+ tempFile1.toFile().delete();
+ }
+ }
+
+ @Test
+ public void testGenerateRestart() throws Exception {
+ Topology t = newTopology("testGenerateRestart");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", getLines());
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ ProcessBuilder cmd = new ProcessBuilder("cat", tempFile1.toString());
+
+ int NUM_RUNS = 3;
+ List<String> expLines = new ArrayList<>();
+ for (int i = 0; i < NUM_RUNS; i++) {
+ expLines.addAll(Arrays.asList(getLines()));
+ }
+
+ // N.B. if looking at trace: QUARKS-224 generate() continues running after job is closed
+ TStream<String> s = CommandStreams.generate(t, cmd);
+
+ completeAndValidate("", t, s, 10 + ((NUM_RUNS-1) * 1/*restart delay time*/), expLines.toArray(new String[0]));
+ }
+
+ @Test
+ public void testSink() throws Exception {
+ Topology t = newTopology("testSink");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ ProcessBuilder cmd = new ProcessBuilder("cat")
+ .redirectOutput(Redirect.appendTo(tempFile1.toFile()));
+
+ TStream<String> s = t.strings(getLines());
+
+ TSink<String> sink = CommandStreams.sink(s, cmd);
+
+ assertNotNull(sink);
+
+ try {
+ // start the job, sleep for a bit (await the timeout) then validate sink output
+ Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
+ t.getTester().complete(getSubmitter(), new JsonObject(), never, 3, TimeUnit.SECONDS);
+
+ FileUtil.validateFile(tempFile1, getLines());
+ }
+ finally {
+ tempFile1.toFile().delete();
+ }
+ }
+
+ @Test
+ public void testSinkRestart() throws Exception {
+ Topology t = newTopology("testSinkRestart");
+
+ Path tempFile1 = FileUtil.createTempFile("test1", ".txt", new String[0]);
+ System.out.println("Test: "+t.getName()+" "+tempFile1);
+
+ int batchSize = getLines().length;
+
+ // tell cmd to terminate after each batch of lines
+ ProcessBuilder cmd = new ProcessBuilder("sh", getCmdPath("sinkcmd"), ""+batchSize)
+ .redirectOutput(Redirect.appendTo(tempFile1.toFile()))
+ .redirectError(Redirect.to(new File("/dev/stderr")));
+
+ int NUM_RUNS = 3;
+ List<String> expLines = new ArrayList<>();
+ for (int i = 0; i < NUM_RUNS; i++) {
+ expLines.addAll(Arrays.asList(getLines()));
+ }
+ AtomicInteger cnt = new AtomicInteger();
+
+ TStream<String> s = t.strings(expLines.toArray(new String[0]))
+ .filter(tup -> {
+ // need to slow things down so the sinker has time to notice
+ // the cmd has terminated. otherwise we'll get ahead,
+ // tuples will get dropped on the floor and validation will fail.
+ if (cnt.incrementAndGet() > batchSize) {
+ // System.out.println("SLEEPING on cnt "+ cnt.get() + " for "+tup);
+ delay(1_000);
+ cnt.set(1);
+ }
+ return true;
+ });
+
+ TSink<String> sink = CommandStreams.sink(s, cmd);
+
+ assertNotNull(sink);
+
+ try {
+ // start the job, sleep for a bit (await the timeout) then validate sink output
+ Condition<?> never = t.getTester().tupleCount(s, Long.MAX_VALUE);
+ t.getTester().complete(getSubmitter(), new JsonObject(), never,
+ 6 + ((NUM_RUNS-1) * 1/*restart delay*/), TimeUnit.SECONDS);
+
+ FileUtil.validateFile(tempFile1, expLines.toArray(new String[0]));
+ }
+ finally {
+ tempFile1.toFile().delete();
+ }
+ }
+
+ private String getCmdPath(String cmd) {
+ return TestRepoPath.getPath("connectors", "command", "src", "test", "scripts", cmd);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/command/src/test/scripts/sinkcmd
----------------------------------------------------------------------
diff --git a/connectors/command/src/test/scripts/sinkcmd b/connectors/command/src/test/scripts/sinkcmd
new file mode 100644
index 0000000..93a09a4
--- /dev/null
+++ b/connectors/command/src/test/scripts/sinkcmd
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+# echo stdin to stdout. terminate after <terminateCount> lines
+USAGE="$0 [<terminateCount>]"
+
+#set -x
+
+TERM_COUNT=0
+if [ $# -gt 0 ]; then
+ TERM_COUNT=$1
+ shift
+fi
+
+COUNTER=0
+while read LINE; do
+ echo $LINE
+ # echo $0 echoing $LINE >/dev/stderr
+ let COUNTER=COUNTER+1
+ if [ $TERM_COUNT -gt 0 -a $COUNTER -ge $TERM_COUNT ]; then
+ echo $0 TERMINATING after $LINE >/dev/stderr
+ exit 1;
+ fi
+done
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/common/src/test/java/quarks/test/connectors/common/FileUtil.java
----------------------------------------------------------------------
diff --git a/connectors/common/src/test/java/quarks/test/connectors/common/FileUtil.java b/connectors/common/src/test/java/quarks/test/connectors/common/FileUtil.java
new file mode 100644
index 0000000..fc2b7e3
--- /dev/null
+++ b/connectors/common/src/test/java/quarks/test/connectors/common/FileUtil.java
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.connectors.common;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * File utilities for tests
+ */
+public class FileUtil {
+
+ /**
+ * Create a temp file with the specified name, extension and contents.
+ * @param name
+ * @param extension
+ * @param lines content for the file
+ * @return {@code Path} to temp file
+ * @throws Exception on failure
+ */
+ public static Path createTempFile(String name, String extension, String[] lines) throws Exception {
+ Path tmpFile = Files.createTempFile(name, extension);
+ tmpFile.toFile().deleteOnExit();
+
+ try (BufferedWriter bw =
+ new BufferedWriter(new OutputStreamWriter(
+ new FileOutputStream(tmpFile.toFile()), StandardCharsets.UTF_8)))
+ {
+ for (int i = 0; i < lines.length; i++) {
+ bw.write(lines[i]);
+ bw.write("\n");
+ }
+ return tmpFile;
+ }
+ catch (Exception e) {
+ tmpFile.toFile().delete();
+ throw e;
+ }
+ }
+
+ /**
+ * Validate that the file contains the specified content.
+ * @param path file to validate
+ * @param lines the expected content
+ * @throws Exception on failure
+ */
+ public static void validateFile(Path path, String[] lines) throws Exception {
+ List<String> actLines = new ArrayList<>();
+ try (BufferedReader reader =
+ new BufferedReader(new InputStreamReader(
+ new FileInputStream(path.toFile()), StandardCharsets.UTF_8)))
+ {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ actLines.add(line);
+ }
+ assertArrayEquals(lines, actLines.toArray(new String[actLines.size()]));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
index 5a408ae..d826222 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
@@ -20,12 +20,8 @@ package quarks.test.connectors.file;
import static org.junit.Assume.assumeTrue;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
@@ -43,6 +39,7 @@ import org.junit.Test;
import quarks.connectors.file.FileStreams;
import quarks.function.BiFunction;
import quarks.function.Function;
+import quarks.test.connectors.common.FileUtil;
import quarks.test.providers.direct.DirectTestSetup;
import quarks.test.topology.TopologyAbstractTest;
import quarks.topology.TStream;
@@ -202,8 +199,8 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
String[] allLines = Stream.concat(Stream.of(lines), Stream.of(ucLines))
.toArray(String[]::new);
- Path tempFile1 = createTempFile("test1", "txt", lines);
- Path tempFile2 = createTempFile("test2", "txt", ucLines);
+ Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+ Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
TStream<String> contents = FileStreams.textFileReader(
t.strings(tempFile1.toAbsolutePath().toString(),
@@ -229,8 +226,8 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
String[] allLines = Stream.concat(Stream.of(lines), Stream.of(ucLines))
.toArray(String[]::new);
- Path tempFile1 = createTempFile("test1", "txt", lines);
- Path tempFile2 = createTempFile("test2", "txt", ucLines);
+ Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+ Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
// ensure a problem in one file (tuple) doesn't affect others.
// The problem files should result in a log entry but otherwise be ignored.
@@ -259,8 +256,8 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
.map(line -> line.toUpperCase())
.toArray(String[]::new);
- Path tempFile1 = createTempFile("test1", "txt", lines);
- Path tempFile2 = createTempFile("test2", "txt", ucLines);
+ Path tempFile1 = FileUtil.createTempFile("test1", "txt", lines);
+ Path tempFile2 = FileUtil.createTempFile("test2", "txt", ucLines);
// Be insensitive to Windows path separators and "/tmp" location
boolean isWindows = System.getProperty("os.name").startsWith("Windows");
@@ -311,20 +308,4 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
tempFile2.toFile().delete();
}
}
-
- public static Path createTempFile(String name, String extension, String[] lines) throws Exception {
- Path tmpFile = Files.createTempFile(name, extension);
-
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(tmpFile.toFile()), StandardCharsets.UTF_8));
-
- for (int i = 0; i < lines.length; i++) {
- bw.write(lines[i]);
- bw.write("\n");
- }
- bw.flush();
- bw.close();
-
- return tmpFile;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/85247c21/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
index 8d0506e..c810aed 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
@@ -26,17 +26,14 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static quarks.test.connectors.common.FileUtil.createTempFile;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -944,20 +941,4 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
assertNull("path:"+path+" line "+lineCnt+" unexpected IOException "+e, e);
}
}
-
- public static Path createTempFile(String name, String extension, String[] lines) throws Exception {
- Path tmpFile = Files.createTempFile(name, extension);
-
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(tmpFile.toFile()), StandardCharsets.UTF_8));
-
- for (int i = 0; i < lines.length; i++) {
- bw.write(lines[i]);
- bw.write("\n");
- }
- bw.flush();
- bw.close();
-
- return tmpFile;
- }
}