You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:05 UTC
[04/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Cmd.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Cmd.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Cmd.java
new file mode 100644
index 0000000..f3ae61a
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Cmd.java
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.google.gson.JsonObject;
+
+/**
+ * ELM327 and OBD-II command interface.
+ *
+ */
+public interface Cmd {
+ /**
+ * Key ({@value}) for PID identifier in JSON result.
+ */
+ String PID = "pid";
+
+ /**
+ * Key ({@value}) for timestamp in JSON result. Timestamp value is the
+ * number of milliseconds since the 1907 epoch.
+ */
+ String TS = "ts";
+
+ /**
+ * Key ({@value}) for the returned value in JSON result.
+ * May not be present.
+ */
+ String VALUE = "value";
+
+ /**
+ * How the command is written to the serial port.
+ *
+ * @param out
+ * OutputStream to write bytes to.
+ * @throws IOException
+ * Exception writing bytes.
+ */
+ void writeCmd(OutputStream out) throws IOException;
+
+ /**
+ * Process the reply into a result.
+ *
+ * @param result
+ * JSON object to populate with the result.
+ * @param reply
+ * Bytes that were returned from the command execution.
+ *
+ * @return {@code true} result is valid, {@code false} otherwise.
+ */
+ boolean result(JsonObject result, byte[] reply);
+
+ /**
+ * Unique identifier of the command.
+ *
+ * @return Unique identifier of the command.
+ */
+ String id();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Cmds.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Cmds.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Cmds.java
new file mode 100644
index 0000000..aed0c23
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Cmds.java
@@ -0,0 +1,75 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.edgent.connectors.serial.SerialDevice;
+import org.apache.edgent.samples.connectors.elm327.runtime.CommandExecutor;
+
+import com.google.gson.JsonObject;
+
+/**
+ * ELM327 commands.
+ *
+ *
+ */
+public enum Elm327Cmds implements Cmd {
+
+ INIT("ATZ"),
+ ECHO_OFF("ATE0"),
+ PROTOCOL_3("ATSP3"),
+ PROTOCOL_5("ATSP5"),
+ BYPASS_INIT("ATBI"),
+ FAST_INIT("ATFI"),
+ SLOW_INIT("ATSI"),;
+
+ private byte[] cmd;
+
+ Elm327Cmds(String code) {
+ cmd = (code + "\r").getBytes(StandardCharsets.US_ASCII);
+ }
+
+ @Override
+ public void writeCmd(OutputStream out) throws IOException {
+ out.write(cmd);
+ }
+
+ @Override
+ public boolean result(JsonObject result, byte[] data) {
+ return true;
+ }
+
+ @Override
+ public String id() {
+ return name();
+ }
+
+ /**
+ * Initialize the ELM327 to a specific protocol.
+ * @param device Serial device the ELM327 is connected to.
+ * @param protocol OBD-II protocol to initialize to.
+ */
+ public static void initializeProtocol(SerialDevice device, Elm327Cmds protocol) {
+ device.setInitializer(port -> CommandExecutor.initialize(protocol, port.getOutput(), port.getInput()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Streams.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Streams.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Streams.java
new file mode 100644
index 0000000..dbaf4db
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Elm327Streams.java
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.serial.SerialDevice;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.samples.connectors.elm327.runtime.CommandExecutor;
+import org.apache.edgent.topology.TStream;
+
+import com.google.gson.JsonArray;
+
+/**
+ * Streams fetching OBD-II data from an ELM327 through
+ * a serial device.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/ELM327">ELM327</a>
+ */
+public class Elm327Streams {
+
+ /**
+ * Periodically execute a number of ELM327 commands.
+ * Each tuple on the returned stream is a JSON array containing
+ * the result for each command.
+ * <BR>
+ * Each result is a JSON object containing the
+ * {@link Cmd#id() command identifier} with key {@link Cmd#PID pid}
+ * and any result set by the individual command, typically with
+ * the key {@link Cmd#VALUE value}.
+ *
+ * @param device Serial device the ELM327 is connected to.
+ * @param period Period to poll.
+ * @param unit Unit of {@code period}.
+ * @param cmds Commands to execute.
+ * @return Stream containing the results of the command exections.
+ */
+ public static TStream<JsonArray> poll(SerialDevice device, long period, TimeUnit unit, Cmd ... cmds) {
+
+ Supplier<JsonArray> data = device.getSource(
+ port ->
+ {
+ JsonArray array = new JsonArray();
+ for (Cmd cmd : cmds) {
+ array.add(CommandExecutor.execute(cmd, port.getOutput(), port.getInput()));
+ }
+ return array;
+
+ });
+
+ return device.topology().poll(data, period, unit);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Pids01.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Pids01.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Pids01.java
new file mode 100644
index 0000000..10ecd1e
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/Pids01.java
@@ -0,0 +1,141 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.edgent.samples.connectors.elm327.runtime.CommandExecutor;
+
+import com.google.gson.JsonObject;
+
+/**
+ * OBD-II Standard Mode 01 Pids.
+ *
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/OBD-II_PIDs#Mode_01">OBD-II Mode 01 Pids</a>
+ */
+public enum Pids01 implements Cmd {
+
+ /**
+ * Get the list of available PIDs.
+ */
+ AVAILABLE_PIDS("00"),
+
+ /**
+ * Engine coolant temperature in �C.
+ */
+ ENGINE_COOLANT_TEMP("05") {
+ @Override
+ protected boolean decode(JsonObject result, byte[] reply) {
+
+ int[] binary = CommandExecutor.binary(reply, 4, 2);
+
+ int c = binary[0] - 40;
+ result.addProperty(VALUE, c);
+
+ return true;
+ }
+ },
+
+ /**
+ * Engine speed in rpm.
+ */
+ RPM("0C") {
+ @Override
+ protected boolean decode(JsonObject result, byte[] reply) {
+
+ int[] binary = CommandExecutor.binary(reply, 4, 4);
+ int rpm = ((binary[0] * 256) + binary[1])/4;
+ result.addProperty(VALUE, rpm);
+
+ return true;
+ }
+ },
+
+ /**
+ * Vehicle speed in km/h.
+ */
+ SPEED("0D"){
+ @Override
+ protected boolean decode(JsonObject result, byte[] reply) {
+
+ int[] binary = CommandExecutor.binary(reply, 4, 2);
+
+ result.addProperty(VALUE, binary[0]);
+
+ return true;
+ }
+ },
+
+ /**
+ * Engine air intake temperature in �C.
+ */
+ AIR_INTAKE_TEMP("0F"){
+ @Override
+ protected boolean decode(JsonObject result, byte[] reply) {
+
+ int[] binary = CommandExecutor.binary(reply, 4, 2);
+
+ int c = binary[0] - 40;
+ result.addProperty(VALUE, c);
+
+ return true;
+ }
+ },
+ ;
+
+ private final String pid;
+ private final byte[] cmd;
+
+ Pids01(String pid) {
+ this.pid = pid;
+ cmd = ("01" + pid + "1\r").getBytes(StandardCharsets.US_ASCII);
+ }
+
+ public String id() {
+ return pid;
+ }
+
+ @Override
+ public void writeCmd(OutputStream out) throws IOException {
+ out.write(cmd);
+ }
+ @Override
+ public final boolean result(JsonObject result, byte[] data) {
+ return validateReply(data) && decode(result, data);
+ }
+ boolean decode(JsonObject result, byte[] data) {
+ return true;
+ }
+
+ boolean validateReply(byte[] reply) {
+ if (reply[0] != '4')
+ return false;
+ if (reply[1] != '1')
+ return false;
+ if (reply[2] != pid.charAt(0))
+ return false;
+ if (reply[3] != pid.charAt(1))
+ return false;
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/package-info.java
new file mode 100644
index 0000000..9d3a9b9
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/package-info.java
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * OBD-II protocol sample using ELM327.
+ *
+ * ELM327 devices allow connectivity to a vehicle's OBD-II information.
+ *
+ * @see <a href="https://en.wikipedia.org/wiki/OBD-II">OBD-II</a>
+ * @see <a href="https://en.wikipedia.org/wiki/ELM327">ELM327</a>
+ */
+package org.apache.edgent.samples.connectors.elm327;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/runtime/CommandExecutor.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
new file mode 100644
index 0000000..b5a342f
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/elm327/runtime/CommandExecutor.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.elm327.runtime;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.edgent.samples.connectors.elm327.Cmd;
+import org.apache.edgent.samples.connectors.elm327.Elm327Cmds;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Runtime execution of ELM327 & OBD-II commands.
+ *
+ */
+public class CommandExecutor {
+
+ public static int[] binary(byte[] reply, int offset, int length) {
+ int[] binary = new int[length / 2];
+ for (int i = 0; i < binary.length; i++) {
+ int h = Character.digit(reply[offset++], 16);
+ int l = Character.digit(reply[offset++], 16);
+ binary[i] = ((h * 16) + l);
+ }
+ return binary;
+ }
+
+ public static void initialize(Cmd protocol, OutputStream out, InputStream in) {
+ try {
+
+ executeUntilOK(10, Elm327Cmds.INIT, out, in);
+ Thread.sleep(1000);
+
+ executeUntilOK(1, Elm327Cmds.ECHO_OFF, out, in);
+
+ executeUntilOK(1, protocol, out, in);
+ executeUntilOK(1, Elm327Cmds.SLOW_INIT, out, in);
+ Thread.sleep(1000);
+
+ } catch (Exception ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ private static boolean readUntilPrompt(InputStream in, ByteArrayOutputStream bytes) throws IOException {
+ bytes.reset();
+ for (;;) {
+ int b = in.read();
+ if (b == -1)
+ return false;
+ if (b == ' ')
+ continue;
+ if (b == '\r')
+ continue;
+ if (b == '>')
+ return true;
+
+ bytes.write(b);
+ }
+ }
+
+ public static JsonObject executeUntilOK(int n, Cmd cmd, OutputStream out, InputStream in) throws IOException {
+ try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(16)) {
+ for (int i = 0; i < n; i++) {
+ cmd.writeCmd(out);
+ out.flush();
+
+ if (!readUntilPrompt(in, bytes))
+ continue;
+
+ byte[] reply = bytes.toByteArray();
+ JsonObject j = new JsonObject();
+ if (cmd.result(j, reply))
+ return j;
+ break;
+ }
+ }
+ throw new IllegalStateException("Could not execute command:" + cmd);
+ }
+
+ public static JsonObject execute(Cmd cmd, OutputStream out, InputStream in) {
+ try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(16)) {
+ cmd.writeCmd(out);
+ out.flush();
+
+ JsonObject result = new JsonObject();
+ result.addProperty(Cmd.PID, cmd.id());
+ result.addProperty(Cmd.TS, System.currentTimeMillis());
+
+ readUntilPrompt(in, bytes);
+
+ cmd.result(result, bytes.toByteArray());
+
+ return result;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileReaderApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileReaderApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileReaderApp.java
new file mode 100644
index 0000000..80086b7
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileReaderApp.java
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.file;
+
+import java.io.File;
+
+import org.apache.edgent.connectors.file.FileStreams;
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * Watch a directory for files and convert their contents into a stream.
+ */
+public class FileReaderApp {
+ private final String directory;
+ private static final String baseLeafname = "FileSample";
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 1)
+ throw new Exception("missing pathname to an existing directory");
+ FileReaderApp reader = new FileReaderApp(args[0]);
+ reader.run();
+ }
+
+ /**
+ *
+ * @param directory an existing directory to watch for file
+ */
+ public FileReaderApp(String directory) {
+ File dir = new File(directory);
+ if (!dir.exists())
+ throw new IllegalArgumentException("directory doesn't exist");
+ this.directory = directory;
+ }
+
+ public void run() throws Exception {
+ DevelopmentProvider tp = new DevelopmentProvider();
+
+ // build the application / topology
+
+ Topology t = tp.newTopology("FileSample consumer");
+
+ // watch for files
+ TStream<String> pathnames = FileStreams.directoryWatcher(t, () -> directory);
+
+ // create a stream containing the files' contents.
+ // use a preFn to include a separator in the results.
+ // use a postFn to delete the file once its been processed.
+ TStream<String> contents = FileStreams.textFileReader(pathnames,
+ tuple -> "<PRE-FUNCTION> "+tuple,
+ (tuple,exception) -> {
+ // exercise a little caution in case the user pointed
+ // us at a directory with other things in it
+ if (tuple.contains("/"+baseLeafname+"_")) {
+ new File(tuple).delete();
+ }
+ return null;
+ });
+
+ // print out what's being read
+ contents.print();
+
+ // run the application / topology
+ System.out.println("starting the reader watching directory " + directory);
+ System.out.println("Console URL for the job: "
+ + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+ tp.submit(t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileWriterApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileWriterApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileWriterApp.java
new file mode 100644
index 0000000..c956cb6
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/FileWriterApp.java
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.file;
+
+import java.io.File;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.connectors.file.FileStreams;
+import org.apache.edgent.connectors.file.FileWriterCycleConfig;
+import org.apache.edgent.connectors.file.FileWriterFlushConfig;
+import org.apache.edgent.connectors.file.FileWriterPolicy;
+import org.apache.edgent.connectors.file.FileWriterRetentionConfig;
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * Write a TStream<String> to files.
+ */
+public class FileWriterApp {
+ private final String directory;
+ private final String basePathname;
+ private static final String baseLeafname = "FileSample";
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 1)
+ throw new Exception("missing pathname to an existing directory");
+ FileWriterApp writer = new FileWriterApp(args[0]);
+ writer.run();
+ }
+
+ /**
+ *
+ * @param directory an existing directory to create files in
+ */
+ public FileWriterApp(String directory) {
+ File dir = new File(directory);
+ if (!dir.exists())
+ throw new IllegalArgumentException("directory doesn't exist");
+ this.directory = directory;
+ basePathname = directory+"/"+baseLeafname;
+ }
+
+ public void run() throws Exception {
+ DevelopmentProvider tp = new DevelopmentProvider();
+
+ // build the application / topology
+
+ Topology t = tp.newTopology("FileSample producer");
+
+ FileWriterPolicy<String> policy = new FileWriterPolicy<String>(
+ FileWriterFlushConfig.newImplicitConfig(),
+ FileWriterCycleConfig.newCountBasedConfig(5),
+ FileWriterRetentionConfig.newFileCountBasedConfig(3));
+
+ // create a tuple stream to write out
+ AtomicInteger cnt = new AtomicInteger();
+ TStream<String> stream = t.poll(() -> {
+ String str = String.format("sample tuple %d %s",
+ cnt.incrementAndGet(), new Date().toString());
+ System.out.println("created tuple: "+str);
+ return str;
+ }, 1, TimeUnit.SECONDS);
+
+ // write the stream
+ FileStreams.textFileWriter(stream, () -> basePathname, () -> policy);
+
+ // run the application / topology
+ System.out.println("starting the producer writing to directory " + directory);
+ System.out.println("Console URL for the job: "
+ + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+ tp.submit(t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/README
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/README b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/README
new file mode 100644
index 0000000..4477518
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/README
@@ -0,0 +1,11 @@
+Sample File Streams connector topology applications.
+
+The file writer application writes a stream's tuples to files.
+
+The file reader application watches a directory for files and reads their
+contents into a stream of tuples.
+
+see scripts/connectors/file/README to run them
+
+FileWriterApp.java - the writer application topology
+FileReaderApp.java - the reader application topology
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/package-info.java
new file mode 100644
index 0000000..a2cfe74
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/file/package-info.java
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Samples showing use of the
+ * <a href="{@docRoot}/org/apache/edgent/connectors/file/package-summary.html">
+ * File stream connector</a>.
+ * <p>
+ * See <edgent-release>/scripts/connectors/file/README to run the samples.
+ * <p>
+ * The following samples are provided:
+ * <ul>
+ * <li>FileReaderApp.java - a simple directory watcher and file reader application topology</li>
+ * <li>FileWriterApp.java - a simple file writer application topology</li>
+ * </ul>
+ */
+package org.apache.edgent.samples.connectors.file;
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfQuickstart.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfQuickstart.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfQuickstart.java
new file mode 100644
index 0000000..e83b0a2
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfQuickstart.java
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.iotf;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.iotf.IotfDevice;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import com.google.gson.JsonObject;
+
+/**
+ * IBM Watson IoT Platform Quickstart sample.
+ * Submits a JSON device event every second using the
+ * same format as the Quickstart device simulator,
+ * with keys {@code temp}, {@code humidity} and {@code objectTemp}
+ * and random values.
+ * <P>
+ * The device type is {@code iotsamples-edgent} and a random
+ * device identifier is generated. Both are printed out when
+ * the application starts.
+ * </P>
+ * A URL is also printed that allows viewing of the data
+ * as it received by the Quickstart service.
+ */
+public class IotfQuickstart {
+
+ public static void main(String[] args) {
+
+ DirectProvider tp = new DirectProvider();
+ Topology topology = tp.newTopology("IotfQuickstart");
+
+ // Declare a connection to IoTF Quickstart service
+ String deviceId = "qs" + Long.toHexString(new Random().nextLong());
+ IotDevice device = IotfDevice.quickstart(topology, deviceId);
+
+ System.out.println("Quickstart device type:" + IotfDevice.QUICKSTART_DEVICE_TYPE);
+ System.out.println("Quickstart device id :" + deviceId);
+ System.out.println("https://quickstart.internetofthings.ibmcloud.com/#/device/"
+ + deviceId);
+
+ Random r = new Random();
+ TStream<double[]> raw = topology.poll(() -> {
+ double[] v = new double[3];
+
+ v[0] = r.nextGaussian() * 10.0 + 40.0;
+ v[1] = r.nextGaussian() * 10.0 + 50.0;
+ v[2] = r.nextGaussian() * 10.0 + 60.0;
+
+ return v;
+ }, 1, TimeUnit.SECONDS);
+
+ TStream<JsonObject> json = raw.map(v -> {
+ JsonObject j = new JsonObject();
+ j.addProperty("temp", v[0]);
+ j.addProperty("humidity", v[1]);
+ j.addProperty("objectTemp", v[2]);
+ return j;
+ });
+
+ device.events(json, "sensors", QoS.FIRE_AND_FORGET);
+
+ tp.submit(topology);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfSensors.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfSensors.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfSensors.java
new file mode 100644
index 0000000..9f56d37
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/IotfSensors.java
@@ -0,0 +1,163 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.samples.connectors.iotf;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.connectors.iot.HeartBeat;
+import org.apache.edgent.connectors.iot.IotDevice;
+import org.apache.edgent.connectors.iot.QoS;
+import org.apache.edgent.connectors.iotf.IotfDevice;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.providers.direct.DirectTopology;
+import org.apache.edgent.samples.topology.SensorsAggregates;
+import org.apache.edgent.topology.TStream;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Sample sending sensor device events to IBM Watson IoT Platform. <BR>
+ * Simulates a couple of bursty sensors and sends the readings from the sensors
+ * to IBM Watson IoT Platform as device events with id {@code sensors}. <BR>
+ * Subscribes to device commands with identifier {@code display}.
+ * <P>
+ * In addition a device event with id {@code hearbeat} is sent
+ * every minute. This ensure a connection attempt to IBM Watson IoT Platform
+ * is made immediately rather than waiting for a bursty sensor to become
+ * active.
+ * <P>
+ * This sample requires an IBM Watson IoT Platform service and a device configuration.
+ * The device configuration is read from the file {@code device.cfg} in the
+ * current directory. <BR>
+ * In order to see commands send from IBM Watson IoT Platform
+ * there must be an analytic application
+ * that sends commands with the identifier {@code display}.
+ * </P>
+ */
+public class IotfSensors {
+
+ /**
+ * Run the IotfSensors application.
+ *
+ * Takes a single argument that is the path to the
+ * device configuration file containing the connection
+ * authentication information.
+ *
+ * @param args Must contain the path to the device configuration file.
+ *
+ * @see IotfDevice#IotfDevice(org.apache.edgent.topology.Topology, File)
+ */
+ public static void main(String[] args) {
+
+ String deviceCfg = args[0];
+
+ DirectProvider tp = new DirectProvider();
+ DirectTopology topology = tp.newTopology("IotfSensors");
+
+ // Declare a connection to IoTF
+ IotDevice device = new IotfDevice(topology, new File(deviceCfg));
+
+ // Simulated sensors for this device.
+ simulatedSensors(device, true);
+
+ // Heartbeat
+ heartBeat(device, true);
+
+ // Subscribe to commands of id "display" for this
+ // device and print them to standard out
+ displayMessages(device, true);
+
+ tp.submit(topology);
+ }
+
+
+ /**
+ * Simulate two bursty sensors and send the readings as IoTF device events
+ * with an identifier of {@code sensors}.
+ *
+ * @param device
+ * IoT device
+ * @param print
+ * True if the data submitted as events should also be printed to
+ * standard out.
+ */
+ public static void simulatedSensors(IotDevice device, boolean print) {
+
+ TStream<JsonObject> sensors = SensorsAggregates.sensorsAB(device.topology());
+ if (print)
+ sensors.print();
+
+ // Send the device streams as IoTF device events
+ // with event identifier "sensors".
+ device.events(sensors, "sensors", QoS.FIRE_AND_FORGET);
+ }
+
+ /**
+ * Create a heart beat device event with
+ * identifier {@code heartbeat} to
+ * ensure there is some immediate output and
+ * the connection to IoTF happens as soon as possible.
+ * @param device IoT device
+ * @param print true to print generated heartbeat tuples to System.out.
+ */
+ public static void heartBeat(IotDevice device, boolean print) {
+ TStream<JsonObject> hbs =
+ HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES, "heartbeat");
+ if (print)
+ hbs.print();
+ }
+
+
+ /**
+ * Subscribe to IoTF device commands with identifier {@code display}.
+ * Subscribing to device commands returns a stream of JSON objects that
+ * include a timestamp ({@code tsms}), command identifier ({@code command})
+ * and payload ({@code payload}). Payload is the application specific
+ * portion of the command. <BR>
+ * In this case the payload is expected to be a JSON object containing a
+ * {@code msg} key with a string display message. <BR>
+ * The returned stream consists of the display message string extracted from
+ * the JSON payload.
+ * <P>
+ * Note to receive commands a analytic application must exist that generates
+ * them through IBM Watson IoT Platform.
+ * </P>
+ *
+ * @param device the device
+ * @param print true to print the received command's payload to System.out.
+ * @return the stream
+ * @see IotDevice#commands(String...)
+ */
+ public static TStream<String> displayMessages(IotDevice device, boolean print) {
+ // Subscribe to commands of id "display" for this device
+ TStream<JsonObject> statusMsgs = device.commands("display");
+
+ // The returned JSON object includes several fields
+ // tsms - Timestamp in milliseconds (this is generic to a command)
+ // payload.msg - Status message (this is specific to this application)
+
+ // Map to a String object containing the message
+ TStream<String> messages = statusMsgs.map(j -> j.getAsJsonObject("payload").getAsJsonPrimitive("msg").getAsString());
+ if (print)
+ messages.print();
+ return messages;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/package-info.java
new file mode 100644
index 0000000..a3451af
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotf/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Samples showing device events and commands with IBM Watson IoT Platform.
+ */
+package org.apache.edgent.samples.connectors.iotf;
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
new file mode 100644
index 0000000..a0264f1
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/DbUtils.java
@@ -0,0 +1,140 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+
+/**
+ * Utilities for the sample's non-streaming JDBC database related actions.
+ */
+public class DbUtils {
+
+ /**
+ * Get the JDBC {@link DataSource} for the database.
+ * <p>
+ * The "db.name" property specifies the name of the database.
+ * Defaults to "JdbcConnectorSampleDb".
+ *
+ * @param props configuration properties
+ * @return the DataSource
+ * @throws Exception on failure
+ */
+ public static DataSource getDataSource(Properties props) throws Exception {
+ return createDerbyEmbeddedDataSource(props);
+ }
+
+ /**
+ * Initialize the sample's database.
+ * <p>
+ * Tables are created as needed and purged.
+ * @param ds the DataSource
+ * @throws Exception on failure
+ */
+ public static void initDb(DataSource ds) throws Exception {
+ createTables(ds);
+ purgeTables(ds);
+ }
+
+ /**
+ * Purge the sample's tables
+ * @param ds the DataSource
+ * @throws Exception on failure
+ */
+ public static void purgeTables(DataSource ds) throws Exception {
+ try (Connection cn = ds.getConnection()) {
+ Statement stmt = cn.createStatement();
+ stmt.execute("DELETE FROM persons");
+ }
+ }
+
+ private static void createTables(DataSource ds) throws Exception {
+ try (Connection cn = ds.getConnection()) {
+ Statement stmt = cn.createStatement();
+ stmt.execute("CREATE TABLE persons "
+ + "("
+ + "id INTEGER NOT NULL,"
+ + "firstname VARCHAR(40) NOT NULL,"
+ + "lastname VARCHAR(40) NOT NULL,"
+ + "PRIMARY KEY (id)"
+ + ")"
+ );
+ }
+ catch (SQLException e) {
+ if (e.getLocalizedMessage().contains("already exists"))
+ return;
+ else
+ throw e;
+ }
+ }
+
+ private static DataSource createDerbyEmbeddedDataSource(Properties props) throws Exception
+ {
+ String dbName = props.getProperty("db.name", "JdbcConnectorSampleDb");
+
+ // For our sample, avoid a compile-time dependency to the jdbc driver.
+ // At runtime, require that the classpath can find it.
+
+ String DERBY_DATA_SOURCE = "org.apache.derby.jdbc.EmbeddedDataSource";
+
+ Class<?> nsDataSource = null;
+ try {
+ nsDataSource = Class.forName(DERBY_DATA_SOURCE);
+ }
+ catch (ClassNotFoundException e) {
+ String msg = "Fix the test classpath. ";
+ if (System.getenv("DERBY_HOME") == null) {
+ msg += "DERBY_HOME not set. ";
+ }
+ msg += "Class not found: "+e.getLocalizedMessage();
+ System.err.println(msg);
+ throw new IllegalStateException(msg);
+ }
+ DataSource ds = (DataSource) nsDataSource.newInstance();
+
+ @SuppressWarnings("rawtypes")
+ Class[] methodParams = new Class[] {String.class};
+ Method dbname = nsDataSource.getMethod("setDatabaseName", methodParams);
+ Object[] args = new Object[] {dbName};
+ dbname.invoke(ds, args);
+
+ // create the db if necessary
+ Method create = nsDataSource.getMethod("setCreateDatabase", methodParams);
+ args = new Object[] {"create"};
+ create.invoke(ds, args);
+
+ // set the user
+ Method setuser = nsDataSource.getMethod("setUser", methodParams);
+ args = new Object[] { props.getProperty("db.user", System.getProperty("user.name")) };
+ setuser.invoke(ds, args);
+
+ // optionally set the pw
+ Method setpw = nsDataSource.getMethod("setPassword", methodParams);
+ args = new Object[] { props.getProperty("db.password") };
+ if (args[0] != null)
+ setpw.invoke(ds, args);
+
+ return ds;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
new file mode 100644
index 0000000..bb57629
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/Person.java
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+/**
+ * A Person object for the sample.
+ */
+public class Person {
+ int id;
+ String firstName;
+ String lastName;
+ Person(int id, String first, String last) {
+ this.id = id;
+ this.firstName = first;
+ this.lastName = last;
+ }
+ public String toString() {
+ return String.format("id=%d first=%s last=%s",
+ id, firstName, lastName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
new file mode 100644
index 0000000..f7f1211
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonData.java
@@ -0,0 +1,96 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for loading the sample's person data.
+ */
+public class PersonData {
+
+ /**
+ * Load the person data from the path specified by the "persondata.path"
+ * property.
+ * @param props configuration properties
+ * @return the loaded person data
+ * @throws Exception on failure
+ */
+ public static List<Person> loadPersonData(Properties props) throws Exception {
+ String pathname = props.getProperty("persondata.path");
+ List<Person> persons = new ArrayList<>();
+ Path path = new File(pathname).toPath();
+ try (BufferedReader br = Files.newBufferedReader(path)) {
+ int lineno = 0;
+ String line;
+ while ((line = br.readLine()) != null) {
+ lineno++;
+ Object[] fields = parseLine(line, lineno, pathname);
+ if (fields == null)
+ continue;
+ persons.add(new Person((Integer)fields[0], (String)fields[1], (String)fields[2]));
+ }
+ }
+ return persons;
+ }
+
+ private static Object[] parseLine(String line, int lineno, String pathname) {
+ line = line.trim();
+ if (line.startsWith("#"))
+ return null;
+
+ // id,firstName,lastName
+ String[] items = line.split(",");
+ if (items.length < 3)
+ throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
+ int id;
+ try {
+ id = new Integer(items[0]);
+ if (id < 1)
+ throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid data on line "+lineno+" in "+pathname);
+ }
+
+ Object[] fields = new Object[3];
+ fields[0] = id;
+ fields[1] = items[1].trim();
+ fields[2] = items[2].trim();
+ return fields;
+ }
+
+ /**
+ * Convert a {@code List<Person>} to a {@code List<PersonId>}
+ * @param persons the person list
+ * @return the person id list
+ */
+ public static List<PersonId> toPersonIds(List<Person> persons) {
+ return persons.stream()
+ .map(person -> new PersonId(person.id))
+ .collect(Collectors.toList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
new file mode 100644
index 0000000..218a027
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/PersonId.java
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+/**
+ * Another class containing a person id for the sample.
+ */
+public class PersonId {
+ int id;
+ PersonId(int id) {
+ this.id = id;
+ }
+ public String toString() {
+ return String.format("id=%d", id);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
new file mode 100644
index 0000000..006b459
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleReaderApp.java
@@ -0,0 +1,102 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.edgent.connectors.jdbc.JdbcStreams;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * A simple JDBC connector sample demonstrating streaming read access
+ * of a dbms table and creating stream tuples from the results.
+ */
+public class SimpleReaderApp {
+ private final Properties props;
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 1)
+ throw new Exception("missing pathname to jdbc.properties file");
+ SimpleReaderApp reader = new SimpleReaderApp(args[0]);
+ reader.run();
+ }
+
+ /**
+ * @param jdbcPropsPath pathname to properties file
+ */
+ SimpleReaderApp(String jdbcPropsPath) throws Exception {
+ props = new Properties();
+ props.load(Files.newBufferedReader(new File(jdbcPropsPath).toPath()));
+ }
+
+ /**
+ * Create a topology for the writer application and run it.
+ */
+ private void run() throws Exception {
+ DirectProvider tp = new DirectProvider();
+
+ // build the application/topology
+
+ Topology t = tp.newTopology("jdbcSampleWriter");
+
+ // Create the JDBC connector
+ JdbcStreams myDb = new JdbcStreams(t,
+ () -> DbUtils.getDataSource(props),
+ dataSource -> dataSource.getConnection());
+
+ // Create a sample stream of tuples containing a person id
+ List<PersonId> personIdList = PersonData.toPersonIds(PersonData.loadPersonData(props));
+ personIdList.add(new PersonId(99999));
+ TStream<PersonId> personIds = t.collection(personIdList);
+
+ // For each tuple on the stream, read info from the db table
+ // using the "id", and create a Person tuple on the result stream.
+ TStream<Person> persons = myDb.executeStatement(personIds,
+ () -> "SELECT id, firstname, lastname FROM persons WHERE id = ?",
+ (personId,stmt) -> stmt.setInt(1, personId.id),
+ (personId,rSet,exc,resultStream) -> {
+ if (exc != null) {
+ // some failure processing this tuple. an error was logged.
+ System.err.println("Unable to process id="+personId+": "+exc);
+ return;
+ }
+ if (rSet.next()) {
+ resultStream.accept(
+ new Person(rSet.getInt("id"),
+ rSet.getString("firstname"),
+ rSet.getString("lastname")));
+ }
+ else {
+ System.err.println("Unknown person id="+personId.id);
+ }
+ }
+ );
+
+ // print out Person tuples as they are retrieved
+ persons.sink(person -> System.out.println("retrieved person: "+person));
+
+ // run the application / topology
+ tp.submit(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
new file mode 100644
index 0000000..018c97b
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/SimpleWriterApp.java
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.jdbc;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Properties;
+
+import org.apache.edgent.connectors.jdbc.JdbcStreams;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * A simple JDBC connector sample demonstrating streaming write access
+ * of a dbms to add stream tuples to a table.
+ */
+public class SimpleWriterApp {
+ private final Properties props;
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 1)
+ throw new Exception("missing pathname to jdbc.properties file");
+ SimpleWriterApp writer = new SimpleWriterApp(args[0]);
+ DbUtils.initDb(DbUtils.getDataSource(writer.props));
+ writer.run();
+ }
+
+ /**
+ * @param jdbcPropsPath pathname to properties file
+ */
+ SimpleWriterApp(String jdbcPropsPath) throws Exception {
+ props = new Properties();
+ props.load(Files.newBufferedReader(new File(jdbcPropsPath).toPath()));
+ }
+
+ /**
+ * Create a topology for the writer application and run it.
+ */
+ private void run() throws Exception {
+ DirectProvider tp = new DirectProvider();
+
+ // build the application/topology
+
+ Topology t = tp.newTopology("jdbcSampleWriter");
+
+ // Create the JDBC connector
+ JdbcStreams myDb = new JdbcStreams(t,
+ () -> DbUtils.getDataSource(props),
+ dataSource -> dataSource.getConnection());
+
+ // Create a sample stream of Person tuples
+ TStream<Person> persons = t.collection(PersonData.loadPersonData(props));
+
+ // Write stream tuples to a table.
+ myDb.executeStatement(persons,
+ () -> "INSERT INTO persons VALUES(?,?,?)",
+ (person,stmt) -> {
+ System.out.println("Inserting into persons table: person "+person);
+ stmt.setInt(1, person.id);
+ stmt.setString(2, person.firstName);
+ stmt.setString(3, person.lastName);
+ }
+ );
+
+ // run the application / topology
+ tp.submit(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
new file mode 100644
index 0000000..4e88b77
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/jdbc/package-info.java
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+/**
+ * Samples showing use of the
+ * <a href="{@docRoot}/org/apache/edgent/connectors/jdbc/package-summary.html">
+ * JDBC stream connector</a>.
+ * <p>
+ * See <edgent-release>/scripts/connectors/jdbc/README to run the samples.
+ * <p>
+ * The following samples are provided:
+ * <ul>
+ * <li>SimpleReaderApp.java - a simple dbms reader application topology</li>
+ * <li>SimpleWriterApp.java - a simple dbms writer application topology</li>
+ * </ul>
+ */
+package org.apache.edgent.samples.connectors.jdbc;
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
new file mode 100644
index 0000000..7d5a530
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/KafkaClient.java
@@ -0,0 +1,144 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import org.apache.edgent.samples.connectors.Options;
+
+/**
+ * Demonstrate integrating with the Apache Kafka messaging system
+ * <a href="http://kafka.apache.org">http://kafka.apache.org</a>.
+ * <p>
+ * {@link org.apache.edgent.connectors.kafka.KafkaProducer KafkaProducer} is
+ * a connector used to create a bridge between topology streams
+ * and publishing to Kafka topics.
+ * <p>
+ * {@link org.apache.edgent.connectors.kafka.KafkaConsumer KafkaConsumer} is
+ * a connector used to create a bridge between topology streams
+ * and subscribing to Kafka topics.
+ * <p>
+ * The client either publishes messages to a topic or
+ * subscribes to the topic and reports the messages received.
+ * <p>
+ * By default, a running Kafka cluster with the following
+ * characteristics is assumed:
+ * <ul>
+ * <li>{@code bootstrap.servers="localhost:9092"}</li>
+ * <li>{@code zookeeper.connect="localhost:2181"}</li>
+ * <li>kafka topic {@code "kafkaSampleTopic"} exists</li>
+ * </ul>
+ * <p>
+ * See the Apache Kafka link above for information about setting up a Kafka
+ * cluster as well as creating a topic.
+ * <p>
+ * This may be executed from as:
+ * <UL>
+ * <LI>
+ * {@code java -cp samples/lib/org.apache.edgent.samples.connectors.kafka.jar
+ * org.apache.edgent.samples.connectors.kafka.KafkaClient -h
+ * } - Run directly from the command line.
+ * </LI>
+ * </UL>
+ * <UL>
+ * <LI>
+ * An application execution within your IDE once you set the class path to include the correct jars.</LI>
+ * </UL>
+ */
+public class KafkaClient {
+ private static final String usage = "usage: "
+ + "\n" + "[-v] [-h]"
+ + "\n" + "pub | sub"
+ + "\n" + "[bootstrap.servers=<value>]"
+ + "\n" + "[zookeeper.connect=<value>]"
+ + "\n" + "[group.id=<value>]"
+ + "\n" + "[pubcnt=<value>]"
+ ;
+
+ public static void main(String[] args) throws Exception {
+ Options options = processArgs(args);
+ if (options == null)
+ return;
+
+ Runner.run(options);
+ }
+
+ private static Options processArgs(String[] args) {
+ Options options = new Options();
+ initHandlers(options);
+ try {
+ options.processArgs(args);
+ }
+ catch (Exception e) {
+ System.err.println(e);
+ System.out.println(usage);
+ return null;
+ }
+
+ if ((Boolean)options.get(OPT_HELP)) {
+ System.out.println(usage);
+ return null;
+ }
+
+ if (!(Boolean)options.get(OPT_PUB) && !(Boolean)options.get(OPT_SUB)) {
+ System.err.println(String.format("Missing argument '%s' or '%s'.", OPT_PUB, OPT_SUB));
+ System.out.println(usage);
+ return null;
+ }
+
+ String[] announceOpts = new String[] {
+ };
+ if ((Boolean)options.get(OPT_VERBOSE))
+ announceOpts = options.getAll().stream().map(e -> e.getKey()).toArray(String[]::new);
+ for (String opt : announceOpts) {
+ Object value = options.get(opt);
+ if (value != null) {
+ if (opt.toLowerCase().contains("password"))
+ value = "*****";
+ System.out.println("Using "+opt+"="+value);
+ }
+ }
+
+ return options;
+ }
+
+ static final String OPT_VERBOSE = "-v";
+ static final String OPT_HELP = "-h";
+ static final String OPT_PUB = "pub";
+ static final String OPT_SUB = "sub";
+ static final String OPT_BOOTSTRAP_SERVERS = "bootstrap.servers";
+ static final String OPT_ZOOKEEPER_CONNECT = "zookeeper.connect";
+ static final String OPT_GROUP_ID = "group.id";
+ static final String OPT_TOPIC = "topic";
+ static final String OPT_PUB_CNT = "pubcnt";
+
+ private static void initHandlers(Options opts) {
+ // options for which we have a default
+ opts.addHandler(OPT_HELP, null, false);
+ opts.addHandler(OPT_VERBOSE, null, false);
+ opts.addHandler(OPT_PUB, null, false);
+ opts.addHandler(OPT_SUB, null, false);
+ opts.addHandler(OPT_BOOTSTRAP_SERVERS, v -> v, "localhost:9092");
+ opts.addHandler(OPT_ZOOKEEPER_CONNECT, v -> v, "localhost:2181");
+ opts.addHandler(OPT_TOPIC, v -> v, "kafkaSampleTopic");
+ opts.addHandler(OPT_PUB_CNT, v -> Integer.valueOf(v), -1);
+
+ // optional options (no default value)
+ opts.addHandler(OPT_GROUP_ID, v -> v);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
new file mode 100644
index 0000000..6746a7d
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/PublisherApp.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_BOOTSTRAP_SERVERS;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_PUB_CNT;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.samples.connectors.MsgSupplier;
+import org.apache.edgent.samples.connectors.Options;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyProvider;
+
+import org.apache.edgent.connectors.kafka.KafkaProducer;
+
+/**
+ * A Kafka producer/publisher topology application.
+ */
+public class PublisherApp {
+ private final TopologyProvider tp;
+ private final Options options;
+
+ /**
+ * @param tp the TopologyProvider to use.
+ * @param options
+ */
+ PublisherApp(TopologyProvider tp, Options options) {
+ this.tp = tp;
+ this.options = options;
+ }
+
+ /**
+ * Create a topology for the publisher application.
+ * @return the Topology
+ */
+ public Topology buildAppTopology() {
+ Topology t = tp.newTopology("kafkaClientPublisher");
+
+ // Create a sample stream of tuples to publish
+ TStream<String> msgs = t.poll(new MsgSupplier(options.get(OPT_PUB_CNT)),
+ 1L, TimeUnit.SECONDS);
+
+ // Create the KafkaProducer broker connector
+ Map<String,Object> config = newConfig();
+ KafkaProducer kafka = new KafkaProducer(t, () -> config);
+
+ // Publish the stream to the topic. The String tuple is the message value.
+ kafka.publish(msgs, options.get(OPT_TOPIC));
+
+ return t;
+ }
+
+ private Map<String,Object> newConfig() {
+ Map<String,Object> config = new HashMap<>();
+ // required kafka configuration items
+ config.put("bootstrap.servers", options.get(OPT_BOOTSTRAP_SERVERS));
+ return config;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
new file mode 100644
index 0000000..6554f8b
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/README
@@ -0,0 +1,26 @@
+Sample Kafka Publisher and Subscriber topology applications.
+
+By default the samples assume the following kafka broker configuration:
+- bootstrap.servers="localhost:9092"
+- zookeeper.connect="localhost:2181"
+- kafka topic "kafkaSampleTopic" exists
+- no authentication
+
+See http://kafka.apache.org for the code and setup information for
+a Kafka broker.
+
+see scripts/connectors/kafka/README to run them
+
+The simple sample
+-----------------
+
+SimplePublisherApp.java - build and run the simple publisher application topology
+SimpleSubscriberApp.java - build and run the simple subscriber application topology
+
+The fully configurable client
+-----------------------------
+
+Runner.java - build and run the publisher or subscriber
+PublisherApp.java - build the publisher application topology
+SubscriberApp.java - build the subscriber application topology
+KafkaClient.java - the client's command line interface
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
new file mode 100644
index 0000000..2ffccfc
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/Runner.java
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_BOOTSTRAP_SERVERS;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_PUB;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_TOPIC;
+import static org.apache.edgent.samples.connectors.kafka.KafkaClient.OPT_ZOOKEEPER_CONNECT;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.samples.connectors.Options;
+import org.apache.edgent.topology.Topology;
+
+/**
+ * Build and run the publisher or subscriber application.
+ */
+public class Runner {
+ /**
+ * Build and run the publisher or subscriber application.
+ * @param options command line options
+ * @throws Exception on failure
+ */
+ public static void run(Options options) throws Exception {
+ boolean isPub = options.get(OPT_PUB);
+
+ // Get a topology runtime provider
+ DevelopmentProvider tp = new DevelopmentProvider();
+
+ Topology top;
+ if (isPub) {
+ PublisherApp publisher = new PublisherApp(tp, options);
+ top = publisher.buildAppTopology();
+ }
+ else {
+ SubscriberApp subscriber = new SubscriberApp(tp, options);
+ top = subscriber.buildAppTopology();
+ }
+
+ // Submit the app/topology; send or receive the messages.
+ System.out.println(
+ "Using Kafka cluster at bootstrap.servers="
+ + options.get(OPT_BOOTSTRAP_SERVERS)
+ + " zookeeper.connect=" + options.get(OPT_ZOOKEEPER_CONNECT)
+ + "\n" + (isPub ? "Publishing" : "Subscribing")
+ + " to topic " + options.get(OPT_TOPIC));
+ System.out.println("Console URL for the job: "
+ + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+ tp.submit(top);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
new file mode 100644
index 0000000..a8b9492
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimplePublisherApp.java
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.samples.connectors.Util;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import org.apache.edgent.connectors.kafka.KafkaProducer;
+
+/**
+ * A simple Kafka publisher topology application.
+ */
+public class SimplePublisherApp {
+ private final Properties props;
+ private final String topic;
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 1)
+ throw new Exception("missing pathname to kafka.properties file");
+ SimplePublisherApp publisher = new SimplePublisherApp(args[0]);
+ publisher.run();
+ }
+
+ /**
+ * @param kafkaPropsPath pathname to properties file
+ */
+ SimplePublisherApp(String kafkaPropsPath) throws Exception {
+ props = new Properties();
+ props.load(Files.newBufferedReader(new File(kafkaPropsPath).toPath()));
+ topic = props.getProperty("topic");
+ }
+
+ private Map<String,Object> createKafkaConfig() {
+ Map<String,Object> kafkaConfig = new HashMap<>();
+ kafkaConfig.put("bootstrap.servers", props.get("bootstrap.servers"));
+ return kafkaConfig;
+ }
+
+ /**
+ * Create a topology for the publisher application and run it.
+ */
+ private void run() throws Exception {
+ DevelopmentProvider tp = new DevelopmentProvider();
+
+ // build the application/topology
+
+ Topology t = tp.newTopology("kafkaSamplePublisher");
+
+ // Create the Kafka Producer broker connector
+ Map<String,Object> kafkaConfig = createKafkaConfig();
+ KafkaProducer kafka = new KafkaProducer(t, () -> kafkaConfig);
+
+ // Create a sample stream of tuples to publish
+ AtomicInteger cnt = new AtomicInteger();
+ TStream<String> msgs = t.poll(
+ () -> {
+ String msg = String.format("Message-%d from %s",
+ cnt.incrementAndGet(), Util.simpleTS());
+ System.out.println("poll generated msg to publish: " + msg);
+ return msg;
+ }, 1L, TimeUnit.SECONDS);
+
+ // Publish the stream to the topic. The String tuple is the message value.
+ kafka.publish(msgs, topic);
+
+ // run the application / topology
+ System.out.println("Console URL for the job: "
+ + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+ tp.submit(t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
----------------------------------------------------------------------
diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
new file mode 100644
index 0000000..7cef424
--- /dev/null
+++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/kafka/SimpleSubscriberApp.java
@@ -0,0 +1,95 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.samples.connectors.kafka;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.samples.connectors.Util;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import org.apache.edgent.connectors.kafka.KafkaConsumer;
+
+/**
+ * A simple Kafka subscriber topology application.
+ */
+public class SimpleSubscriberApp {
+ private final Properties props;
+ private final String topic;
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 1)
+ throw new Exception("missing pathname to kafka.properties file");
+ SimpleSubscriberApp subscriber = new SimpleSubscriberApp(args[0]);
+ subscriber.run();
+ }
+
+ /**
+ * @param kafkaPropsPath pathname to properties file
+ */
+ SimpleSubscriberApp(String kafkaPropsPath) throws Exception {
+ props = new Properties();
+ props.load(Files.newBufferedReader(new File(kafkaPropsPath).toPath()));
+ topic = props.getProperty("topic");
+ }
+
+ private Map<String,Object> createKafkaConfig() {
+ Map<String,Object> kafkaConfig = new HashMap<>();
+ kafkaConfig.put("zookeeper.connect", props.get("zookeeper.connect"));
+ // for the sample, be insensitive to old/multiple consumers for
+ // the topic/groupId hanging around
+ kafkaConfig.put("group.id",
+ "kafkaSampleConsumer_" + Util.simpleTS().replaceAll(":", ""));
+ return kafkaConfig;
+ }
+
+ /**
+ * Create a topology for the subscriber application and run it.
+ */
+ private void run() throws Exception {
+ DevelopmentProvider tp = new DevelopmentProvider();
+
+ // build the application/topology
+
+ Topology t = tp.newTopology("kafkaSampleSubscriber");
+
+ // Create the Kafka Consumer broker connector
+ Map<String,Object> kafkaConfig = createKafkaConfig();
+ KafkaConsumer kafka = new KafkaConsumer(t, () -> kafkaConfig);
+
+ // Subscribe to the topic and create a stream of messages
+ TStream<String> msgs = kafka.subscribe(rec -> rec.value(), topic);
+
+ // Process the received msgs - just print them out
+ msgs.sink(tuple -> System.out.println(
+ String.format("[%s] received: %s", Util.simpleTS(), tuple)));
+
+ // run the application / topology
+ System.out.println("Console URL for the job: "
+ + tp.getServices().getService(HttpServer.class).getConsoleUrl());
+ tp.submit(t);
+ }
+
+}