You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:05 UTC

[04/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Cmd.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Cmd.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Cmd.java
new file mode 100644
index 0000000..f3ae61a
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Cmd.java
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.google.gson.JsonObject;
+
+/**
+ * ELM327 and OBD-II command interface.
+ *
+ */
+public interface Cmd {
+    /**
+     * Key ({@value}) for PID identifier in JSON result.
+     */
+    String PID = "pid";
+
+    /**
+     * Key ({@value}) for timestamp in JSON result. Timestamp value is the
+     * number of milliseconds since the 1970 epoch.
+     */
+    String TS = "ts";
+    
+    /**
+     * Key ({@value}) for the returned value in JSON result.
+     * May not be present.
+     */
+    String VALUE = "value";
+
+    /**
+     * How the command is written to the serial port.
+     * 
+     * @param out
+     *            OutputStream to write bytes to.
+     * @throws IOException
+     *             Exception writing bytes.
+     */
+    void writeCmd(OutputStream out) throws IOException;
+
+    /**
+     * Process the reply into a result.
+     * 
+     * @param result
+     *            JSON object to populate with the result.
+     * @param reply
+     *            Bytes that were returned from the command execution.
+     *            
+     * @return {@code true} if the result is valid, {@code false} otherwise.
+     */
+    boolean result(JsonObject result, byte[] reply);
+
+    /**
+     * Unique identifier of the command.
+     * 
+     * @return Unique identifier of the command.
+     */
+    String id();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Cmds.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Cmds.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Cmds.java
new file mode 100644
index 0000000..aed0c23
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Cmds.java
@@ -0,0 +1,75 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.edgent.connectors.serial.SerialDevice;
+import org.apache.edgent.samples.connectors.elm327.runtime.CommandExecutor;
+
+import com.google.gson.JsonObject;
+
+/**
+ * ELM327 commands.
+ * 
+ * 
+ */
+public enum Elm327Cmds implements Cmd {
+
+    INIT("ATZ"),
+    ECHO_OFF("ATE0"),
+    PROTOCOL_3("ATSP3"),
+    PROTOCOL_5("ATSP5"),
+    BYPASS_INIT("ATBI"),
+    FAST_INIT("ATFI"),
+    SLOW_INIT("ATSI"),;
+
+    private byte[] cmd;
+
+    Elm327Cmds(String code) {
+        cmd = (code + "\r").getBytes(StandardCharsets.US_ASCII);
+    }
+
+    @Override
+    public void writeCmd(OutputStream out) throws IOException {
+        out.write(cmd);
+    }
+
+    @Override
+    public boolean result(JsonObject result, byte[] data) {
+        return true;
+    }
+
+    @Override
+    public String id() {
+        return name();
+    }
+    
+    /**
+     * Initialize the ELM327 to a specific protocol.
+     * @param device Serial device the ELM327 is connected to.
+     * @param protocol OBD-II protocol to initialize to.
+     */
+    public static void initializeProtocol(SerialDevice device, Elm327Cmds protocol) {
+        device.setInitializer(port -> CommandExecutor.initialize(protocol, port.getOutput(), port.getInput()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Streams.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Streams.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Streams.java
new file mode 100644
index 0000000..dbaf4db
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Streams.java
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.serial.SerialDevice;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.samples.connectors.elm327.runtime.CommandExecutor;
+import org.apache.edgent.topology.TStream;
+
+import com.google.gson.JsonArray;
+
+/**
+ * Streams fetching OBD-II data from an ELM327 through
+ * a serial device.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/ELM327">ELM327</a>
+ */
+public class Elm327Streams {
+	
+    /**
+     * Periodically execute a number of ELM327 commands.
+     * Each tuple on the returned stream is a JSON array containing
+     * the result for each command.
+     * <BR>
+     * Each result is a JSON object containing the
+     * {@link Cmd#id() command identifier} with key {@link Cmd#PID pid}
+     * and any result set by the individual command, typically with
+     * the key {@link Cmd#VALUE value}.
+     * 
+     * @param device Serial device the ELM327 is connected to.
+     * @param period Period to poll.
+     * @param unit Unit of {@code period}.
+     * @param cmds Commands to execute.
+     * @return Stream containing the results of the command executions.
+     */
+	public static TStream<JsonArray> poll(SerialDevice device, long period, TimeUnit unit, Cmd ... cmds) {
+		
+		Supplier<JsonArray> data = device.getSource(
+				port ->
+		{
+			JsonArray array = new JsonArray();
+			for (Cmd cmd : cmds) {
+				array.add(CommandExecutor.execute(cmd, port.getOutput(), port.getInput()));
+			}
+			return array;
+			
+		});
+		
+		return device.topology().poll(data, period, unit);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Pids01.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Pids01.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Pids01.java
new file mode 100644
index 0000000..10ecd1e
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Pids01.java
@@ -0,0 +1,141 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.edgent.samples.connectors.elm327.runtime.CommandExecutor;
+
+import com.google.gson.JsonObject;
+
+/**
+ * OBD-II Standard Mode 01 Pids.
+ *
+ * 
+ * @see <a href="https://en.wikipedia.org/wiki/OBD-II_PIDs#Mode_01">OBD-II Mode 01 Pids</a>
+ */
+public enum Pids01 implements Cmd {
+    
+    /**
+     * Get the list of available PIDs.
+     */
+	AVAILABLE_PIDS("00"),
+	
+	/**
+	 * Engine coolant temperature in &deg;C.
+	 */
+	ENGINE_COOLANT_TEMP("05") {
+		@Override
+		protected boolean decode(JsonObject result, byte[] reply) {
+			
+			int[] binary = CommandExecutor.binary(reply, 4, 2);
+			
+			int c = binary[0] - 40;
+			result.addProperty(VALUE, c);
+			
+			return true;
+		}
+	},
+
+	/**
+	 * Engine speed in rpm.
+	 */
+	RPM("0C") {
+		@Override
+		protected boolean decode(JsonObject result, byte[] reply) {
+			
+			int[] binary = CommandExecutor.binary(reply, 4, 4);
+			int rpm = ((binary[0] * 256) + binary[1])/4;
+			result.addProperty(VALUE, rpm);
+			
+			return true;
+		}
+	},
+	
+	/**
+	 * Vehicle speed in km/h.
+	 */
+	SPEED("0D"){
+		@Override
+		protected boolean decode(JsonObject result, byte[] reply) {
+			
+			int[] binary = CommandExecutor.binary(reply, 4, 2);
+			
+			result.addProperty(VALUE, binary[0]);
+			
+			return true;
+		}
+	},
+	
+	/**
+     * Engine air intake temperature in &deg;C.
+     */
+	AIR_INTAKE_TEMP("0F"){
+		@Override
+		protected boolean decode(JsonObject result, byte[] reply) {
+			
+			int[] binary = CommandExecutor.binary(reply, 4, 2);
+			
+			int c = binary[0] - 40;
+			result.addProperty(VALUE, c);
+			
+			return true;
+		}
+	},
+	;
+
+    private final String pid;
+	private final byte[] cmd;
+	
+	Pids01(String pid) {
+		this.pid = pid;
+		cmd = ("01" + pid + "1\r").getBytes(StandardCharsets.US_ASCII);
+	}
+	
+	public String id() {
+		return pid;
+	}
+	
+	@Override
+	public void writeCmd(OutputStream out) throws IOException {
+		out.write(cmd);
+	}
+	@Override
+	public final boolean result(JsonObject result, byte[] data) {
+		return validateReply(data) && decode(result, data);
+	}
+	 boolean decode(JsonObject result, byte[] data) {
+		 return true;
+	 }
+	
+	boolean validateReply(byte[] reply) {
+		if (reply[0] != '4')
+			return false;
+		if (reply[1] != '1')
+			return false;
+		if (reply[2] != pid.charAt(0))
+			return false;
+		if (reply[3] != pid.charAt(1))
+			return false;
+		
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/package-info.java
new file mode 100644
index 0000000..9d3a9b9
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/package-info.java
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * OBD-II protocol sample using ELM327.
+ * 
+ * ELM327 devices allow connectivity to a vehicle's OBD-II information.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/OBD-II">OBD-II</a>
+ * @see <a href="https://en.wikipedia.org/wiki/ELM327">ELM327</a>
+ */
+package org.apache.edgent.samples.connectors.elm327;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/runtime/CommandExecutor.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
new file mode 100644
index 0000000..b5a342f
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327.runtime;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.edgent.samples.connectors.elm327.Cmd;
+import org.apache.edgent.samples.connectors.elm327.Elm327Cmds;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Runtime execution of ELM327 &amp; OBD-II commands.
+ *
+ */
+public class CommandExecutor {
+
+    public static int[] binary(byte[] reply, int offset, int length) {
+        int[] binary = new int[length / 2];
+        for (int i = 0; i < binary.length; i++) {
+            int h = Character.digit(reply[offset++], 16);
+            int l = Character.digit(reply[offset++], 16);
+            binary[i] = ((h * 16) + l);
+        }
+        return binary;
+    }
+
+    public static void initialize(Cmd protocol, OutputStream out, InputStream in) {
+        try {
+
+            executeUntilOK(10, Elm327Cmds.INIT, out, in);
+            Thread.sleep(1000);
+
+            executeUntilOK(1, Elm327Cmds.ECHO_OFF, out, in);
+
+            executeUntilOK(1, protocol, out, in);
+            executeUntilOK(1, Elm327Cmds.SLOW_INIT, out, in);
+            Thread.sleep(1000);
+
+        } catch (Exception ioe) {
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    private static boolean readUntilPrompt(InputStream in, ByteArrayOutputStream bytes) throws IOException {
+        bytes.reset();
+        for (;;) {
+            int b = in.read();
+            if (b == -1)
+                return false;
+            if (b == ' ')
+                continue;
+            if (b == '\r')
+                continue;
+            if (b == '>')
+                return true;
+
+            bytes.write(b);
+        }
+    }
+
+    public static JsonObject executeUntilOK(int n, Cmd cmd, OutputStream out, InputStream in) throws IOException {
+        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(16)) {
+            for (int i = 0; i < n; i++) {
+                cmd.writeCmd(out);
+                out.flush();
+
+                if (!readUntilPrompt(in, bytes))
+                    continue;
+
+                byte[] reply = bytes.toByteArray();
+                JsonObject j = new JsonObject();
+                if (cmd.result(j, reply))
+                    return j;
+                break;
+            }
+        }
+        throw new IllegalStateException("Could not execute command:" + cmd);
+    }
+
+    public static JsonObject execute(Cmd cmd, OutputStream out, InputStream in) {
+        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(16)) {
+            cmd.writeCmd(out);
+            out.flush();
+
+            JsonObject result = new JsonObject();
+            result.addProperty(Cmd.PID, cmd.id());
+            result.addProperty(Cmd.TS, System.currentTimeMillis());
+
+            readUntilPrompt(in, bytes);
+
+            cmd.result(result, bytes.toByteArray());
+
+            return result;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileReaderApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileReaderApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileReaderApp.java
new file mode 100644
index 0000000..80086b7
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileReaderApp.java
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.file;
+
+import java.io.File;
+
+import org.apache.edgent.connectors.file.FileStreams;
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * Watch a directory for files and convert their contents into a stream.
+ */
+public class FileReaderApp {
+    private final String directory;
+    private static final String baseLeafname = "FileSample";
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1)
+            throw new Exception("missing pathname to an existing directory");
+        FileReaderApp reader = new FileReaderApp(args[0]);
+        reader.run();
+    }
+   
+    /**
+     * 
+     * @param directory an existing directory to watch for files
+     */
+    public FileReaderApp(String directory) {
+        File dir = new File(directory);
+        if (!dir.exists())
+            throw new IllegalArgumentException("directory doesn't exist");
+        this.directory = directory;
+    }
+    
+    public void run() throws Exception {
+        DevelopmentProvider tp = new DevelopmentProvider();
+        
+        // build the application / topology
+        
+        Topology t = tp.newTopology("FileSample consumer");
+
+        // watch for files
+        TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> directory);
+        
+        // create a stream containing the files' contents.
+        // use a preFn to include a separator in the results.
+        // use a postFn to delete the file once it's been processed.
+        TStream<String> contents = FileStreams.textFileReader(pathnames,
+                tuple -> "<PRE-FUNCTION> "+tuple, 
+                (tuple,exception) -> {
+                    // exercise a little caution in case the user pointed
+                    // us at a directory with other things in it
+                    if (tuple.contains("/"+baseLeafname+"_")) { 
+                        new File(tuple).delete();
+                    }
+                    return null;
+                });
+        
+        // print out what's being read
+        contents.print();
+        
+        // run the application / topology
+        System.out.println("starting the reader watching directory " + directory);
+        System.out.println("Console URL for the job: "
+                + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+        tp.submit(t);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileWriterApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileWriterApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileWriterApp.java
new file mode 100644
index 0000000..c956cb6
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileWriterApp.java
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.file;
+
+import java.io.File;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.connectors.file.FileStreams;
+import org.apache.edgent.connectors.file.FileWriterCycleConfig;
+import org.apache.edgent.connectors.file.FileWriterFlushConfig;
+import org.apache.edgent.connectors.file.FileWriterPolicy;
+import org.apache.edgent.connectors.file.FileWriterRetentionConfig;
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * Write a TStream&lt;String&gt; to files.
+ */
+public class FileWriterApp {
+    private final String directory;
+    private final String basePathname;
+    private static final String baseLeafname = "FileSample";
+    
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1)
+            throw new Exception("missing pathname to an existing directory");
+        FileWriterApp writer = new FileWriterApp(args[0]);
+        writer.run();
+    }
+    
+    /**
+     * 
+     * @param directory an existing directory to create files in
+     */
+    public FileWriterApp(String directory) {
+        File dir = new File(directory);
+        if (!dir.exists())
+            throw new IllegalArgumentException("directory doesn't exist");
+        this.directory = directory;
+        basePathname = directory+"/"+baseLeafname;
+    }
+    
+    public void run() throws Exception {
+        DevelopmentProvider tp = new DevelopmentProvider();
+        
+        // build the application / topology
+        
+        Topology t = tp.newTopology("FileSample producer");
+        
+        FileWriterPolicy<String> policy = new FileWriterPolicy<String>(
+                FileWriterFlushConfig.newImplicitConfig(),
+                FileWriterCycleConfig.newCountBasedConfig(5),
+                FileWriterRetentionConfig.newFileCountBasedConfig(3));
+
+        // create a tuple stream to write out
+        AtomicInteger cnt = new AtomicInteger();
+        TStream<String> stream = t.poll(() -> {
+                String str = String.format("sample tuple %d %s",
+                        cnt.incrementAndGet(), new Date().toString());
+                System.out.println("created tuple: "+str);
+                return str;
+            }, 1, TimeUnit.SECONDS);
+        
+        // write the stream
+        FileStreams.textFileWriter(stream, () -> basePathname, () -> policy);
+        
+        // run the application / topology
+        System.out.println("starting the producer writing to directory " + directory);
+        System.out.println("Console URL for the job: "
+                + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+        tp.submit(t);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/README
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/README b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/README
new file mode 100644
index 0000000..4477518
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/README
@@ -0,0 +1,11 @@
+Sample File Streams connector topology applications.
+
+The file writer application writes a stream's tuples to files.
+
+The file reader application watches a directory for files and reads their
+contents into a stream of tuples.
+
+see scripts/connectors/file/README to run them
+
+FileWriterApp.java - the writer application topology
+FileReaderApp.java - the reader application topology

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/package-info.java
new file mode 100644
index 0000000..a2cfe74
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/package-info.java
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Samples showing use of the 
+ * <a href="{@docRoot}/org/apache/edgent/connectors/file/package-summary.html">
+ *     File stream connector</a>.
+ * <p>
+ * See &lt;edgent-release&gt;/scripts/connectors/file/README to run the samples.
+ * <p>
+ * The following samples are provided:
+ * <ul>
+ * <li>FileReaderApp.java - a simple directory watcher and file reader application topology</li>
+ * <li>FileWriterApp.java - a simple file writer application topology</li>
+ * </ul>
+ */
+package org.apache.edgent.samples.connectors.file;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfQuickstart.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfQuickstart.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfQuickstart.java
new file mode 100644
index 0000000..e83b0a2
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfQuickstart.java
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.iotf;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.iotf.IotfDevice;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Quickstart sample for the IBM Watson IoT Platform.
+ * <P>
+ * Once per second a JSON device event is submitted in the format
+ * used by the Quickstart device simulator: keys {@code temp},
+ * {@code humidity} and {@code objectTemp}, each with a random value.
+ * </P>
+ * <P>
+ * The device type is {@code iotsamples-edgent} and the device
+ * identifier is randomly generated. Both are printed at startup,
+ * together with a URL for viewing the data as it is received
+ * by the Quickstart service.
+ * </P>
+ */
+public class IotfQuickstart {
+
+    public static void main(String[] args) {
+        final DirectProvider provider = new DirectProvider();
+        final Topology topology = provider.newTopology("IotfQuickstart");
+
+        // Connect to the IoTF Quickstart service under a random device id.
+        final String deviceId = "qs" + Long.toHexString(new Random().nextLong());
+        final IotDevice device = IotfDevice.quickstart(topology, deviceId);
+
+        System.out.println("Quickstart device type:" + IotfDevice.QUICKSTART_DEVICE_TYPE);
+        System.out.println("Quickstart device id  :" + deviceId);
+        System.out.println("https://quickstart.internetofthings.ibmcloud.com/#/device/"
+             + deviceId);
+
+        // Poll three simulated readings once per second.
+        final Random rng = new Random();
+        TStream<double[]> readings = topology.poll(
+            () -> new double[] {
+                rng.nextGaussian() * 10.0 + 40.0,
+                rng.nextGaussian() * 10.0 + 50.0,
+                rng.nextGaussian() * 10.0 + 60.0
+            },
+            1, TimeUnit.SECONDS);
+
+        // Convert each reading triple into the Quickstart JSON format.
+        TStream<JsonObject> events = readings.map(sample -> {
+            JsonObject event = new JsonObject();
+            event.addProperty("temp", sample[0]);
+            event.addProperty("humidity", sample[1]);
+            event.addProperty("objectTemp", sample[2]);
+            return event;
+        });
+
+        // Publish the JSON tuples as "sensors" device events.
+        device.events(events, "sensors", QoS.FIRE_AND_FORGET);
+
+        provider.submit(topology);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfSensors.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfSensors.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfSensors.java
new file mode 100644
index 0000000..9f56d37
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfSensors.java
@@ -0,0 +1,163 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.samples.connectors.iotf;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.iot.HeartBeat;
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.iotf.IotfDevice;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.providers.direct.DirectTopology;
+import org.apache.edgent.samples.topology.SensorsAggregates;
+import org.apache.edgent.topology.TStream;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Sample sending sensor device events to IBM Watson IoT Platform. <BR>
+ * Simulates a couple of bursty sensors and sends the readings from the sensors
+ * to IBM Watson IoT Platform as device events with id {@code sensors}. <BR>
+ * Subscribes to device commands with identifier {@code display}.
+ * <P>
+ * In addition a device event with id {@code heartbeat} is sent
+ * every minute. This ensures a connection attempt to IBM Watson IoT Platform
+ * is made immediately rather than waiting for a bursty sensor to become
+ * active.
+ * <P>
+ * This sample requires an IBM Watson IoT Platform service and a device configuration.
+ * The device configuration is read from the file whose path is given as
+ * the first command line argument. <BR>
+ * In order to see commands sent from IBM Watson IoT Platform
+ * there must be an analytic application
+ * that sends commands with the identifier {@code display}.
+ * </P>
+ */
+public class IotfSensors {
+
+    /**
+     * Run the IotfSensors application.
+     *
+     * Takes a single argument that is the path to the
+     * device configuration file containing the connection
+     * authentication information.
+     *
+     * @param args Must contain the path to the device configuration file.
+     * @throws IllegalArgumentException if no argument is supplied
+     * @see IotfDevice#IotfDevice(org.apache.edgent.topology.Topology, File)
+     */
+    public static void main(String[] args) {
+        if (args.length < 1)
+            throw new IllegalArgumentException("missing pathname to device configuration file");
+        String deviceCfg = args[0];
+
+        DirectProvider tp = new DirectProvider();
+        DirectTopology topology = tp.newTopology("IotfSensors");
+
+        // Declare a connection to IoTF
+        IotDevice device = new IotfDevice(topology, new File(deviceCfg));
+
+        // Simulated sensors for this device.
+        simulatedSensors(device, true);
+
+        // Heartbeat
+        heartBeat(device, true);
+
+        // Subscribe to commands of id "display" for this
+        // device and print them to standard out
+        displayMessages(device, true);
+
+        tp.submit(topology);
+    }
+
+    /**
+     * Simulate two bursty sensors and send the readings as IoTF device events
+     * with an identifier of {@code sensors}.
+     *
+     * @param device
+     *            IoT device
+     * @param print
+     *            True if the data submitted as events should also be printed to
+     *            standard out.
+     */
+    public static void simulatedSensors(IotDevice device, boolean print) {
+
+        TStream<JsonObject> sensors = SensorsAggregates.sensorsAB(device.topology());
+        if (print)
+            sensors.print();
+
+        // Send the device streams as IoTF device events
+        // with event identifier "sensors".
+        device.events(sensors, "sensors", QoS.FIRE_AND_FORGET);
+    }
+
+    /**
+     * Create a heart beat device event with
+     * identifier {@code heartbeat} to
+     * ensure there is some immediate output and
+     * the connection to IoTF happens as soon as possible.
+     * @param device IoT device
+     * @param print true to print generated heartbeat tuples to System.out.
+     */
+    public static void heartBeat(IotDevice device, boolean print) {
+        TStream<JsonObject> hbs =
+            HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES, "heartbeat");
+        if (print)
+            hbs.print();
+    }
+
+
+    /**
+     * Subscribe to IoTF device commands with identifier {@code display}.
+     * Subscribing to device commands returns a stream of JSON objects that
+     * include a timestamp ({@code tsms}), command identifier ({@code command})
+     * and payload ({@code payload}). Payload is the application specific
+     * portion of the command. <BR>
+     * In this case the payload is expected to be a JSON object containing a
+     * {@code msg} key with a string display message. <BR>
+     * The returned stream consists of the display message string extracted from
+     * the JSON payload.
+     * <P>
+     * Note to receive commands an analytic application must exist that generates
+     * them through IBM Watson IoT Platform.
+     * </P>
+     *
+     * @param device the device
+     * @param print true to print the received command's payload to System.out.
+     * @return the stream
+     * @see IotDevice#commands(String...)
+     */
+    public static TStream<String> displayMessages(IotDevice device, boolean print) {
+        // Subscribe to commands of id "display" for this device
+        TStream<JsonObject> statusMsgs = device.commands("display");
+
+        // The returned JSON object includes several fields
+        // tsms - Timestamp in milliseconds (this is generic to a command)
+        // payload.msg - Status message (this is specific to this application)
+
+        // Map to a String object containing the message
+        TStream<String> messages = statusMsgs.map(j -> j.getAsJsonObject("payload").getAsJsonPrimitive("msg").getAsString());
+        if (print)
+            messages.print();
+        return messages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/package-info.java
new file mode 100644
index 0000000..a3451af
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Samples showing device events and commands with IBM Watson IoT Platform.
+ */
+package org.apache.edgent.samples.connectors.iotf;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
new file mode 100644
index 0000000..a0264f1
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
@@ -0,0 +1,140 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+
+/**
+ * Utilities for the sample's non-streaming JDBC database related actions.
+ */
+public class DbUtils {
+
+    /**
+     * Get the JDBC {@link DataSource} for the database.
+     * <p>
+     * The "db.name" property specifies the name of the database.
+     * Defaults to "JdbcConnectorSampleDb".
+     *
+     * @param props configuration properties
+     * @return the DataSource
+     * @throws Exception on failure
+     */
+    public static DataSource getDataSource(Properties props) throws Exception {
+        return createDerbyEmbeddedDataSource(props);
+    }
+
+    /**
+     * Initialize the sample's database.
+     * <p>
+     * Tables are created as needed and purged.
+     * @param ds the DataSource
+     * @throws Exception on failure
+     */
+    public static void initDb(DataSource ds) throws Exception {
+        createTables(ds);
+        purgeTables(ds);
+    }
+
+    /**
+     * Purge the sample's tables
+     * @param ds the DataSource
+     * @throws Exception on failure
+     */
+    public static void purgeTables(DataSource ds) throws Exception {
+        try (Connection cn = ds.getConnection();
+             Statement stmt = cn.createStatement()) {
+            stmt.execute("DELETE FROM persons");
+        }
+    }
+
+    /** Create the "persons" table; an already existing table is not an error. */
+    private static void createTables(DataSource ds) throws Exception {
+        try (Connection cn = ds.getConnection();
+             Statement stmt = cn.createStatement()) {
+            stmt.execute("CREATE TABLE persons "
+                    + "("
+                    + "id INTEGER NOT NULL,"
+                    + "firstname VARCHAR(40) NOT NULL,"
+                    + "lastname VARCHAR(40) NOT NULL,"
+                    + "PRIMARY KEY (id)"
+                    + ")"
+                    );
+        }
+        catch (SQLException e) {
+            // Derby's SQLState for an existing object is X0Y32; message test is null-safe.
+            String msg = e.getLocalizedMessage();
+            boolean exists = "X0Y32".equals(e.getSQLState())
+                    || (msg != null && msg.contains("already exists"));
+            if (!exists)
+                throw e;
+        }
+   }
+
+   /**
+    * Create a Derby embedded DataSource configured from {@code props}
+    * ("db.name", "db.user", "db.password"), creating the db if necessary.
+    */
+   private static DataSource createDerbyEmbeddedDataSource(Properties props) throws Exception
+   {
+       String dbName = props.getProperty("db.name", "JdbcConnectorSampleDb");
+
+       // For our sample, avoid a compile-time dependency to the jdbc driver.
+       // At runtime, require that the classpath can find it.
+
+       String DERBY_DATA_SOURCE = "org.apache.derby.jdbc.EmbeddedDataSource";
+
+       Class<?> nsDataSource;
+       try {
+           nsDataSource = Class.forName(DERBY_DATA_SOURCE);
+       }
+       catch (ClassNotFoundException e) {
+           String msg = "Fix the test classpath. ";
+           if (System.getenv("DERBY_HOME") == null) {
+               msg += "DERBY_HOME not set. ";
+           }
+           msg += "Class not found: "+e.getLocalizedMessage();
+           System.err.println(msg);
+           throw new IllegalStateException(msg, e);
+       }
+       DataSource ds = (DataSource) nsDataSource.getDeclaredConstructor().newInstance();
+
+       // Every setter used below takes a single String argument.
+       nsDataSource.getMethod("setDatabaseName", String.class).invoke(ds, dbName);
+
+       // create the db if necessary
+       nsDataSource.getMethod("setCreateDatabase", String.class).invoke(ds, "create");
+
+       // set the user
+       String user = props.getProperty("db.user", System.getProperty("user.name"));
+       nsDataSource.getMethod("setUser", String.class).invoke(ds, user);
+
+       // optionally set the pw
+       String password = props.getProperty("db.password");
+       if (password != null)
+           nsDataSource.getMethod("setPassword", String.class).invoke(ds, password);
+
+       return ds;
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
new file mode 100644
index 0000000..bb57629
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+/**
+ * A person tuple for the sample; {@link #toString()} renders all three fields.
+ */
+public class Person {
+    int id;
+    String firstName;
+    String lastName;
+    Person(int id, String first, String last) {
+        this.id = id;
+        firstName = first;
+        lastName = last;
+    }
+    @Override
+    public String toString() {
+        return String.format("id=%d first=%s last=%s", id, firstName, lastName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
new file mode 100644
index 0000000..f7f1211
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
@@ -0,0 +1,96 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for loading the sample's person data.
+ */
+public class PersonData {
+
+    /**
+     * Load the person data from the path specified by the "persondata.path"
+     * property. Blank lines and lines starting with '#' are ignored.
+     * @param props configuration properties
+     * @return the loaded person data
+     * @throws Exception on failure
+     */
+    public static List<Person> loadPersonData(Properties props) throws Exception {
+        String pathname = props.getProperty("persondata.path");
+        if (pathname == null)
+            throw new IllegalArgumentException("Missing property persondata.path");
+        List<Person> persons = new ArrayList<>();
+        Path path = new File(pathname).toPath();
+        try (BufferedReader br = Files.newBufferedReader(path)) {
+            int lineno = 0;
+            String line;
+            while ((line = br.readLine()) != null) {
+                lineno++;
+                Person person = parseLine(line, lineno, pathname);
+                if (person != null)
+                    persons.add(person);
+            }
+        }
+        return persons;
+    }
+
+    /** Parse one "id,firstName,lastName" line; null for a blank or comment line. */
+    private static Person parseLine(String line, int lineno, String pathname) {
+        line = line.trim();
+        if (line.isEmpty() || line.startsWith("#"))
+            return null;
+
+        // id,firstName,lastName
+        String[] items = line.split(",");
+        if (items.length < 3)
+            throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
+        int id;
+        try {
+           id = Integer.parseInt(items[0].trim());
+        }
+        catch (NumberFormatException e) {
+            // keep the parse failure as the cause
+            throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname, e);
+        }
+        // ids must be positive
+        if (id < 1)
+            throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
+
+        return new Person(id, items[1].trim(), items[2].trim());
+    }
+
+    /**
+     * Convert a {@code List<Person>} to a {@code List<PersonId>}
+     * @param persons the person list
+     * @return the person id list
+     */
+    public static List<PersonId> toPersonIds(List<Person> persons) {
+        return persons.stream()
+            .map(person -> new PersonId(person.id))
+            .collect(Collectors.toList());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
new file mode 100644
index 0000000..218a027
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+/**
+ * Holder for just a person's id, used as the sample's lookup-key tuple.
+ */
+public class PersonId {
+    int id;
+    PersonId(int id) { this.id = id; }
+    /** Renders the id as {@code id=N}. */
+    @Override
+    public String toString() {
+        return String.format("id=%d", id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
new file mode 100644
index 0000000..006b459
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
@@ -0,0 +1,102 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.edgent.connectors.jdbc.JdbcStreams;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * A simple JDBC connector sample demonstrating streaming read access
+ * of a dbms table and creating stream tuples from the results.
+ */
+public class SimpleReaderApp {
+    private final Properties props;
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1)
+            throw new Exception("missing pathname to jdbc.properties file");
+        SimpleReaderApp reader = new SimpleReaderApp(args[0]);
+        reader.run();
+    }
+
+    /**
+     * @param jdbcPropsPath pathname to properties file
+     */
+    SimpleReaderApp(String jdbcPropsPath) throws Exception {
+        props = new Properties();
+        try (java.io.Reader in = Files.newBufferedReader(new File(jdbcPropsPath).toPath())) {
+            props.load(in);
+        }
+    }
+
+    /**
+     * Create a topology for the reader application and run it.
+     */
+    private void run() throws Exception {
+        DirectProvider tp = new DirectProvider();
+        // build the application/topology
+        Topology t = tp.newTopology("jdbcSampleReader");
+
+        // Create the JDBC connector
+        JdbcStreams myDb = new JdbcStreams(t,
+                () -> DbUtils.getDataSource(props),
+                dataSource -> dataSource.getConnection());
+
+        // Create a sample stream of tuples containing a person id
+        List<PersonId> personIdList = new java.util.ArrayList<>(  // mutable copy so add() is safe
+                PersonData.toPersonIds(PersonData.loadPersonData(props)));
+        personIdList.add(new PersonId(99999));
+        TStream<PersonId> personIds = t.collection(personIdList);
+
+        // For each tuple on the stream, read info from the db table
+        // using the "id", and create a Person tuple on the result stream.
+        TStream<Person> persons = myDb.executeStatement(personIds,
+                () -> "SELECT id, firstname, lastname FROM persons WHERE id = ?",
+                (personId,stmt) -> stmt.setInt(1, personId.id),
+                (personId,rSet,exc,resultStream) -> {
+                        if (exc != null) {
+                            // some failure processing this tuple. an error was logged.
+                            System.err.println("Unable to process id="+personId.id+": "+exc);
+                            return;
+                        }
+                        if (rSet.next()) {
+                            resultStream.accept(
+                                    new Person(rSet.getInt("id"),
+                                            rSet.getString("firstname"), rSet.getString("lastname")));
+                        }
+                        else {
+                            System.err.println("Unknown person id="+personId.id);
+                        }
+                    }
+                );
+
+        // print out Person tuples as they are retrieved
+        persons.sink(person -> System.out.println("retrieved person: "+person));
+
+        // run the application / topology
+        tp.submit(t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
new file mode 100644
index 0000000..018c97b
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Properties;
+
+import org.apache.edgent.connectors.jdbc.JdbcStreams;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * A simple JDBC connector sample demonstrating streaming write access
+ * of a dbms to add stream tuples to a table.
+ */
+public class SimpleWriterApp {
+    private final Properties props;
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1)
+            throw new Exception("missing pathname to jdbc.properties file");
+        SimpleWriterApp writer = new SimpleWriterApp(args[0]);
+        DbUtils.initDb(DbUtils.getDataSource(writer.props));
+        writer.run();
+    }
+
+    /**
+     * @param jdbcPropsPath pathname to properties file
+     */
+    SimpleWriterApp(String jdbcPropsPath) throws Exception {
+        props = new Properties();
+        try (java.io.Reader in = Files.newBufferedReader(new File(jdbcPropsPath).toPath())) {
+            props.load(in);
+        }
+    }
+
+    /**
+     * Create a topology for the writer application and run it.
+     */
+    private void run() throws Exception {
+        DirectProvider tp = new DirectProvider();
+        // build the application/topology
+        Topology t = tp.newTopology("jdbcSampleWriter");
+
+        // Create the JDBC connector
+        JdbcStreams myDb = new JdbcStreams(t,
+                () -> DbUtils.getDataSource(props),
+                dataSource -> dataSource.getConnection());
+
+        // Create a sample stream of Person tuples
+        TStream<Person> persons = t.collection(PersonData.loadPersonData(props));
+
+        // Write stream tuples to a table.
+        myDb.executeStatement(persons,
+                () -> "INSERT INTO persons VALUES(?,?,?)",
+                (person,stmt) -> {
+                    System.out.println("Inserting into persons table: person "+person);
+                    stmt.setInt(1, person.id);
+                    stmt.setString(2, person.firstName);
+                    stmt.setString(3, person.lastName);
+                    }
+                );
+
+        // run the application / topology
+        tp.submit(t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
new file mode 100644
index 0000000..4e88b77
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Samples showing use of the
+ * <a href="{@docRoot}/org/apache/edgent/connectors/jdbc/package-summary.html">
+ *     JDBC stream connector</a>.
+ * <p>
+ * See &lt;edgent-release&gt;/scripts/connectors/jdbc/README to run the samples.
+ * <p>
+ * The following samples are provided:
+ * <ul>
+ * <li>SimpleReaderApp.java - a simple dbms reader application topology</li>
+ * <li>SimpleWriterApp.java - a simple dbms writer application topology</li>
+ * </ul>
+ */
+package org.apache.edgent.samples.connectors.jdbc;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
new file mode 100644
index 0000000..7d5a530
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
@@ -0,0 +1,144 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import org.apache.edgent.samples.connectors.Options;
+
+/**
+ * Demonstrate integrating with the Apache Kafka messaging system
+ * <a href="http://kafka.apache.org">http://kafka.apache.org</a>.
+ * <p>
+ * {@link org.apache.edgent.connectors.kafka.KafkaProducer KafkaProducer} is
+ * a connector used to create a bridge between topology streams
+ * and publishing to Kafka topics.
+ * <p>
+ * {@link org.apache.edgent.connectors.kafka.KafkaConsumer KafkaConsumer} is
+ * a connector used to create a bridge between topology streams
+ * and subscribing to Kafka topics.
+ * <p>
+ * The client either publishes messages to a topic or   
+ * subscribes to the topic and reports the messages received.
+ * <p>
+ * By default, a running Kafka cluster with the following
+ * characteristics is assumed:
+ * <ul>
+ * <li>{@code bootstrap.servers="localhost:9092"}</li>
+ * <li>{@code zookeeper.connect="localhost:2181"}</li>
+ * <li>kafka topic {@code "kafkaSampleTopic"} exists</li>
+ * </ul>
+ * <p>
+ * See the Apache Kafka link above for information about setting up a Kafka
+ * cluster as well as creating a topic.
+ * <p>
+ * This may be executed as:
+ * <UL>
+ * <LI>
+ * {@code java -cp samples/lib/org.apache.edgent.samples.connectors.kafka.jar
+ *  org.apache.edgent.samples.connectors.kafka.KafkaClient -h
+ * } - Run directly from the command line.
+ * </LI>
+ * </UL>
+ * <UL>
+ * <LI>
+ * An application execution within your IDE once you set the class path to include the correct jars.</LI>
+ * </UL>
+ */
+public class KafkaClient {
+    /**
+     * Command line usage summary. Lists every option registered
+     * by {@code initHandlers}, including {@code topic}.
+     */
+    private static final String usage = "usage: "
+            + "\n" + "[-v] [-h]"
+            + "\n" + "pub | sub"
+            + "\n" + "[bootstrap.servers=<value>]"
+            + "\n" + "[zookeeper.connect=<value>]"
+            + "\n" + "[group.id=<value>]"
+            + "\n" + "[topic=<value>]"
+            + "\n" + "[pubcnt=<value>]"
+            ;
+
+    /**
+     * Process the command line and, when it is acceptable, run the
+     * publisher or subscriber application.
+     * @param args command line arguments
+     * @throws Exception on failure
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = processArgs(args);
+        if (options == null)
+            return;
+
+        Runner.run(options);
+    }
+
+    /**
+     * Parse and validate the command line arguments.
+     * <p>
+     * The usage text is printed and {@code null} is returned when the
+     * arguments cannot be processed, when help is requested, or when
+     * neither {@code pub} nor {@code sub} is specified.
+     * @param args command line arguments
+     * @return the processed options, or null if the application should not run
+     */
+    private static Options processArgs(String[] args) {
+        Options options = new Options();
+        initHandlers(options);
+        try {
+            options.processArgs(args);
+        }
+        catch (Exception e) {
+            System.err.println(e);
+            System.out.println(usage);
+            return null;
+        }
+
+        if ((Boolean)options.get(OPT_HELP)) {
+            System.out.println(usage);
+            return null;
+        }
+
+        if (!(Boolean)options.get(OPT_PUB) && !(Boolean)options.get(OPT_SUB)) {
+            System.err.println(String.format("Missing argument '%s' or '%s'.", OPT_PUB, OPT_SUB));
+            System.out.println(usage);
+            return null;
+        }
+
+        // Nothing is announced unless verbose mode is on, in which case
+        // every option that has a value is reported.
+        String[] announceOpts = new String[] {
+        };
+        if ((Boolean)options.get(OPT_VERBOSE))
+            announceOpts = options.getAll().stream().map(e -> e.getKey()).toArray(String[]::new);
+        for (String opt : announceOpts) {
+            Object value = options.get(opt);
+            if (value != null) {
+                // never echo the value of a password-like option
+                if (opt.toLowerCase().contains("password"))
+                    value = "*****";
+                System.out.println("Using "+opt+"="+value);
+            }
+        }
+
+        return options;
+    }
+
+    // Names of the command line options
+    static final String OPT_VERBOSE = "-v";
+    static final String OPT_HELP = "-h";
+    static final String OPT_PUB = "pub";
+    static final String OPT_SUB = "sub";
+    static final String OPT_BOOTSTRAP_SERVERS = "bootstrap.servers";
+    static final String OPT_ZOOKEEPER_CONNECT = "zookeeper.connect";
+    static final String OPT_GROUP_ID = "group.id";
+    static final String OPT_TOPIC = "topic";
+    static final String OPT_PUB_CNT = "pubcnt";
+
+    /**
+     * Register the handler, and the default value if any, for each
+     * command line option.
+     * @param opts the options to add the handlers to
+     */
+    private static void initHandlers(Options opts) {
+        // options for which we have a default
+        opts.addHandler(OPT_HELP, null, false);
+        opts.addHandler(OPT_VERBOSE, null, false);
+        opts.addHandler(OPT_PUB, null, false);
+        opts.addHandler(OPT_SUB, null, false);
+        opts.addHandler(OPT_BOOTSTRAP_SERVERS, v -> v, "localhost:9092");
+        opts.addHandler(OPT_ZOOKEEPER_CONNECT, v -> v, "localhost:2181");
+        opts.addHandler(OPT_TOPIC, v -> v, "kafkaSampleTopic");
+        opts.addHandler(OPT_PUB_CNT, v -> Integer.valueOf(v), -1);
+
+        // optional options (no default value)
+        opts.addHandler(OPT_GROUP_ID, v -> v);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
new file mode 100644
index 0000000..6746a7d
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_BOOTSTRAP_SERVERS;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_PUB_CNT;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.samples.connectors.MsgSupplier;
+import org.apache.edgent.samples.connectors.Options;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyProvider;
+
+import org.apache.edgent.connectors.kafka.KafkaProducer;
+
+/**
+ * A Kafka producer/publisher topology application.
+ */
+public class PublisherApp {
+    private final TopologyProvider tp;
+    private final Options options;
+
+    /**
+     * Create the publisher application.
+     * @param tp the TopologyProvider used to create the topology
+     * @param options the command line options
+     */
+    PublisherApp(TopologyProvider tp, Options options) {
+        this.tp = tp;
+        this.options = options;
+    }
+
+    /**
+     * Build the topology that polls for sample messages and publishes
+     * them to the configured Kafka topic.
+     * @return the Topology
+     */
+    public Topology buildAppTopology() {
+        Topology topology = tp.newTopology("kafkaClientPublisher");
+
+        // The sample tuples to publish: the supplier is polled once a second
+        TStream<String> messages = topology.poll(
+                new MsgSupplier(options.get(OPT_PUB_CNT)), 1L, TimeUnit.SECONDS);
+
+        // The KafkaProducer broker connector and its configuration
+        Map<String,Object> producerConfig = newConfig();
+        KafkaProducer producer = new KafkaProducer(topology, () -> producerConfig);
+
+        // Publish the stream to the topic; each String tuple is the message value.
+        producer.publish(messages, options.get(OPT_TOPIC));
+
+        return topology;
+    }
+
+    /**
+     * Create the configuration for the KafkaProducer connector.
+     * @return the configuration
+     */
+    private Map<String,Object> newConfig() {
+        Map<String,Object> cfg = new HashMap<>();
+        // required kafka configuration items
+        cfg.put("bootstrap.servers", options.get(OPT_BOOTSTRAP_SERVERS));
+        return cfg;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
new file mode 100644
index 0000000..6554f8b
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
@@ -0,0 +1,26 @@
+Sample Kafka Publisher and Subscriber topology applications.
+
+By default the samples assume the following kafka broker configuration:
+- bootstrap.servers="localhost:9092"
+- zookeeper.connect="localhost:2181"
+- kafka topic "kafkaSampleTopic" exists
+- no authentication
+
+See http://kafka.apache.org for the code and setup information for
+a Kafka broker.
+
+See scripts/connectors/kafka/README for instructions on running the samples.
+
+The simple sample
+-----------------
+
+SimplePublisherApp.java - build and run the simple publisher application topology
+SimpleSubscriberApp.java - build and run the simple subscriber application topology
+
+The fully configurable client
+-----------------------------
+
+Runner.java - build and run the publisher or subscriber
+PublisherApp.java - build the publisher application topology
+SubscriberApp.java - build the subscriber application topology
+KafkaClient.java - the client's command line interface

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
new file mode 100644
index 0000000..2ffccfc
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_BOOTSTRAP_SERVERS;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_PUB;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_ZOOKEEPER_CONNECT;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.samples.connectors.Options;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * Build and run the publisher or subscriber application.
+ */
+public class Runner {
+    /**
+     * Build the publisher or the subscriber topology, as selected by
+     * the {@code pub} option, and submit it for execution.
+     * @param options command line options
+     * @throws Exception on failure
+     */
+    public static void run(Options options) throws Exception {
+        boolean isPub = options.get(OPT_PUB);
+
+        // The topology runtime provider
+        DevelopmentProvider provider = new DevelopmentProvider();
+
+        // Build the topology of the selected application
+        Topology topology = isPub
+                ? new PublisherApp(provider, options).buildAppTopology()
+                : new SubscriberApp(provider, options).buildAppTopology();
+
+        // Report what is about to run, then submit the app/topology
+        // to send or receive the messages.
+        String action = isPub ? "Publishing" : "Subscribing";
+        System.out.println(
+                "Using Kafka cluster at bootstrap.servers="
+                + options.get(OPT_BOOTSTRAP_SERVERS)
+                + " zookeeper.connect=" + options.get(OPT_ZOOKEEPER_CONNECT)
+                + "\n" + action
+                + " to topic " + options.get(OPT_TOPIC));
+        System.out.println("Console URL for the job: "
+                + provider.getServices().getService(HttpServer.class).getConsoleUrl());
+        provider.submit(topology);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
new file mode 100644
index 0000000..a8b9492
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.samples.connectors.Util;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import org.apache.edgent.connectors.kafka.KafkaProducer;
+
+/**
+ * A simple Kafka publisher topology application.
+ */
+public class SimplePublisherApp {
+    private final Properties props;
+    private final String topic;
+
+    /**
+     * Run the sample publisher.
+     * @param args a single argument: the pathname to the kafka.properties file
+     * @throws Exception if the argument is missing or the application fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1)
+            throw new Exception("missing pathname to kafka.properties file");
+        SimplePublisherApp publisher = new SimplePublisherApp(args[0]);
+        publisher.run();
+    }
+
+    /**
+     * Load the properties file and remember the topic to publish to.
+     * @param kafkaPropsPath pathname to properties file
+     * @throws Exception if the properties file cannot be read
+     */
+    SimplePublisherApp(String kafkaPropsPath) throws Exception {
+        props = new Properties();
+        // Properties.load() leaves the reader open after it returns, so
+        // close it explicitly once the properties have been read.
+        try (java.io.Reader reader = Files.newBufferedReader(new File(kafkaPropsPath).toPath())) {
+            props.load(reader);
+        }
+        topic = props.getProperty("topic");
+    }
+
+    /**
+     * Create the configuration for the KafkaProducer connector from
+     * the loaded properties.
+     * @return the configuration
+     */
+    private Map<String,Object> createKafkaConfig() {
+        Map<String,Object> kafkaConfig = new HashMap<>();
+        kafkaConfig.put("bootstrap.servers", props.get("bootstrap.servers"));
+        return kafkaConfig;
+    }
+
+    /**
+     * Create a topology for the publisher application and run it.
+     */
+    private void run() throws Exception {
+        DevelopmentProvider tp = new DevelopmentProvider();
+
+        // build the application/topology
+
+        Topology t = tp.newTopology("kafkaSamplePublisher");
+
+        // Create the Kafka Producer broker connector
+        Map<String,Object> kafkaConfig = createKafkaConfig();
+        KafkaProducer kafka = new KafkaProducer(t, () -> kafkaConfig);
+
+        // Create a sample stream of tuples to publish:
+        // a numbered, timestamped message is generated once a second
+        AtomicInteger cnt = new AtomicInteger();
+        TStream<String> msgs = t.poll(
+                () -> {
+                    String msg = String.format("Message-%d from %s",
+                            cnt.incrementAndGet(), Util.simpleTS());
+                    System.out.println("poll generated msg to publish: " + msg);
+                    return msg;
+                }, 1L, TimeUnit.SECONDS);
+
+        // Publish the stream to the topic.  The String tuple is the message value.
+        kafka.publish(msgs, topic);
+
+        // run the application / topology
+        System.out.println("Console URL for the job: "
+                + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+        tp.submit(t);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
new file mode 100644
index 0000000..7cef424
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
@@ -0,0 +1,95 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.samples.connectors.Util;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import org.apache.edgent.connectors.kafka.KafkaConsumer;
+
+/**
+ * A simple Kafka subscriber topology application.
+ */
+public class SimpleSubscriberApp {
+    private final Properties props;
+    private final String topic;
+
+    /**
+     * Run the sample subscriber.
+     * @param args a single argument: the pathname to the kafka.properties file
+     * @throws Exception if the argument is missing or the application fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1)
+            throw new Exception("missing pathname to kafka.properties file");
+        SimpleSubscriberApp subscriber = new SimpleSubscriberApp(args[0]);
+        subscriber.run();
+    }
+
+    /**
+     * Load the properties file and remember the topic to subscribe to.
+     * @param kafkaPropsPath pathname to properties file
+     * @throws Exception if the properties file cannot be read
+     */
+    SimpleSubscriberApp(String kafkaPropsPath) throws Exception {
+        props = new Properties();
+        // Properties.load() leaves the reader open after it returns, so
+        // close it explicitly once the properties have been read.
+        try (java.io.Reader reader = Files.newBufferedReader(new File(kafkaPropsPath).toPath())) {
+            props.load(reader);
+        }
+        topic = props.getProperty("topic");
+    }
+
+    /**
+     * Create the configuration for the KafkaConsumer connector from
+     * the loaded properties.
+     * @return the configuration
+     */
+    private Map<String,Object> createKafkaConfig() {
+        Map<String,Object> kafkaConfig = new HashMap<>();
+        kafkaConfig.put("zookeeper.connect", props.get("zookeeper.connect"));
+        // for the sample, be insensitive to old/multiple consumers for
+        // the topic/groupId hanging around
+        kafkaConfig.put("group.id",
+                "kafkaSampleConsumer_" + Util.simpleTS().replaceAll(":", ""));
+        return kafkaConfig;
+    }
+
+    /**
+     * Create a topology for the subscriber application and run it.
+     */
+    private void run() throws Exception {
+        DevelopmentProvider tp = new DevelopmentProvider();
+
+        // build the application/topology
+
+        Topology t = tp.newTopology("kafkaSampleSubscriber");
+
+        // Create the Kafka Consumer broker connector
+        Map<String,Object> kafkaConfig = createKafkaConfig();
+        KafkaConsumer kafka = new KafkaConsumer(t, () -> kafkaConfig);
+
+        // Subscribe to the topic and create a stream of messages;
+        // each tuple is the value of a received record
+        TStream<String> msgs = kafka.subscribe(rec -> rec.value(), topic);
+
+        // Process the received msgs - just print them out
+        msgs.sink(tuple -> System.out.println(
+                String.format("[%s] received: %s", Util.simpleTS(), tuple)));
+
+        // run the application / topology
+        System.out.println("Console URL for the job: "
+                + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+        tp.submit(t);
+    }
+
+}